This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1d30805f266 Pipe: Added check for wal disable and ratis consensus when
enabling data region processing (#12407)
1d30805f266 is described below
commit 1d30805f2660ffb4466750bf7de1d3eaeaef1796
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 25 17:42:05 2024 +0800
Pipe: Added check for wal disable and ratis consensus when enabling data
region processing (#12407)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../dataregion/IoTDBDataRegionExtractor.java | 43 ++++++++++++++++++----
.../extractor/IoTDBDataRegionExtractorTest.java | 32 ++++++++--------
.../dataregion/wal/WALManagerTest.java | 5 +++
.../dataregion/wal/node/WALNodeTest.java | 5 +++
.../wal/node/WalDeleteOutdatedNewTest.java | 5 +++
6 files changed, 68 insertions(+), 24 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6da32720bf5..7c5e852cf6c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -896,7 +896,7 @@ public class IoTDBConfig {
* on startup and set this variable so that the correct class name can be
obtained later when the
* data region consensus layer singleton is initialized
*/
- private String dataRegionConsensusProtocolClass =
ConsensusFactory.RATIS_CONSENSUS;
+ private String dataRegionConsensusProtocolClass =
ConsensusFactory.IOT_CONSENSUS;
/**
* The consensus protocol class for schema region. The Datanode should
communicate with ConfigNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index b4291925e97..fe292f2877e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -20,8 +20,11 @@
package org.apache.iotdb.db.pipe.extractor.dataregion;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.extractor.IoTDBExtractor;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
@@ -32,6 +35,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -90,7 +94,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
private boolean hasNoExtractionNeed = true;
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
+ public void validate(final PipeParameterValidator validator) throws
Exception {
super.validate(validator);
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
@@ -102,6 +106,15 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
}
hasNoExtractionNeed = false;
+ if (insertionDeletionListeningOptionPair.getLeft().equals(true)
+ && IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ throw new PipeException(
+ "The pipe cannot transfer data when data region is using ratis
consensus.");
+ }
+
// Validate extractor.pattern.format is within valid range
validator
.validateAttributeValueRange(
@@ -194,7 +207,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
realtimeExtractor.validate(validator);
}
- private void validatePattern(PipePattern pattern) {
+ private void validatePattern(final PipePattern pattern) {
if (!pattern.isLegal()) {
throw new IllegalArgumentException(String.format("Pattern \"%s\" is
illegal.", pattern));
}
@@ -205,7 +218,8 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
}
- private void constructRealtimeExtractor(PipeParameters parameters) {
+ private void constructRealtimeExtractor(final PipeParameters parameters)
+ throws IllegalPathException {
// Enable realtime extractor by default
if (!parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
@@ -219,6 +233,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
// Use hybrid mode by default
if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
+ checkWalEnable(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
LOGGER.info(
"Pipe: '{}' is not set, use hybrid mode by default.",
EXTRACTOR_REALTIME_MODE_KEY);
@@ -233,12 +248,15 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
+ checkWalEnable(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
break;
case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
+ checkWalEnable(parameters);
realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
break;
default:
+ checkWalEnable(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
@@ -248,8 +266,19 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
}
}
+ private void checkWalEnable(final PipeParameters parameters) throws
IllegalPathException {
+ if (Boolean.TRUE.equals(
+
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+ .getLeft())
+ &&
IoTDBDescriptor.getInstance().getConfig().getWalMode().equals(WALMode.DISABLE))
{
+ throw new PipeException(
+ "The pipe cannot transfer realtime insertion if data region disables
wal. Please set 'realtime.mode'='batch' in source parameters when enabling
realtime transmission.");
+ }
+ }
+
@Override
- public void customize(PipeParameters parameters,
PipeExtractorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws Exception {
if (hasNoExtractionNeed) {
return;
@@ -337,7 +366,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
}
private void startHistoricalExtractorAndRealtimeExtractor(
- AtomicReference<Exception> exceptionHolder) {
+ final AtomicReference<Exception> exceptionHolder) {
try {
// Start realtimeExtractor first to avoid losing data. This may cause
some
// retransmission, yet it is OK according to the idempotency of IoTDB.
@@ -346,7 +375,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
// realtimeExtractor after the process, then this part of data will be
lost.
realtimeExtractor.start();
historicalExtractor.start();
- } catch (Exception e) {
+ } catch (final Exception e) {
exceptionHolder.set(e);
LOGGER.warn(
"Pipe {}@{}: Start historical extractor {} and realtime extractor {}
error.",
@@ -358,7 +387,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
}
}
- private void rethrowExceptionIfAny(AtomicReference<Exception>
exceptionHolder) {
+ private void rethrowExceptionIfAny(final AtomicReference<Exception>
exceptionHolder) {
if (exceptionHolder.get() != null) {
throw new PipeException("failed to start extractors.",
exceptionHolder.get());
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
index c9f02090016..bdf63161b96 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
public class IoTDBDataRegionExtractorTest {
@Test
public void testIoTDBDataRegionExtractor() {
- IoTDBDataRegionExtractor extractor = new IoTDBDataRegionExtractor();
+ final IoTDBDataRegionExtractor extractor = new IoTDBDataRegionExtractor();
try {
extractor.validate(
new PipeParameterValidator(
@@ -50,7 +50,7 @@ public class IoTDBDataRegionExtractorTest {
PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE);
}
})));
- } catch (Exception e) {
+ } catch (final Exception e) {
Assert.fail();
}
}
@@ -58,23 +58,23 @@ public class IoTDBDataRegionExtractorTest {
@Test
public void testIoTDBDataRegionExtractorWithPattern() {
Assert.assertEquals(
- testIoTDBDataRegionExtractorWithPattern("root.a-b").getClass(),
- IllegalArgumentException.class);
+ IllegalArgumentException.class,
+ testIoTDBDataRegionExtractorWithPattern("root.a-b").getClass());
Assert.assertEquals(
- testIoTDBDataRegionExtractorWithPattern("root.1.a").getClass(),
- IllegalArgumentException.class);
+ IllegalArgumentException.class,
+ testIoTDBDataRegionExtractorWithPattern("root.1.a").getClass());
Assert.assertEquals(
- testIoTDBDataRegionExtractorWithPattern("r").getClass(),
IllegalArgumentException.class);
+ IllegalArgumentException.class,
testIoTDBDataRegionExtractorWithPattern("r").getClass());
Assert.assertEquals(
- testIoTDBDataRegionExtractorWithPattern("").getClass(),
IllegalArgumentException.class);
+ IllegalArgumentException.class,
testIoTDBDataRegionExtractorWithPattern("").getClass());
Assert.assertEquals(
- testIoTDBDataRegionExtractorWithPattern("123").getClass(),
IllegalArgumentException.class);
+ IllegalArgumentException.class,
testIoTDBDataRegionExtractorWithPattern("123").getClass());
Assert.assertEquals(
- testIoTDBDataRegionExtractorWithPattern("root.a b").getClass(),
- IllegalArgumentException.class);
+ IllegalArgumentException.class,
+ testIoTDBDataRegionExtractorWithPattern("root.a b").getClass());
Assert.assertEquals(
- testIoTDBDataRegionExtractorWithPattern("root.a+b").getClass(),
- IllegalArgumentException.class);
+ IllegalArgumentException.class,
+ testIoTDBDataRegionExtractorWithPattern("root.a+b").getClass());
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.ab."));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a#b"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.一二三"));
@@ -87,8 +87,8 @@ public class IoTDBDataRegionExtractorTest {
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.1"));
}
- public Exception testIoTDBDataRegionExtractorWithPattern(String pattern) {
- try (IoTDBDataRegionExtractor extractor = new IoTDBDataRegionExtractor()) {
+ public Exception testIoTDBDataRegionExtractorWithPattern(final String
pattern) {
+ try (final IoTDBDataRegionExtractor extractor = new
IoTDBDataRegionExtractor()) {
extractor.validate(
new PipeParameterValidator(
new PipeParameters(
@@ -97,7 +97,7 @@ public class IoTDBDataRegionExtractorTest {
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
pattern);
}
})));
- } catch (Exception e) {
+ } catch (final Exception e) {
return e;
}
return null;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java
index 64e341b6c39..6dd98a05bf6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -56,10 +57,13 @@ public class WALManagerTest {
TestConstant.BASE_OUTPUT_PATH.concat("wal_test3")
};
private String[] prevWalDirs;
+ private String prevConsensus;
@Before
public void setUp() throws Exception {
+ prevConsensus = config.getDataRegionConsensusProtocolClass();
prevWalDirs = commonConfig.getWalDirs();
+
config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
commonConfig.setWalDirs(walDirs);
EnvironmentUtils.envSetUp();
}
@@ -70,6 +74,7 @@ public class WALManagerTest {
for (String walDir : walDirs) {
EnvironmentUtils.cleanDir(walDir);
}
+ config.setDataRegionConsensusProtocolClass(prevConsensus);
commonConfig.setWalDirs(prevWalDirs);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
index d011a669a98..61f5e4150e5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -77,13 +78,16 @@ public class WALNodeTest {
private static final String devicePath = databasePath + ".test_d";
private static final String dataRegionId = "1";
private WALMode prevMode;
+ private String prevConsensus;
private WALNode walNode;
@Before
public void setUp() throws Exception {
EnvironmentUtils.cleanDir(logDirectory);
prevMode = config.getWalMode();
+ prevConsensus = config.getDataRegionConsensusProtocolClass();
config.setWalMode(WALMode.SYNC);
+
config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
walNode = new WALNode(identifier, logDirectory);
}
@@ -91,6 +95,7 @@ public class WALNodeTest {
public void tearDown() throws Exception {
walNode.close();
config.setWalMode(prevMode);
+ config.setDataRegionConsensusProtocolClass(prevConsensus);
EnvironmentUtils.cleanDir(logDirectory);
StorageEngine.getInstance().reset();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
index f51d9307eff..7109a14b030 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -64,13 +65,16 @@ public class WalDeleteOutdatedNewTest {
private static final String devicePath = databasePath + ".test_d";
private static final String dataRegionId = "1";
private WALMode prevMode;
+ private String prevConsensus;
private WALNode walNode1;
@Before
public void setUp() throws Exception {
EnvironmentUtils.cleanDir(logDirectory1);
prevMode = config.getWalMode();
+ prevConsensus = config.getDataRegionConsensusProtocolClass();
config.setWalMode(WALMode.SYNC);
+
config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
walNode1 = new WALNode(identifier1, logDirectory1);
DataRegion dataRegion = new DataRegionTest.DummyDataRegion(logDirectory1,
databasePath);
dataRegion.updatePartitionFileVersion(2911, 0);
@@ -81,6 +85,7 @@ public class WalDeleteOutdatedNewTest {
public void tearDown() throws Exception {
walNode1.close();
config.setWalMode(prevMode);
+ config.setDataRegionConsensusProtocolClass(prevConsensus);
EnvironmentUtils.cleanDir(logDirectory1);
StorageEngine.getInstance().reset();
WALInsertNodeCache.getInstance(1).clear();