This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch convert-on-type-mismatch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 868f030615d4e04ec506f41ab711bc8b67745e4e
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Aug 6 12:11:59 2024 +0800

    sender: convert on data type mismatch
---
 .../client/IoTDBConfigNodeSyncClientManager.java          | 12 ++++++++++--
 .../protocol/IoTDBConfigRegionAirGapConnector.java        |  3 +++
 .../connector/protocol/IoTDBConfigRegionConnector.java    | 10 ++++++++--
 .../connector/client/IoTDBDataNodeAsyncClientManager.java | 12 +++++++++++-
 .../connector/client/IoTDBDataNodeSyncClientManager.java  | 12 ++++++++++--
 .../protocol/airgap/IoTDBDataNodeAirGapConnector.java     |  3 +++
 .../thrift/async/IoTDBDataRegionAsyncConnector.java       |  3 ++-
 .../protocol/thrift/sync/IoTDBDataNodeSyncConnector.java  | 11 +++++++++--
 .../pipe/config/constant/PipeConnectorConstant.java       |  7 +++++++
 .../pipe/connector/client/IoTDBSyncClientManager.java     | 10 +++++++++-
 .../thrift/common/PipeTransferHandshakeConstant.java      |  1 +
 .../commons/pipe/connector/protocol/IoTDBConnector.java   | 15 +++++++++++++++
 .../pipe/connector/protocol/IoTDBSslSyncConnector.java    | 11 +++++++++--
 .../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java    | 12 ++++++++++++
 14 files changed, 109 insertions(+), 13 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index e420a6b5c6d..fd5b3c9ddb9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -38,8 +38,16 @@ public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager {
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      String loadBalanceStrategy) {
-    super(endPoints, useSSL, trustStorePath, trustStorePwd, false, loadBalanceStrategy);
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
+    super(
+        endPoints,
+        useSSL,
+        trustStorePath,
+        trustStorePwd,
+        false,
+        loadBalanceStrategy,
+        shouldReceiverConvertOnTypeMismatch);
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index 1c5febcb4bf..ba151351c39 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -68,6 +68,9 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector {
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+        Boolean.toString(shouldReceiverConvertOnTypeMismatch));
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 9ba081018b9..96059980c38 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -62,9 +62,15 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector {
       final String trustStorePath,
       final String trustStorePwd,
       final boolean useLeaderCache,
-      final String loadBalanceStrategy) {
+      final String loadBalanceStrategy,
+      final boolean shouldReceiverConvertOnTypeMismatch) {
     return new IoTDBConfigNodeSyncClientManager(
-        nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy);
+        nodeUrls,
+        useSSL,
+        trustStorePath,
+        trustStorePwd,
+        loadBalanceStrategy,
+        shouldReceiverConvertOnTypeMismatch);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index eff84ab5a5a..6814f8151b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -67,8 +67,13 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
 
   private final LoadBalancer loadBalancer;
 
+  private final boolean shouldReceiverConvertOnTypeMismatch;
+
   public IoTDBDataNodeAsyncClientManager(
-      List<TEndPoint> endPoints, boolean useLeaderCache, String loadBalanceStrategy) {
+      List<TEndPoint> endPoints,
+      boolean useLeaderCache,
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
     super(endPoints, useLeaderCache);
 
     endPointSet = new HashSet<>(endPoints);
@@ -101,6 +106,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
             loadBalanceStrategy);
         loadBalancer = new RoundRobinLoadBalancer();
     }
+
+    this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch;
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
@@ -209,6 +216,9 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
           CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+          Boolean.toString(shouldReceiverConvertOnTypeMismatch));
 
       client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 2c23cba4542..df9ae2a0e29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -48,8 +48,16 @@ public class IoTDBDataNodeSyncClientManager extends IoTDBSyncClientManager
       String trustStorePath,
       String trustStorePwd,
       boolean useLeaderCache,
-      String loadBalanceStrategy) {
-    super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy);
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
+    super(
+        endPoints,
+        useSSL,
+        trustStorePath,
+        trustStorePwd,
+        useLeaderCache,
+        loadBalanceStrategy,
+        shouldReceiverConvertOnTypeMismatch);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index b66c0012700..f0491431196 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -100,6 +100,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+        Boolean.toString(shouldReceiverConvertOnTypeMismatch));
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 255e5adec76..43c5fa6eff2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -124,7 +124,8 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
             parameters.getBooleanOrDefault(
                 Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, CONNECTOR_LEADER_CACHE_ENABLE_KEY),
                 CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
-            loadBalanceStrategy);
+            loadBalanceStrategy,
+            shouldReceiverConvertOnTypeMismatch);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 9ad83ee84e2..0c297343b48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -86,10 +86,17 @@ public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector {
       final String trustStorePath,
       final String trustStorePwd,
       final boolean useLeaderCache,
-      final String loadBalanceStrategy) {
+      final String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
-            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy);
+            nodeUrls,
+            useSSL,
+            trustStorePath,
+            trustStorePwd,
+            useLeaderCache,
+            loadBalanceStrategy,
+            shouldReceiverConvertOnTypeMismatch);
     return clientManager;
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 61e773da3e4..65c76dd7067 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -81,6 +81,13 @@ public class PipeConnectorConstant {
   public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password";
   public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
 
+  public static final String CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY =
+      "connector.exception.data.convert-on-type-mismatch";
+  public static final String SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY =
+      "sink.exception.data.convert-on-type-mismatch";
+  public static final boolean CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE =
+      true;
+
   public static final String CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY =
       "connector.exception.conflict.resolve-strategy";
   public static final String SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 667313e1976..57b9a4fede4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -60,13 +60,16 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
 
   private final LoadBalancer loadBalancer;
 
+  private final boolean shouldReceiverConvertOnTypeMismatch;
+
   protected IoTDBSyncClientManager(
       List<TEndPoint> endPoints,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
       boolean useLeaderCache,
-      String loadBalanceStrategy) {
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
     super(endPoints, useLeaderCache);
 
     this.useSSL = useSSL;
@@ -93,6 +96,8 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
             loadBalanceStrategy);
         loadBalancer = new RoundRobinLoadBalancer();
     }
+
+    this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch;
   }
 
   public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -169,6 +174,9 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
           CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
       params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, getClusterId());
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+          Boolean.toString(shouldReceiverConvertOnTypeMismatch));
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = client.pipeTransfer(buildHandshakeV2Req(params));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 07c7bb46390..b6864706aaa 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -23,6 +23,7 @@ public class PipeTransferHandshakeConstant {
 
   public static final String HANDSHAKE_KEY_TIME_PRECISION = "timestampPrecision";
   public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
+  public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH = "convertOnTypeMismatch";
 
   private PipeTransferHandshakeConstant() {
     // Utility class
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 6ac6f9432df..9dc08f93402 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -62,6 +62,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
@@ -88,6 +90,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
@@ -124,6 +127,8 @@ public abstract class IoTDBConnector implements PipeConnector {
   protected boolean isTabletBatchModeEnabled = true;
 
   protected PipeReceiverStatusHandler receiverStatusHandler;
+  protected boolean shouldReceiverConvertOnTypeMismatch =
+      CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws Exception {
@@ -312,6 +317,16 @@ public abstract class IoTDBConnector implements PipeConnector {
                     CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY,
                     SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY),
                 CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
+    shouldReceiverConvertOnTypeMismatch =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(
+                CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
+                SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
+            CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
+    LOGGER.info(
+        "IoTDBConnector {} = {}",
+        CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
+        shouldReceiverConvertOnTypeMismatch);
   }
 
   protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters parameters)
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index be93018bf9c..271596cf549 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -116,7 +116,13 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {
 
     clientManager =
         constructClient(
-            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy);
+            nodeUrls,
+            useSSL,
+            trustStorePath,
+            trustStorePwd,
+            useLeaderCache,
+            loadBalanceStrategy,
+            shouldReceiverConvertOnTypeMismatch);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
@@ -125,7 +131,8 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {
       String trustStorePath,
       String trustStorePwd,
       boolean useLeaderCache,
-      String loadBalanceStrategy);
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch);
 
   @Override
   public void handshake() throws Exception {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index b60dd1d5e1c..6665ee21d36 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+
 /**
  * {@link IoTDBFileReceiver} is the parent class of receiver on both configNode and DataNode,
  * handling all the logic of parallel file receiving.
@@ -66,6 +68,9 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
   private File writingFile;
   private RandomAccessFile writingFileWriter;
 
+  private boolean shouldConvertDataTypeOnTypeMismatch =
+      CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+
   @Override
   public IoTDBConnectorRequestVersion getVersion() {
     return IoTDBConnectorRequestVersion.VERSION_1;
@@ -216,6 +221,13 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
       return new TPipeTransferResp(status);
     }
 
+    final String shouldConvertDataTypeOnTypeMismatchString =
+        req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH);
+    if (shouldConvertDataTypeOnTypeMismatchString != null) {
+      shouldConvertDataTypeOnTypeMismatch =
+          Boolean.parseBoolean(shouldConvertDataTypeOnTypeMismatchString);
+    }
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation logic, though
     // it may not require the actual type of the v1 request.

Reply via email to