This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2df8710feef Pipe: Cleaned some questionable parameters & Fixed
unstable testPipeAfterDataRegionLeaderStop (#16555)
2df8710feef is described below
commit 2df8710feef9b2511bee444aa587c7fec99cc5a2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Oct 13 09:45:49 2025 +0800
Pipe: Cleaned some questionable parameters & Fixed unstable
testPipeAfterDataRegionLeaderStop (#16555)
* clean
* move
* fix
* Update CommonConfig.java
* s-cle
* next
* fix
* fix
* fix-unstable
* fix
---
.../manual/enhanced/IoTDBPipeClusterIT.java | 342 ++++++++++-----------
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 14 +-
.../metadata/AlignedTimeseriesException.java | 33 --
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 4 +-
.../scan/TsFileInsertionEventScanParser.java | 3 +-
.../table/TsFileInsertionEventTableParser.java | 2 +-
.../visitor/PipeStatementTSStatusVisitor.java | 5 -
.../db/pipe/resource/memory/PipeMemoryManager.java | 6 -
.../analyze/load/LoadTsFileTableSchemaCache.java | 3 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 63 +---
.../iotdb/commons/conf/CommonDescriptor.java | 2 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 53 ++--
.../iotdb/commons/pipe/config/PipeDescriptor.java | 20 +-
15 files changed, 232 insertions(+), 333 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 2e8eca84adf..4350d4072bb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -99,12 +99,12 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
@Test
public void testMachineDowntimeAsync() {
- testMachineDowntime("iotdb-thrift-connector");
+ testMachineDowntime("iotdb-thrift-sink");
}
@Test
public void testMachineDowntimeSync() {
- testMachineDowntime("iotdb-thrift-sync-connector");
+ testMachineDowntime("iotdb-thrift-sync-sink");
}
private void testMachineDowntime(String sink) {
@@ -120,24 +120,24 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("extractor", "iotdb-extractor");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("source", "iotdb-source");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("user", "root");
processorAttributes.put("processor", "do-nothing-processor");
- connectorAttributes.put("connector", sink);
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.node-urls", a.toString());
+ sinkAttributes.put("sink", sink);
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.node-urls", a.toString());
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -188,32 +188,32 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("extractor", "iotdb-extractor");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("start-time", "0");
- extractorAttributes.put("end-time", "199");
- extractorAttributes.put("mode.streaming", realtimeMode);
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("source", "iotdb-source");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("start-time", "0");
+ sourceAttributes.put("end-time", "199");
+ sourceAttributes.put("mode.streaming", realtimeMode);
+ sourceAttributes.put("user", "root");
processorAttributes.put("processor", "do-nothing-processor");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
- connectorAttributes.put("connector.user", "root");
- connectorAttributes.put("connector.password", "root");
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
+ sinkAttributes.put("sink.user", "root");
+ sinkAttributes.put("sink.password", "root");
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -241,9 +241,9 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
}
// This function has a certain probability of triggering replica asynchrony.
To ensure the success
- // of the test, it will be retried 5 times. The exception will be thrown
after five retries.
+ // of the test, it will be retried 5 times. The test will be ignored after
five retries.
@Test
- public void testPipeAfterDataRegionLeaderStop() throws Exception {
+ public void testPipeAfterDataRegionLeaderStop() {
for (int retry = 0; retry < 5; retry++) {
try {
if (retry != 0) {
@@ -261,31 +261,31 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
- extractorAttributes.put("extractor", "iotdb-extractor");
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("start-time", "0");
- extractorAttributes.put("end-time", "300");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("source", "iotdb-source");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("start-time", "0");
+ sourceAttributes.put("end-time", "300");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -353,26 +353,26 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
// Create a new pipe and write new data
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("database-name", "test1");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test1");
- extractorAttributes.put("start-time", "0");
- extractorAttributes.put("end-time", "300");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test1");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test1");
+ sourceAttributes.put("start-time", "0");
+ sourceAttributes.put("end-time", "300");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p2", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p2", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -387,11 +387,9 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.assertData("test1", "test1", 0, 301, receiverEnv,
handleFailure);
}
return;
- } catch (Exception | Error e) {
+ } catch (final Exception e) {
if (retry < 4) {
this.tearDown();
- } else {
- throw e;
}
}
}
@@ -411,9 +409,9 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
@@ -421,20 +419,20 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -470,24 +468,24 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
// create a new pipe and write new data
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("database-name", "test1");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test1");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test1");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test1");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p2", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p2", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -513,19 +511,19 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("database-name", "test1");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test1");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test1");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test1");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final Thread t =
new Thread(
@@ -533,8 +531,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
for (int i = 0; i < 30; ++i) {
try {
client.createPipe(
- new TCreatePipeReq("p" + i, connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p" + i, sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
} catch (final TException e) {
// Not sure if the "createPipe" has succeeded
@@ -584,24 +582,24 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -663,24 +661,24 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -730,24 +728,24 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -780,19 +778,19 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final AtomicInteger successCount = new AtomicInteger(0);
final List<Thread> threads = new ArrayList<>();
@@ -804,8 +802,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
if (status.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successCount.incrementAndGet();
@@ -889,19 +887,19 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final AtomicInteger successCount = new AtomicInteger(0);
final List<Thread> threads = new ArrayList<>();
@@ -914,8 +912,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p" + finalI, connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p" + finalI, sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
@@ -970,26 +968,26 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", -100, 100, senderEnv);
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("extractor", "iotdb-extractor");
- extractorAttributes.put("database-name", "test");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("table-name", "test");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("source", "iotdb-source");
+ sourceAttributes.put("database-name", "test");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("table-name", "test");
+ sourceAttributes.put("user", "root");
processorAttributes.put("processor", "do-nothing-processor");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b19221e1b80..fc738783483 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -106,6 +106,7 @@ public enum TSStatusCode {
WRITE_PROCESS_REJECT(606),
OUT_OF_TTL(607),
COMPACTION_ERROR(608),
+ @Deprecated
ALIGNED_TIMESERIES_ERROR(609),
WAL_ERROR(610),
DISK_SPACE_INSUFFICIENT(611),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4786460bac2..64d0dc6d214 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1150,6 +1150,8 @@ public class IoTDBConfig {
private String loadDiskSelectStrategyForIoTV2AndPipe =
LoadDiskSelectorType.INHERIT_LOAD.getValue();
+ private boolean skipFailedTableSchemaCheck = false;
+
/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during
querying */
private String[] pipeReceiverFileDirs = new String[0];
@@ -3932,6 +3934,18 @@ public class IoTDBConfig {
this.loadDiskSelectStrategyForIoTV2AndPipe =
loadDiskSelectStrategyForIoTV2AndPipe;
}
+ public boolean isSkipFailedTableSchemaCheck() {
+ return skipFailedTableSchemaCheck;
+ }
+
+ public void setSkipFailedTableSchemaCheck(boolean
skipFailedTableSchemaCheck) {
+ if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
+ return;
+ }
+ this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
+ logger.info("skipFailedTableSchemaCheck is set to {}.",
skipFailedTableSchemaCheck);
+ }
+
public long getLoadActiveListeningCheckIntervalSeconds() {
return loadActiveListeningCheckIntervalSeconds;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 0fa4aaf90e7..c5adbf2512f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2229,7 +2229,7 @@ public class IoTDBDescriptor {
}
}
- private void loadLoadTsFileProps(TrimProperties properties) throws
IOException {
+ private void loadLoadTsFileProps(TrimProperties properties) {
conf.setMaxAllocateMemoryRatioForLoad(
Double.parseDouble(
properties.getProperty(
@@ -2380,6 +2380,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"cache_last_values_memory_budget_in_byte",
String.valueOf(conf.getCacheLastValuesMemoryBudgetInByte()))));
+
+ conf.setSkipFailedTableSchemaCheck(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "skip_failed_table_schema_check",
+ String.valueOf(conf.isSkipFailedTableSchemaCheck()))));
}
private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws
IOException {
@@ -2427,6 +2433,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_tsfile_split_partition_max_size",
Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
+
+ conf.setSkipFailedTableSchemaCheck(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "skip_failed_table_schema_check",
+ String.valueOf(conf.isSkipFailedTableSchemaCheck()))));
}
private void loadPipeHotModifiedProp(TrimProperties properties) throws
IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
deleted file mode 100644
index f11c986afdc..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.exception.metadata;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-public class AlignedTimeseriesException extends MetadataException {
-
- public AlignedTimeseriesException(String message, String path) {
- super(
- String.format("%s (Path: %s)", message, path),
- TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(),
- true);
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index c4044a7213e..c51d88185ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -786,12 +786,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final long remainingMemory =
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes()
- allocatedMemorySizeInBytes;
- if (remainingMemory <
PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
+ if (remainingMemory <
PipeConfig.getInstance().getPipeInsertNodeQueueMemory()) {
final String message =
String.format(
"%s Need Floating memory: %d bytes, free Floating memory: %d
bytes",
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
- PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
+ PipeConfig.getInstance().getPipeInsertNodeQueueMemory(),
remainingMemory);
LOGGER.warn(message);
throw new PipeException(message);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 47aba940a72..44941e34c3a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -110,8 +110,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
- .forceAllocateForTabletWithRetry(
-
PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
try {
tsFileSequenceReader = new
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 7a5d869aa1a..76cc32ef347 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -77,7 +77,7 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
-
PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+ PipeConfig.getInstance().getPipeMaxReaderChunkSize());
this.allocatedMemoryBlockForBatchData =
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
this.allocatedMemoryBlockForChunkMeta =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 8d7a567fce0..882485a6c72 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -133,7 +133,6 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() ==
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
- || context.getCode() ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()
|| context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
@@ -168,10 +167,6 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
&& status.getCode() !=
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
- if (status.getCode() ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- }
return visitStatement(statement, context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 6e43d4bf53a..ada788363d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -638,12 +638,6 @@ public class PipeMemoryManager {
return usedMemorySizeInBytesOfTsFiles;
}
- public long getAllocatedMemorySizeInBytesOfBatch() {
- return (long)
- (PipeConfig.getInstance().getPipeDataStructureBatchMemoryProportion()
- * getTotalNonFloatingMemorySizeInBytes());
- }
-
public long getFreeMemorySizeInBytes() {
return memoryBlock.getFreeMemoryInBytes();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index 18b5d4cbc32..690a18dbd46 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PatternTreeMap;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -150,7 +149,7 @@ public class LoadTsFileTableSchemaCache {
try {
createTableAndDatabaseIfNecessary(device.getTableName());
} catch (final Exception e) {
- if (PipeConfig.getInstance().isSkipFailedTableSchemaCheck()) {
+ if
(IoTDBDescriptor.getInstance().getConfig().isSkipFailedTableSchemaCheck()) {
LOGGER.info(
"Failed to check table schema, will skip because
skipFailedTableSchemaCheck is set to true, message: {}",
e.getMessage());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c735e8d1a1d..43a637d5c2c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -209,7 +209,6 @@ public class CommonConfig {
// Sequentially poll the tsFile by default
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
- private boolean skipFailedTableSchemaCheck = false;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
@@ -219,7 +218,6 @@ public class CommonConfig {
private int pipeDataStructureTabletSizeInBytes = 2097152;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.3;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.3;
- private double PipeDataStructureBatchMemoryProportion = 0.2;
private volatile double pipeTotalFloatingMemoryProportion = 0.5;
// Check if memory check is enabled for Pipe
@@ -255,7 +253,6 @@ public class CommonConfig {
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
- private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
private long pipeMaxWaitFinishTime = 10 * 1000;
@@ -320,7 +317,7 @@ public class CommonConfig {
private volatile long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; //
3Min
private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
private volatile float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
- private volatile long pipeMaxAlignedSeriesChunkSizeInOneBatch = (long) 16 *
1024 * 1024; // 16MB;
+ private volatile long pipeMaxReaderChunkSize = 16 * MB; // 16MB;
private volatile long pipeListeningQueueTransferSnapshotThreshold = 1000;
private volatile int pipeSnapshotExecutionMaxBatchSize = 1000;
private volatile long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
@@ -890,21 +887,6 @@ public class CommonConfig {
pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
}
- public double getPipeDataStructureBatchMemoryProportion() {
- return PipeDataStructureBatchMemoryProportion;
- }
-
- public void setPipeDataStructureBatchMemoryProportion(
- double PipeDataStructureBatchMemoryProportion) {
- if (this.PipeDataStructureBatchMemoryProportion ==
PipeDataStructureBatchMemoryProportion) {
- return;
- }
- this.PipeDataStructureBatchMemoryProportion =
PipeDataStructureBatchMemoryProportion;
- logger.info(
- "PipeDataStructureBatchMemoryProportion is set to {}.",
- PipeDataStructureBatchMemoryProportion);
- }
-
public boolean isPipeEnableMemoryChecked() {
return isPipeEnableMemoryCheck;
}
@@ -1452,22 +1434,6 @@ public class CommonConfig {
pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds);
}
- public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
- return pipeSubtaskExecutorForcedRestartIntervalMs;
- }
-
- public void setPipeSubtaskExecutorForcedRestartIntervalMs(
- long pipeSubtaskExecutorForcedRestartIntervalMs) {
- if (this.pipeSubtaskExecutorForcedRestartIntervalMs
- == pipeSubtaskExecutorForcedRestartIntervalMs) {
- return;
- }
- this.pipeSubtaskExecutorForcedRestartIntervalMs =
pipeSubtaskExecutorForcedRestartIntervalMs;
- logger.info(
- "pipeSubtaskExecutorForcedRestartIntervalMs is set to {}",
- pipeSubtaskExecutorForcedRestartIntervalMs);
- }
-
public long getPipeMaxWaitFinishTime() {
return pipeMaxWaitFinishTime;
}
@@ -1523,18 +1489,6 @@ public class CommonConfig {
pipeRealTimeQueueMaxWaitingTsFileSize);
}
- public boolean isSkipFailedTableSchemaCheck() {
- return skipFailedTableSchemaCheck;
- }
-
- public void setSkipFailedTableSchemaCheck(boolean
skipFailedTableSchemaCheck) {
- if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
- return;
- }
- this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
- logger.info("skipFailedTableSchemaCheck is set to {}.",
skipFailedTableSchemaCheck);
- }
-
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
return;
@@ -1798,19 +1752,16 @@ public class CommonConfig {
"pipeLeaderCacheMemoryUsagePercentage is set to {}",
pipeLeaderCacheMemoryUsagePercentage);
}
- public long getPipeMaxAlignedSeriesChunkSizeInOneBatch() {
- return pipeMaxAlignedSeriesChunkSizeInOneBatch;
+ public long getPipeMaxReaderChunkSize() {
+ return pipeMaxReaderChunkSize;
}
- public void setPipeMaxAlignedSeriesChunkSizeInOneBatch(
- long pipeMaxAlignedSeriesChunkSizeInOneBatch) {
- if (this.pipeMaxAlignedSeriesChunkSizeInOneBatch ==
pipeMaxAlignedSeriesChunkSizeInOneBatch) {
+ public void setPipeMaxReaderChunkSize(long pipeMaxReaderChunkSize) {
+ if (this.pipeMaxReaderChunkSize == pipeMaxReaderChunkSize) {
return;
}
- this.pipeMaxAlignedSeriesChunkSizeInOneBatch =
pipeMaxAlignedSeriesChunkSizeInOneBatch;
- logger.info(
- "pipeMaxAlignedSeriesChunkSizeInOneBatch is set to {}",
- pipeMaxAlignedSeriesChunkSizeInOneBatch);
+ this.pipeMaxReaderChunkSize = pipeMaxReaderChunkSize;
+ logger.info("pipeMaxReaderChunkSize is set to {}", pipeMaxReaderChunkSize);
}
public long getPipeListeningQueueTransferSnapshotThreshold() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 0957317aa59..55860d2b53c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -398,7 +398,7 @@ public class CommonDescriptor {
Float.parseFloat(
properties.getProperty(
"subscription_prefetch_missing_rate_threshold",
-
String.valueOf(config.getSubscriptionPrefetchMemoryThreshold()))));
+
String.valueOf(config.getSubscriptionPrefetchMissingRateThreshold()))));
config.setSubscriptionPrefetchEventLocalCountThreshold(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 37d9af0f3e6..4e840d51c55 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -69,15 +69,13 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
}
- public double getPipeDataStructureBatchMemoryProportion() {
- return COMMON_CONFIG.getPipeDataStructureBatchMemoryProportion();
- }
+ /////////////////////////////// Estimation ///////////////////////////////
public boolean isPipeEnableMemoryCheck() {
return COMMON_CONFIG.isPipeEnableMemoryChecked();
}
- public long PipeInsertNodeQueueMemory() {
+ public long getPipeInsertNodeQueueMemory() {
return COMMON_CONFIG.getPipeInsertNodeQueueMemory();
}
@@ -119,10 +117,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
}
- public boolean isSkipFailedTableSchemaCheck() {
- return COMMON_CONFIG.isSkipFailedTableSchemaCheck();
- }
-
/////////////////////////////// Subtask Executor
///////////////////////////////
public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -145,10 +139,6 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
}
- public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
- return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
- }
-
public long getPipeMaxWaitFinishTime() {
return COMMON_CONFIG.getPipeMaxWaitFinishTime();
}
@@ -237,20 +227,8 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
}
- public float getPipeLeaderCacheMemoryUsagePercentage() {
- return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
- }
-
- public long getPipeMaxAlignedSeriesChunkSizeInOneBatch() {
- return COMMON_CONFIG.getPipeMaxAlignedSeriesChunkSizeInOneBatch();
- }
-
- public long getPipeListeningQueueTransferSnapshotThreshold() {
- return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
- }
-
- public int getPipeSnapshotExecutionMaxBatchSize() {
- return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+ public long getPipeMaxReaderChunkSize() {
+ return COMMON_CONFIG.getPipeMaxReaderChunkSize();
}
public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
@@ -411,6 +389,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeCheckMemoryEnoughIntervalMs();
}
+ public float getPipeLeaderCacheMemoryUsagePercentage() {
+ return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
+ }
+
/////////////////////////////// TwoStage ///////////////////////////////
public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
@@ -425,6 +407,16 @@ public class PipeConfig {
return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
}
+ /////////////////////////////// Meta ///////////////////////////////
+
+ public long getPipeListeningQueueTransferSnapshotThreshold() {
+ return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
+ }
+
+ public int getPipeSnapshotExecutionMaxBatchSize() {
+ return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+ }
+
/////////////////////////////// Ref ///////////////////////////////
public boolean getPipeEventReferenceTrackingEnabled() {
@@ -460,8 +452,6 @@ public class PipeConfig {
getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
LOGGER.info("PipeTotalFloatingMemoryProportion: {}",
getPipeTotalFloatingMemoryProportion());
- LOGGER.info(
- "PipeDataStructureBatchMemoryProportion: {}",
getPipeDataStructureBatchMemoryProportion());
LOGGER.info("IsPipeEnableMemoryCheck: {}", isPipeEnableMemoryCheck());
LOGGER.info("PipeTsFileParserMemory: {}", getTsFileParserMemory());
LOGGER.info("SinkBatchMemoryInsertNode: {}",
getSinkBatchMemoryInsertNode());
@@ -489,9 +479,6 @@ public class PipeConfig {
LOGGER.info(
"PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
- LOGGER.info(
- "PipeSubtaskExecutorForcedRestartIntervalMs: {}",
- getPipeSubtaskExecutorForcedRestartIntervalMs());
LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
LOGGER.info(
@@ -514,9 +501,7 @@ public class PipeConfig {
isPipeConnectorRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
- LOGGER.info(
- "PipeMaxAlignedSeriesChunkSizeInOneBatch: {}",
- getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+ LOGGER.info("PipeMaxAlignedSeriesChunkSizeInOneBatch: {}",
getPipeMaxReaderChunkSize());
LOGGER.info(
"PipeListeningQueueTransferSnapshotThreshold: {}",
getPipeListeningQueueTransferSnapshotThreshold());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 2a170b6b8d1..760a7b5e51f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -210,11 +210,6 @@ public class PipeDescriptor {
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
String.valueOf(
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
- config.setPipeDataStructureBatchMemoryProportion(
- Double.parseDouble(
- properties.getProperty(
- "pipe_data_structure_batch_memory_proportion",
-
String.valueOf(config.getPipeDataStructureBatchMemoryProportion()))));
config.setPipeTotalFloatingMemoryProportion(
Double.parseDouble(
properties.getProperty(
@@ -278,11 +273,6 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_realTime_queue_max_waiting_tsFile_size",
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
- config.setSkipFailedTableSchemaCheck(
- Boolean.parseBoolean(
- properties.getProperty(
- "skip_failed_table_schema_check",
- String.valueOf(config.isSkipFailedTableSchemaCheck()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
properties.getProperty(
@@ -300,11 +290,6 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_subtask_executor_pending_queue_max_blocking_time_ms",
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
- config.setPipeSubtaskExecutorForcedRestartIntervalMs(
- Long.parseLong(
- properties.getProperty(
- "pipe_subtask_executor_forced_restart_interval_ms",
-
String.valueOf(config.getPipeSubtaskExecutorForcedRestartIntervalMs()))));
config.setPipeExtractorAssignerDisruptorRingBufferSize(
Integer.parseInt(
@@ -488,11 +473,10 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_leader_cache_memory_usage_percentage",
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
- config.setPipeMaxAlignedSeriesChunkSizeInOneBatch(
+ config.setPipeMaxReaderChunkSize(
Long.parseLong(
properties.getProperty(
- "pipe_max_aligned_series_chunk_size_in_one_batch",
-
String.valueOf(config.getPipeMaxAlignedSeriesChunkSizeInOneBatch()))));
+ "pipe_max_reader_chunk_size",
String.valueOf(config.getPipeMaxReaderChunkSize()))));
config.setPipeTransferTsFileSync(
Boolean.parseBoolean(