This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-batch-request
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-batch-request by this push:
new 03caa8dc7f3 ...
03caa8dc7f3 is described below
commit 03caa8dc7f39908681b52f54eb1cd582036ee7cf
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Aug 17 19:13:55 2023 +0800
...
---
.../config/constant/PipeConnectorConstant.java | 13 ++---
.../payload/evolvable/PipeRequestType.java | 1 +
.../request/PipeTransferInsertNodeReq.java | 1 +
.../evolvable/request/PipeTransferTabletReq.java | 3 ++
.../db/pipe/connector/protocol/IoTDBConnector.java | 38 ++++++--------
.../thrift/sync/IoTDBThriftSyncConnector.java | 60 +++++++++++-----------
6 files changed, 59 insertions(+), 57 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 6053f8728a4..a5bef6e2e59 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.config.constant;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+
public class PipeConnectorConstant {
public static final String CONNECTOR_KEY = "connector";
@@ -27,15 +29,14 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
public static final String CONNECTOR_IOTDB_NODE_URLS_KEY =
"connector.node-urls";
- public static final String CONNECTOR_IOTDB_MODE_KEY = "connector.mode";
- public static final String CONNECTOR_IOTDB_MODE_SINGLE = "single";
- public static final String CONNECTOR_IOTDB_MODE_BATCH = "batch";
+ public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY =
"connector.batch.enabled";
+ public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE
= false;
- public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY =
"connector.batch.delayInSeconds";
+ public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY =
"connector.batch.max-delay-seconds";
public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 60;
- public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.sizeInMbs";
- public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16;
+ public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.size-bytes";
+ public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
index 51845929caa..ed9c1384eac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
@@ -31,6 +31,7 @@ public enum PipeRequestType {
TRANSFER_FILE_PIECE((short) 4),
TRANSFER_FILE_SEAL((short) 5),
+
TRANSFER_BATCH((short) 6),
;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
index 191a78177f9..f2bdd0db906 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
@@ -80,6 +80,7 @@ public class PipeTransferInsertNodeReq extends TPipeTransferReq {
statement.setMeasurementSchemas(node.getMeasurementSchemas());
return statement;
}
+
throw new UnsupportedOperationException(
String.format(
"unknown InsertNode type %s when constructing statement from
insert node.",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
index 9c99c2f8628..8be16edda7b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
@@ -235,6 +235,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
}
/////////////////////////////// Air Gap ///////////////////////////////
+
public static byte[] toTPipeTransferTabletBytes(Tablet tablet, boolean
isAligned)
throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -247,6 +248,8 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
}
}
+ /////////////////////////////// Object ///////////////////////////////
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index d78010ddcde..38b38460e7a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -35,10 +35,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_MODE_BATCH;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_MODE_KEY;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_MODE_SINGLE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
@@ -48,25 +47,19 @@ public abstract class IoTDBConnector implements PipeConnector {
protected final List<TEndPoint> nodeUrls = new ArrayList<>();
- protected String mode;
+ protected boolean isTabletBatchModeEnabled = false;
@Override
public void validate(PipeParameterValidator validator) throws Exception {
final PipeParameters parameters = validator.getParameters();
- validator
- .validate(
- args -> (boolean) args[0] || ((boolean) args[1] && (boolean)
args[2]),
- String.format(
- "Either %s or %s:%s must be specified",
- CONNECTOR_IOTDB_NODE_URLS_KEY, CONNECTOR_IOTDB_IP_KEY,
CONNECTOR_IOTDB_PORT_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY))
- .validateAttributeValueRange(
- CONNECTOR_IOTDB_MODE_KEY,
- true,
- CONNECTOR_IOTDB_MODE_SINGLE,
- CONNECTOR_IOTDB_MODE_BATCH);
+ validator.validate(
+ args -> (boolean) args[0] || ((boolean) args[1] && (boolean) args[2]),
+ String.format(
+ "Either %s or %s:%s must be specified",
+ CONNECTOR_IOTDB_NODE_URLS_KEY, CONNECTOR_IOTDB_IP_KEY,
CONNECTOR_IOTDB_PORT_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY));
}
@Override
@@ -90,9 +83,12 @@ public abstract class IoTDBConnector implements PipeConnector {
nodeUrls.clear();
nodeUrls.addAll(givenNodeUrls);
+ LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
- LOGGER.info("IoTDBThriftConnector nodeUrls: {}", nodeUrls);
-
- mode = parameters.getStringOrDefault(CONNECTOR_IOTDB_MODE_KEY,
CONNECTOR_IOTDB_MODE_SINGLE);
+ isTabletBatchModeEnabled =
+ parameters.getBooleanOrDefault(
+ CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY,
+ CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE);
+ LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index c2288ff146e..e7fa0f7e5b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -83,11 +83,13 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
throws Exception {
super.customize(parameters, configuration);
+
for (int i = 0; i < nodeUrls.size(); i++) {
isClientAlive.add(false);
clients.add(null);
}
- if (CONNECTOR_IOTDB_MODE_BATCH.equals(mode)) {
+
+ if (isTabletBatchModeEnabled) {
batchBuilder =
new IoTDBThriftBatchSyncBuilder(
parameters.getIntOrDefault(
@@ -190,36 +192,9 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
try {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- PipeInsertNodeTabletInsertionEvent event =
- (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
- if (CONNECTOR_IOTDB_MODE_BATCH.equals(mode)) {
- // Init last send time to record the first element
- batchBuilder.initLastSendTime();
- PipeTransferInsertNodeReq insertNodeReq =
-
PipeTransferInsertNodeReq.toTPipeTransferReq(event.getInsertNode());
-
- // To avoid redundant event when retrying
- batchBuilder.tryCacheReq(insertNodeReq, event);
- if (batchBuilder.needSend()) {
- doTransferInBatch(client);
- }
- } else {
- doTransfer(client, event);
- }
+ doTransfer(client, (PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
} else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- PipeRawTabletInsertionEvent event = (PipeRawTabletInsertionEvent)
tabletInsertionEvent;
- if (CONNECTOR_IOTDB_MODE_BATCH.equals(mode)) {
- // Init last send time to record the first element
- batchBuilder.initLastSendTime();
- PipeTransferTabletReq tabletReq =
-
PipeTransferTabletReq.toTPipeTransferReq(event.convertToTablet(),
event.isAligned());
- batchBuilder.tryCacheReq(tabletReq, event);
- if (batchBuilder.needSend()) {
- doTransferInBatch(client);
- }
- } else {
- doTransfer(client, event);
- }
+ doTransfer(client, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
} else {
LOGGER.warn(
"IoTDBThriftSyncConnector only support "
@@ -255,6 +230,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
doTransfer(client, (PipeTsFileInsertionEvent) tsFileInsertionEvent);
} catch (TException e) {
isClientAlive.set(clientIndex, false);
+
throw new PipeConnectionException(
String.format(
"Network error when transfer tsfile insertion event %s, because
%s.",
@@ -285,6 +261,19 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
IoTDBThriftSyncConnectorClient client,
PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException, TException, WALPipeException {
+ if (isTabletBatchModeEnabled) {
+ // Init last send time to record the first element
+ batchBuilder.initLastSendTime();
+ PipeTransferInsertNodeReq insertNodeReq =
+ PipeTransferInsertNodeReq.toTPipeTransferReq(event.getInsertNode());
+
+ // To avoid redundant event when retrying
+ batchBuilder.tryCacheReq(insertNodeReq, event);
+ if (batchBuilder.needSend()) {
+ doTransferInBatch(client);
+ }
+ }
+
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferInsertNodeReq.toTPipeTransferReq(
@@ -302,6 +291,17 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
IoTDBThriftSyncConnectorClient client,
PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
throws PipeException, TException, IOException {
+ if (isTabletBatchModeEnabled) {
+ // Init last send time to record the first element
+ batchBuilder.initLastSendTime();
+ PipeTransferTabletReq tabletReq =
+ PipeTransferTabletReq.toTPipeTransferReq(event.convertToTablet(),
event.isAligned());
+ batchBuilder.tryCacheReq(tabletReq, event);
+ if (batchBuilder.needSend()) {
+ doTransferInBatch(client);
+ }
+ }
+
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferTabletReq.toTPipeTransferReq(