This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-batch-request
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-batch-request by this push:
     new 03caa8dc7f3 ...
03caa8dc7f3 is described below

commit 03caa8dc7f39908681b52f54eb1cd582036ee7cf
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Aug 17 19:13:55 2023 +0800

    ...
---
 .../config/constant/PipeConnectorConstant.java     | 13 ++---
 .../payload/evolvable/PipeRequestType.java         |  1 +
 .../request/PipeTransferInsertNodeReq.java         |  1 +
 .../evolvable/request/PipeTransferTabletReq.java   |  3 ++
 .../db/pipe/connector/protocol/IoTDBConnector.java | 38 ++++++--------
 .../thrift/sync/IoTDBThriftSyncConnector.java      | 60 +++++++++++-----------
 6 files changed, 59 insertions(+), 57 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 6053f8728a4..a5bef6e2e59 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.config.constant;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+
 public class PipeConnectorConstant {
 
   public static final String CONNECTOR_KEY = "connector";
@@ -27,15 +29,14 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
   public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls";
 
-  public static final String CONNECTOR_IOTDB_MODE_KEY = "connector.mode";
-  public static final String CONNECTOR_IOTDB_MODE_SINGLE = "single";
-  public static final String CONNECTOR_IOTDB_MODE_BATCH = "batch";
+  public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY = "connector.batch.enabled";
+  public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE = false;
 
-  public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.delayInSeconds";
+  public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.max-delay-seconds";
   public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 60;
 
-  public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = "connector.batch.sizeInMbs";
-  public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16;
+  public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = "connector.batch.size-bytes";
+  public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
 
   public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
   public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
index 51845929caa..ed9c1384eac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
@@ -31,6 +31,7 @@ public enum PipeRequestType {
 
   TRANSFER_FILE_PIECE((short) 4),
   TRANSFER_FILE_SEAL((short) 5),
+
   TRANSFER_BATCH((short) 6),
   ;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
index 191a78177f9..f2bdd0db906 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
@@ -80,6 +80,7 @@ public class PipeTransferInsertNodeReq extends TPipeTransferReq {
       statement.setMeasurementSchemas(node.getMeasurementSchemas());
       return statement;
     }
+
     throw new UnsupportedOperationException(
         String.format(
             "unknown InsertNode type %s when constructing statement from insert node.",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
index 9c99c2f8628..8be16edda7b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
@@ -235,6 +235,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
   }
 
   /////////////////////////////// Air Gap ///////////////////////////////
+
   public static byte[] toTPipeTransferTabletBytes(Tablet tablet, boolean isAligned)
       throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -247,6 +248,8 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
     }
   }
 
+  /////////////////////////////// Object ///////////////////////////////
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index d78010ddcde..38b38460e7a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -35,10 +35,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_MODE_BATCH;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_MODE_KEY;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_MODE_SINGLE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY;
 import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
 import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
 
@@ -48,25 +47,19 @@ public abstract class IoTDBConnector implements PipeConnector {
 
   protected final List<TEndPoint> nodeUrls = new ArrayList<>();
 
-  protected String mode;
+  protected boolean isTabletBatchModeEnabled = false;
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     final PipeParameters parameters = validator.getParameters();
-    validator
-        .validate(
-            args -> (boolean) args[0] || ((boolean) args[1] && (boolean) args[2]),
-            String.format(
-                "Either %s or %s:%s must be specified",
-                CONNECTOR_IOTDB_NODE_URLS_KEY, CONNECTOR_IOTDB_IP_KEY, CONNECTOR_IOTDB_PORT_KEY),
-            parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
-            parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
-            parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY))
-        .validateAttributeValueRange(
-            CONNECTOR_IOTDB_MODE_KEY,
-            true,
-            CONNECTOR_IOTDB_MODE_SINGLE,
-            CONNECTOR_IOTDB_MODE_BATCH);
+    validator.validate(
+        args -> (boolean) args[0] || ((boolean) args[1] && (boolean) args[2]),
+        String.format(
+            "Either %s or %s:%s must be specified",
+            CONNECTOR_IOTDB_NODE_URLS_KEY, CONNECTOR_IOTDB_IP_KEY, CONNECTOR_IOTDB_PORT_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY));
   }
 
   @Override
@@ -90,9 +83,12 @@ public abstract class IoTDBConnector implements PipeConnector {
 
     nodeUrls.clear();
     nodeUrls.addAll(givenNodeUrls);
+    LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
 
-    LOGGER.info("IoTDBThriftConnector nodeUrls: {}", nodeUrls);
-
-    mode = parameters.getStringOrDefault(CONNECTOR_IOTDB_MODE_KEY, CONNECTOR_IOTDB_MODE_SINGLE);
+    isTabletBatchModeEnabled =
+        parameters.getBooleanOrDefault(
+            CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY,
+            CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE);
+    LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index c2288ff146e..e7fa0f7e5b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -83,11 +83,13 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
   public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
     super.customize(parameters, configuration);
+
     for (int i = 0; i < nodeUrls.size(); i++) {
       isClientAlive.add(false);
       clients.add(null);
     }
-    if (CONNECTOR_IOTDB_MODE_BATCH.equals(mode)) {
+
+    if (isTabletBatchModeEnabled) {
       batchBuilder =
           new IoTDBThriftBatchSyncBuilder(
               parameters.getIntOrDefault(
@@ -190,36 +192,9 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
 
     try {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-        PipeInsertNodeTabletInsertionEvent event =
-            (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
-        if (CONNECTOR_IOTDB_MODE_BATCH.equals(mode)) {
-          // Init last send time to record the first element
-          batchBuilder.initLastSendTime();
-          PipeTransferInsertNodeReq insertNodeReq =
-              PipeTransferInsertNodeReq.toTPipeTransferReq(event.getInsertNode());
-
-          // To avoid redundant event when retrying
-          batchBuilder.tryCacheReq(insertNodeReq, event);
-          if (batchBuilder.needSend()) {
-            doTransferInBatch(client);
-          }
-        } else {
-          doTransfer(client, event);
-        }
+        doTransfer(client, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
       } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
-        PipeRawTabletInsertionEvent event = (PipeRawTabletInsertionEvent) tabletInsertionEvent;
-        if (CONNECTOR_IOTDB_MODE_BATCH.equals(mode)) {
-          // Init last send time to record the first element
-          batchBuilder.initLastSendTime();
-          PipeTransferTabletReq tabletReq =
-              PipeTransferTabletReq.toTPipeTransferReq(event.convertToTablet(), event.isAligned());
-          batchBuilder.tryCacheReq(tabletReq, event);
-          if (batchBuilder.needSend()) {
-            doTransferInBatch(client);
-          }
-        } else {
-          doTransfer(client, event);
-        }
+        doTransfer(client, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
       } else {
         LOGGER.warn(
             "IoTDBThriftSyncConnector only support "
@@ -255,6 +230,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
       doTransfer(client, (PipeTsFileInsertionEvent) tsFileInsertionEvent);
     } catch (TException e) {
       isClientAlive.set(clientIndex, false);
+
       throw new PipeConnectionException(
           String.format(
               "Network error when transfer tsfile insertion event %s, because %s.",
@@ -285,6 +261,19 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
       IoTDBThriftSyncConnectorClient client,
       PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
       throws PipeException, TException, WALPipeException {
+    if (isTabletBatchModeEnabled) {
+      // Init last send time to record the first element
+      batchBuilder.initLastSendTime();
+      PipeTransferInsertNodeReq insertNodeReq =
+          PipeTransferInsertNodeReq.toTPipeTransferReq(event.getInsertNode());
+
+      // To avoid redundant event when retrying
+      batchBuilder.tryCacheReq(insertNodeReq, event);
+      if (batchBuilder.needSend()) {
+        doTransferInBatch(client);
+      }
+    }
+
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferInsertNodeReq.toTPipeTransferReq(
@@ -302,6 +291,17 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
       IoTDBThriftSyncConnectorClient client,
       PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
       throws PipeException, TException, IOException {
+    if (isTabletBatchModeEnabled) {
+      // Init last send time to record the first element
+      batchBuilder.initLastSendTime();
+      PipeTransferTabletReq tabletReq =
+          PipeTransferTabletReq.toTPipeTransferReq(event.convertToTablet(), event.isAligned());
+      batchBuilder.tryCacheReq(tabletReq, event);
+      if (batchBuilder.needSend()) {
+        doTransferInBatch(client);
+      }
+    }
+
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferTabletReq.toTPipeTransferReq(

Reply via email to