This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d30805f266 Pipe: Added check for wal disable and ratis consensus when enabling data region processing (#12407)
1d30805f266 is described below

commit 1d30805f2660ffb4466750bf7de1d3eaeaef1796
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 25 17:42:05 2024 +0800

    Pipe: Added check for wal disable and ratis consensus when enabling data region processing (#12407)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../dataregion/IoTDBDataRegionExtractor.java       | 43 ++++++++++++++++++----
 .../extractor/IoTDBDataRegionExtractorTest.java    | 32 ++++++++--------
 .../dataregion/wal/WALManagerTest.java             |  5 +++
 .../dataregion/wal/node/WALNodeTest.java           |  5 +++
 .../wal/node/WalDeleteOutdatedNewTest.java         |  5 +++
 6 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6da32720bf5..7c5e852cf6c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -896,7 +896,7 @@ public class IoTDBConfig {
    * on startup and set this variable so that the correct class name can be 
obtained later when the
    * data region consensus layer singleton is initialized
    */
-  private String dataRegionConsensusProtocolClass = 
ConsensusFactory.RATIS_CONSENSUS;
+  private String dataRegionConsensusProtocolClass = 
ConsensusFactory.IOT_CONSENSUS;
 
   /**
    * The consensus protocol class for schema region. The Datanode should 
communicate with ConfigNode
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index b4291925e97..fe292f2877e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -20,8 +20,11 @@
 package org.apache.iotdb.db.pipe.extractor.dataregion;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.extractor.IoTDBExtractor;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
@@ -32,6 +35,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
 import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -90,7 +94,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
   private boolean hasNoExtractionNeed = true;
 
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
+  public void validate(final PipeParameterValidator validator) throws 
Exception {
     super.validate(validator);
 
     final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
@@ -102,6 +106,15 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     }
     hasNoExtractionNeed = false;
 
+    if (insertionDeletionListeningOptionPair.getLeft().equals(true)
+        && IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getDataRegionConsensusProtocolClass()
+            .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      throw new PipeException(
+          "The pipe cannot transfer data when data region is using ratis 
consensus.");
+    }
+
     // Validate extractor.pattern.format is within valid range
     validator
         .validateAttributeValueRange(
@@ -194,7 +207,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     realtimeExtractor.validate(validator);
   }
 
-  private void validatePattern(PipePattern pattern) {
+  private void validatePattern(final PipePattern pattern) {
     if (!pattern.isLegal()) {
       throw new IllegalArgumentException(String.format("Pattern \"%s\" is 
illegal.", pattern));
     }
@@ -205,7 +218,8 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
   }
 
-  private void constructRealtimeExtractor(PipeParameters parameters) {
+  private void constructRealtimeExtractor(final PipeParameters parameters)
+      throws IllegalPathException {
     // Enable realtime extractor by default
     if (!parameters.getBooleanOrDefault(
         Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
@@ -219,6 +233,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
     // Use hybrid mode by default
     if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
+      checkWalEnable(parameters);
       realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
       LOGGER.info(
           "Pipe: '{}' is not set, use hybrid mode by default.", 
EXTRACTOR_REALTIME_MODE_KEY);
@@ -233,12 +248,15 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
       case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
       case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
       case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
+        checkWalEnable(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         break;
       case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
+        checkWalEnable(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
         break;
       default:
+        checkWalEnable(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
@@ -248,8 +266,19 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     }
   }
 
+  private void checkWalEnable(final PipeParameters parameters) throws 
IllegalPathException {
+    if (Boolean.TRUE.equals(
+            
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+                .getLeft())
+        && 
IoTDBDescriptor.getInstance().getConfig().getWalMode().equals(WALMode.DISABLE)) 
{
+      throw new PipeException(
+          "The pipe cannot transfer realtime insertion if data region disables 
wal. Please set 'realtime.mode'='batch' in source parameters when enabling 
realtime transmission.");
+    }
+  }
+
   @Override
-  public void customize(PipeParameters parameters, 
PipeExtractorRuntimeConfiguration configuration)
+  public void customize(
+      final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
       throws Exception {
     if (hasNoExtractionNeed) {
       return;
@@ -337,7 +366,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
   }
 
   private void startHistoricalExtractorAndRealtimeExtractor(
-      AtomicReference<Exception> exceptionHolder) {
+      final AtomicReference<Exception> exceptionHolder) {
     try {
       // Start realtimeExtractor first to avoid losing data. This may cause 
some
       // retransmission, yet it is OK according to the idempotency of IoTDB.
@@ -346,7 +375,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
       // realtimeExtractor after the process, then this part of data will be 
lost.
       realtimeExtractor.start();
       historicalExtractor.start();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       exceptionHolder.set(e);
       LOGGER.warn(
           "Pipe {}@{}: Start historical extractor {} and realtime extractor {} 
error.",
@@ -358,7 +387,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     }
   }
 
-  private void rethrowExceptionIfAny(AtomicReference<Exception> 
exceptionHolder) {
+  private void rethrowExceptionIfAny(final AtomicReference<Exception> 
exceptionHolder) {
     if (exceptionHolder.get() != null) {
       throw new PipeException("failed to start extractors.", 
exceptionHolder.get());
     }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
index c9f02090016..bdf63161b96 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
 public class IoTDBDataRegionExtractorTest {
   @Test
   public void testIoTDBDataRegionExtractor() {
-    IoTDBDataRegionExtractor extractor = new IoTDBDataRegionExtractor();
+    final IoTDBDataRegionExtractor extractor = new IoTDBDataRegionExtractor();
     try {
       extractor.validate(
           new PipeParameterValidator(
@@ -50,7 +50,7 @@ public class IoTDBDataRegionExtractorTest {
                           
PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE);
                     }
                   })));
-    } catch (Exception e) {
+    } catch (final Exception e) {
       Assert.fail();
     }
   }
@@ -58,23 +58,23 @@ public class IoTDBDataRegionExtractorTest {
   @Test
   public void testIoTDBDataRegionExtractorWithPattern() {
     Assert.assertEquals(
-        testIoTDBDataRegionExtractorWithPattern("root.a-b").getClass(),
-        IllegalArgumentException.class);
+        IllegalArgumentException.class,
+        testIoTDBDataRegionExtractorWithPattern("root.a-b").getClass());
     Assert.assertEquals(
-        testIoTDBDataRegionExtractorWithPattern("root.1.a").getClass(),
-        IllegalArgumentException.class);
+        IllegalArgumentException.class,
+        testIoTDBDataRegionExtractorWithPattern("root.1.a").getClass());
     Assert.assertEquals(
-        testIoTDBDataRegionExtractorWithPattern("r").getClass(), 
IllegalArgumentException.class);
+        IllegalArgumentException.class, 
testIoTDBDataRegionExtractorWithPattern("r").getClass());
     Assert.assertEquals(
-        testIoTDBDataRegionExtractorWithPattern("").getClass(), 
IllegalArgumentException.class);
+        IllegalArgumentException.class, 
testIoTDBDataRegionExtractorWithPattern("").getClass());
     Assert.assertEquals(
-        testIoTDBDataRegionExtractorWithPattern("123").getClass(), 
IllegalArgumentException.class);
+        IllegalArgumentException.class, 
testIoTDBDataRegionExtractorWithPattern("123").getClass());
     Assert.assertEquals(
-        testIoTDBDataRegionExtractorWithPattern("root.a b").getClass(),
-        IllegalArgumentException.class);
+        IllegalArgumentException.class,
+        testIoTDBDataRegionExtractorWithPattern("root.a b").getClass());
     Assert.assertEquals(
-        testIoTDBDataRegionExtractorWithPattern("root.a+b").getClass(),
-        IllegalArgumentException.class);
+        IllegalArgumentException.class,
+        testIoTDBDataRegionExtractorWithPattern("root.a+b").getClass());
     Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.ab."));
     Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a#b"));
     Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.一二三"));
@@ -87,8 +87,8 @@ public class IoTDBDataRegionExtractorTest {
     Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.1"));
   }
 
-  public Exception testIoTDBDataRegionExtractorWithPattern(String pattern) {
-    try (IoTDBDataRegionExtractor extractor = new IoTDBDataRegionExtractor()) {
+  public Exception testIoTDBDataRegionExtractorWithPattern(final String 
pattern) {
+    try (final IoTDBDataRegionExtractor extractor = new 
IoTDBDataRegionExtractor()) {
       extractor.validate(
           new PipeParameterValidator(
               new PipeParameters(
@@ -97,7 +97,7 @@ public class IoTDBDataRegionExtractorTest {
                       put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, 
pattern);
                     }
                   })));
-    } catch (Exception e) {
+    } catch (final Exception e) {
       return e;
     }
     return null;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java
index 64e341b6c39..6dd98a05bf6 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -56,10 +57,13 @@ public class WALManagerTest {
         TestConstant.BASE_OUTPUT_PATH.concat("wal_test3")
       };
   private String[] prevWalDirs;
+  private String prevConsensus;
 
   @Before
   public void setUp() throws Exception {
+    prevConsensus = config.getDataRegionConsensusProtocolClass();
     prevWalDirs = commonConfig.getWalDirs();
+    
config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
     commonConfig.setWalDirs(walDirs);
     EnvironmentUtils.envSetUp();
   }
@@ -70,6 +74,7 @@ public class WALManagerTest {
     for (String walDir : walDirs) {
       EnvironmentUtils.cleanDir(walDir);
     }
+    config.setDataRegionConsensusProtocolClass(prevConsensus);
     commonConfig.setWalDirs(prevWalDirs);
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
index d011a669a98..61f5e4150e5 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -77,13 +78,16 @@ public class WALNodeTest {
   private static final String devicePath = databasePath + ".test_d";
   private static final String dataRegionId = "1";
   private WALMode prevMode;
+  private String prevConsensus;
   private WALNode walNode;
 
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.cleanDir(logDirectory);
     prevMode = config.getWalMode();
+    prevConsensus = config.getDataRegionConsensusProtocolClass();
     config.setWalMode(WALMode.SYNC);
+    
config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
     walNode = new WALNode(identifier, logDirectory);
   }
 
@@ -91,6 +95,7 @@ public class WALNodeTest {
   public void tearDown() throws Exception {
     walNode.close();
     config.setWalMode(prevMode);
+    config.setDataRegionConsensusProtocolClass(prevConsensus);
     EnvironmentUtils.cleanDir(logDirectory);
     StorageEngine.getInstance().reset();
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
index f51d9307eff..7109a14b030 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -64,13 +65,16 @@ public class WalDeleteOutdatedNewTest {
   private static final String devicePath = databasePath + ".test_d";
   private static final String dataRegionId = "1";
   private WALMode prevMode;
+  private String prevConsensus;
   private WALNode walNode1;
 
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.cleanDir(logDirectory1);
     prevMode = config.getWalMode();
+    prevConsensus = config.getDataRegionConsensusProtocolClass();
     config.setWalMode(WALMode.SYNC);
+    
config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
     walNode1 = new WALNode(identifier1, logDirectory1);
     DataRegion dataRegion = new DataRegionTest.DummyDataRegion(logDirectory1, 
databasePath);
     dataRegion.updatePartitionFileVersion(2911, 0);
@@ -81,6 +85,7 @@ public class WalDeleteOutdatedNewTest {
   public void tearDown() throws Exception {
     walNode1.close();
     config.setWalMode(prevMode);
+    config.setDataRegionConsensusProtocolClass(prevConsensus);
     EnvironmentUtils.cleanDir(logDirectory1);
     StorageEngine.getInstance().reset();
     WALInsertNodeCache.getInstance(1).clear();

Reply via email to