This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b31438b0b7 Pipe: Support converting data type on data sync receiver metadata mismatch (#13110)
5b31438b0b7 is described below

commit 5b31438b0b7fb77492f69bdc5948e0788c5191cd
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Aug 9 11:17:00 2024 +0800

    Pipe: Support converting data type on data sync receiver metadata mismatch (#13110)
---
 .../client/IoTDBConfigNodeSyncClientManager.java   |  12 +-
 .../protocol/IoTDBConfigRegionAirGapConnector.java |   3 +
 .../protocol/IoTDBConfigRegionConnector.java       |  10 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    |  12 +-
 .../client/IoTDBDataNodeSyncClientManager.java     |  12 +-
 .../airgap/IoTDBDataNodeAirGapConnector.java       |   3 +
 .../async/IoTDBDataRegionAsyncConnector.java       |   3 +-
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |  11 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  40 +-
 .../transform/converter/ArrayConverter.java        | 940 +++++++++++++++++++++
 .../transform/converter/ValueConverter.java        | 787 +++++++++++++++++
 .../statement/PipeConvertedInsertRowStatement.java | 112 +++
 .../PipeConvertedInsertTabletStatement.java        |  64 ++
 ...peStatementDataTypeConvertExecutionVisitor.java | 173 ++++
 .../plan/statement/crud/InsertBaseStatement.java   |   8 +-
 .../plan/statement/crud/InsertRowStatement.java    |  12 +-
 .../plan/statement/crud/InsertTabletStatement.java |  10 +-
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |   2 +-
 .../config/constant/PipeConnectorConstant.java     |   7 +
 .../connector/client/IoTDBSyncClientManager.java   |  10 +-
 .../common/PipeTransferHandshakeConstant.java      |   1 +
 .../pipe/connector/protocol/IoTDBConnector.java    |  15 +
 .../connector/protocol/IoTDBSslSyncConnector.java  |  11 +-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  12 +
 24 files changed, 2225 insertions(+), 45 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index e420a6b5c6d..fd5b3c9ddb9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -38,8 +38,16 @@ public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager {
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      String loadBalanceStrategy) {
-    super(endPoints, useSSL, trustStorePath, trustStorePwd, false, 
loadBalanceStrategy);
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
+    super(
+        endPoints,
+        useSSL,
+        trustStorePath,
+        trustStorePwd,
+        false,
+        loadBalanceStrategy,
+        shouldReceiverConvertOnTypeMismatch);
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index 1c5febcb4bf..ba151351c39 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -68,6 +68,9 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector {
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+        Boolean.toString(shouldReceiverConvertOnTypeMismatch));
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 9ba081018b9..96059980c38 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -62,9 +62,15 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector {
       final String trustStorePath,
       final String trustStorePwd,
       final boolean useLeaderCache,
-      final String loadBalanceStrategy) {
+      final String loadBalanceStrategy,
+      final boolean shouldReceiverConvertOnTypeMismatch) {
     return new IoTDBConfigNodeSyncClientManager(
-        nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy);
+        nodeUrls,
+        useSSL,
+        trustStorePath,
+        trustStorePwd,
+        loadBalanceStrategy,
+        shouldReceiverConvertOnTypeMismatch);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index eff84ab5a5a..6814f8151b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -67,8 +67,13 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
 
   private final LoadBalancer loadBalancer;
 
+  private final boolean shouldReceiverConvertOnTypeMismatch;
+
   public IoTDBDataNodeAsyncClientManager(
-      List<TEndPoint> endPoints, boolean useLeaderCache, String 
loadBalanceStrategy) {
+      List<TEndPoint> endPoints,
+      boolean useLeaderCache,
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
     super(endPoints, useLeaderCache);
 
     endPointSet = new HashSet<>(endPoints);
@@ -101,6 +106,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
             loadBalanceStrategy);
         loadBalancer = new RoundRobinLoadBalancer();
     }
+
+    this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
@@ -209,6 +216,9 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
           CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+          Boolean.toString(shouldReceiverConvertOnTypeMismatch));
 
       
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 2c23cba4542..df9ae2a0e29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -48,8 +48,16 @@ public class IoTDBDataNodeSyncClientManager extends IoTDBSyncClientManager
       String trustStorePath,
       String trustStorePwd,
       boolean useLeaderCache,
-      String loadBalanceStrategy) {
-    super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
+    super(
+        endPoints,
+        useSSL,
+        trustStorePath,
+        trustStorePwd,
+        useLeaderCache,
+        loadBalanceStrategy,
+        shouldReceiverConvertOnTypeMismatch);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index b66c0012700..f0491431196 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -100,6 +100,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+        Boolean.toString(shouldReceiverConvertOnTypeMismatch));
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 255e5adec76..43c5fa6eff2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -124,7 +124,8 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
             parameters.getBooleanOrDefault(
                 Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
                 CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
-            loadBalanceStrategy);
+            loadBalanceStrategy,
+            shouldReceiverConvertOnTypeMismatch);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 9ad83ee84e2..32c886fbe56 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -86,10 +86,17 @@ public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector {
       final String trustStorePath,
       final String trustStorePwd,
       final boolean useLeaderCache,
-      final String loadBalanceStrategy) {
+      final String loadBalanceStrategy,
+      final boolean shouldReceiverConvertOnTypeMismatch) {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
-            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
+            nodeUrls,
+            useSSL,
+            trustStorePath,
+            trustStorePwd,
+            useLeaderCache,
+            loadBalanceStrategy,
+            shouldReceiverConvertOnTypeMismatch);
     return clientManager;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 12916ba4506..b70f833a7c5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
 import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
+import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementDataTypeConvertExecutionVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
@@ -61,7 +62,6 @@ import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
-import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -117,6 +117,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
       new PipeStatementExceptionVisitor();
   private static final PipeStatementPatternParseVisitor 
STATEMENT_PATTERN_PARSE_VISITOR =
       new PipeStatementPatternParseVisitor();
+  private static final PipeStatementDataTypeConvertExecutionVisitor
+      STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR =
+          new 
PipeStatementDataTypeConvertExecutionVisitor(IoTDBDataNodeReceiver::executeStatement);
   private final PipeStatementToBatchVisitor batchVisitor = new 
PipeStatementToBatchVisitor();
 
   // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> 
confignode (cluster
@@ -461,7 +464,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
 
   private TSStatus executeStatementAndClassifyExceptions(final Statement 
statement) {
     try {
-      final TSStatus result = executeStatement(statement);
+      final TSStatus result = 
executeStatementWithRetryOnDataTypeMismatch(statement);
       if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
           || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
         return result;
@@ -483,25 +486,30 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
     }
   }
 
-  private TSStatus executeStatement(Statement statement) {
+  private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement 
statement) {
     if (statement == null) {
       return RpcUtils.getStatus(
           TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
     }
 
-    statement = new PipeEnrichedStatement(statement);
-
-    final ExecutionResult result =
-        Coordinator.getInstance()
-            .executeForTreeModel(
-                statement,
-                SessionManager.getInstance().requestQueryId(),
-                new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
-                "",
-                ClusterPartitionFetcher.getInstance(),
-                ClusterSchemaFetcher.getInstance(),
-                
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
-    return result.status;
+    final TSStatus status = executeStatement(statement);
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            || !shouldConvertDataTypeOnTypeMismatch
+        ? status
+        : statement.accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR, 
status).orElse(status);
+  }
+
+  private static TSStatus executeStatement(final Statement statement) {
+    return Coordinator.getInstance()
+        .executeForTreeModel(
+            new PipeEnrichedStatement(statement),
+            SessionManager.getInstance().requestQueryId(),
+            new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
+            "",
+            ClusterPartitionFetcher.getInstance(),
+            ClusterSchemaFetcher.getInstance(),
+            
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+        .status;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java
new file mode 100644
index 00000000000..3d5a9b0cfb6
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java
@@ -0,0 +1,940 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.transform.converter;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+
+public class ArrayConverter {
+
+  @FunctionalInterface
+  private interface Converter {
+    Object convert(
+        final TSDataType sourceDataType,
+        final TSDataType targetDataType,
+        final Object sourceValues);
+  }
+
+  private static final Converter[][] CONVERTER =
+      new Converter[TSDataType.values().length][TSDataType.values().length];
+
+  private static final Converter DO_NOTHING_CONVERTER =
+      (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+
+  static {
+    for (final TSDataType sourceDataType : TSDataType.values()) {
+      for (final TSDataType targetDataType : TSDataType.values()) {
+        CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()] = 
DO_NOTHING_CONVERTER;
+      }
+    }
+
+    // BOOLEAN
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final int[] intValues = new int[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            intValues[i] = ValueConverter.convertBooleanToInt32(boolValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final long[] longValues = new long[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            longValues[i] = 
ValueConverter.convertBooleanToInt64(boolValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final float[] floatValues = new float[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            floatValues[i] = 
ValueConverter.convertBooleanToFloat(boolValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final double[] doubleValues = new double[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertBooleanToDouble(boolValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final Binary[] textValues = new Binary[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            textValues[i] = ValueConverter.convertBooleanToText(boolValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final long[] timestampValues = new long[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertBooleanToTimestamp(boolValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final int[] dateValues = new int[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            dateValues[i] = ValueConverter.convertBooleanToDate(boolValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final Binary[] blobValues = new Binary[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            blobValues[i] = ValueConverter.convertBooleanToBlob(boolValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final boolean[] boolValues = (boolean[]) sourceValues;
+          final Binary[] stringValues = new Binary[boolValues.length];
+          for (int i = 0; i < boolValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertBooleanToString(boolValues[i]);
+          }
+          return stringValues;
+        };
+
+    // INT32
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final boolean[] boolValues = new boolean[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            boolValues[i] = ValueConverter.convertInt32ToBoolean(intValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final long[] longValues = new long[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            longValues[i] = ValueConverter.convertInt32ToInt64(intValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final float[] floatValues = new float[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            floatValues[i] = ValueConverter.convertInt32ToFloat(intValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final double[] doubleValues = new double[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertInt32ToDouble(intValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final Binary[] textValues = new Binary[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            textValues[i] = ValueConverter.convertInt32ToText(intValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final long[] timestampValues = new long[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertInt32ToTimestamp(intValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final int[] dateValues = new int[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            dateValues[i] = ValueConverter.convertInt32ToDate(intValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final Binary[] blobValues = new Binary[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            blobValues[i] = ValueConverter.convertInt32ToBlob(intValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.INT32.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] intValues = (int[]) sourceValues;
+          final Binary[] stringValues = new Binary[intValues.length];
+          for (int i = 0; i < intValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertInt32ToString(intValues[i]);
+          }
+          return stringValues;
+        };
+
+    // INT64
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final boolean[] boolValues = new boolean[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            boolValues[i] = 
ValueConverter.convertInt64ToBoolean(longValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final int[] intValues = new int[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            intValues[i] = ValueConverter.convertInt64ToInt32(longValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final float[] floatValues = new float[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            floatValues[i] = ValueConverter.convertInt64ToFloat(longValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final double[] doubleValues = new double[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertInt64ToDouble(longValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final Binary[] textValues = new Binary[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            textValues[i] = ValueConverter.convertInt64ToText(longValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final long[] timestampValues = new long[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertInt64ToTimestamp(longValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final int[] dateValues = new int[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            dateValues[i] = ValueConverter.convertInt64ToDate(longValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final Binary[] blobValues = new Binary[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            blobValues[i] = ValueConverter.convertInt64ToBlob(longValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.INT64.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] longValues = (long[]) sourceValues;
+          final Binary[] stringValues = new Binary[longValues.length];
+          for (int i = 0; i < longValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertInt64ToString(longValues[i]);
+          }
+          return stringValues;
+        };
+
+    // FLOAT
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final boolean[] boolValues = new boolean[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            boolValues[i] = 
ValueConverter.convertFloatToBoolean(floatValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final int[] intValues = new int[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            intValues[i] = ValueConverter.convertFloatToInt32(floatValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final long[] longValues = new long[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            longValues[i] = ValueConverter.convertFloatToInt64(floatValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final double[] doubleValues = new double[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertFloatToDouble(floatValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final Binary[] textValues = new Binary[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            textValues[i] = ValueConverter.convertFloatToText(floatValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final long[] timestampValues = new long[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertFloatToTimestamp(floatValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final int[] dateValues = new int[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            dateValues[i] = ValueConverter.convertFloatToDate(floatValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final Binary[] blobValues = new Binary[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            blobValues[i] = ValueConverter.convertFloatToBlob(floatValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final float[] floatValues = (float[]) sourceValues;
+          final Binary[] stringValues = new Binary[floatValues.length];
+          for (int i = 0; i < floatValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertFloatToString(floatValues[i]);
+          }
+          return stringValues;
+        };
+
+    // DOUBLE
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final boolean[] boolValues = new boolean[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            boolValues[i] = 
ValueConverter.convertDoubleToBoolean(doubleValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final int[] intValues = new int[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            intValues[i] = 
ValueConverter.convertDoubleToInt32(doubleValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final long[] longValues = new long[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            longValues[i] = 
ValueConverter.convertDoubleToInt64(doubleValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final float[] floatValues = new float[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            floatValues[i] = 
ValueConverter.convertDoubleToFloat(doubleValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final Binary[] textValues = new Binary[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            textValues[i] = 
ValueConverter.convertDoubleToText(doubleValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final long[] timestampValues = new long[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertDoubleToTimestamp(doubleValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final int[] dateValues = new int[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            dateValues[i] = 
ValueConverter.convertDoubleToDate(doubleValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final Binary[] blobValues = new Binary[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            blobValues[i] = 
ValueConverter.convertDoubleToBlob(doubleValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final double[] doubleValues = (double[]) sourceValues;
+          final Binary[] stringValues = new Binary[doubleValues.length];
+          for (int i = 0; i < doubleValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertDoubleToString(doubleValues[i]);
+          }
+          return stringValues;
+        };
+
+    // TEXT
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final boolean[] boolValues = new boolean[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            boolValues[i] = ValueConverter.convertTextToBoolean(textValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final int[] intValues = new int[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            intValues[i] = ValueConverter.convertTextToInt32(textValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final long[] longValues = new long[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            longValues[i] = ValueConverter.convertTextToInt64(textValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final float[] floatValues = new float[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            floatValues[i] = ValueConverter.convertTextToFloat(textValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final double[] doubleValues = new double[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertTextToDouble(textValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final long[] timestampValues = new long[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertTextToTimestamp(textValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final int[] dateValues = new int[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            dateValues[i] = ValueConverter.convertTextToDate(textValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final Binary[] blobValues = new Binary[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            blobValues[i] = ValueConverter.convertTextToBlob(textValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] textValues = (Binary[]) sourceValues;
+          final Binary[] stringValues = new Binary[textValues.length];
+          for (int i = 0; i < textValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertTextToString(textValues[i]);
+          }
+          return stringValues;
+        };
+
+    // VECTOR
+    for (int i = 0; i < TSDataType.values().length; i++) {
+      CONVERTER[TSDataType.VECTOR.ordinal()][i] = DO_NOTHING_CONVERTER;
+    }
+
+    // UNKNOWN
+    for (int i = 0; i < TSDataType.values().length; i++) {
+      CONVERTER[TSDataType.UNKNOWN.ordinal()][i] = DO_NOTHING_CONVERTER;
+    }
+
+    // TIMESTAMP
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final boolean[] boolValues = new boolean[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            boolValues[i] = 
ValueConverter.convertTimestampToBoolean(timestampValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final int[] intValues = new int[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            intValues[i] = 
ValueConverter.convertTimestampToInt32(timestampValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final long[] longValues = new long[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            longValues[i] = 
ValueConverter.convertTimestampToInt64(timestampValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final float[] floatValues = new float[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            floatValues[i] = 
ValueConverter.convertTimestampToFloat(timestampValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final double[] doubleValues = new double[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertTimestampToDouble(timestampValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final Binary[] textValues = new Binary[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            textValues[i] = 
ValueConverter.convertTimestampToText(timestampValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final int[] dateValues = new int[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            dateValues[i] = 
ValueConverter.convertTimestampToDate(timestampValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final Binary[] blobValues = new Binary[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            blobValues[i] = 
ValueConverter.convertTimestampToBlob(timestampValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final long[] timestampValues = (long[]) sourceValues;
+          final Binary[] stringValues = new Binary[timestampValues.length];
+          for (int i = 0; i < timestampValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertTimestampToString(timestampValues[i]);
+          }
+          return stringValues;
+        };
+
+    // DATE
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final boolean[] boolValues = new boolean[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            boolValues[i] = ValueConverter.convertDateToBoolean(dateValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final int[] intValues = new int[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            intValues[i] = ValueConverter.convertDateToInt32(dateValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final long[] longValues = new long[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            longValues[i] = ValueConverter.convertDateToInt64(dateValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final float[] floatValues = new float[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            floatValues[i] = ValueConverter.convertDateToFloat(dateValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final double[] doubleValues = new double[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertDateToDouble(dateValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final Binary[] textValues = new Binary[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            textValues[i] = ValueConverter.convertDateToText(dateValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final long[] timestampValues = new long[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertDateToTimestamp(dateValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final Binary[] blobValues = new Binary[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            blobValues[i] = ValueConverter.convertDateToBlob(dateValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.DATE.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final int[] dateValues = (int[]) sourceValues;
+          final Binary[] stringValues = new Binary[dateValues.length];
+          for (int i = 0; i < dateValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertDateToString(dateValues[i]);
+          }
+          return stringValues;
+        };
+
+    // BLOB
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final boolean[] boolValues = new boolean[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            boolValues[i] = ValueConverter.convertBlobToBoolean(blobValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final int[] intValues = new int[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            intValues[i] = ValueConverter.convertBlobToInt32(blobValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final long[] longValues = new long[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            longValues[i] = ValueConverter.convertBlobToInt64(blobValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final float[] floatValues = new float[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            floatValues[i] = ValueConverter.convertBlobToFloat(blobValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final double[] doubleValues = new double[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertBlobToDouble(blobValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final Binary[] textValues = new Binary[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            textValues[i] = ValueConverter.convertBlobToText(blobValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final long[] timestampValues = new long[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertBlobToTimestamp(blobValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final int[] dateValues = new int[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            dateValues[i] = ValueConverter.convertBlobToDate(blobValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+    CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] blobValues = (Binary[]) sourceValues;
+          final Binary[] stringValues = new Binary[blobValues.length];
+          for (int i = 0; i < blobValues.length; i++) {
+            stringValues[i] = 
ValueConverter.convertBlobToString(blobValues[i]);
+          }
+          return stringValues;
+        };
+
+    // STRING
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final boolean[] boolValues = new boolean[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            boolValues[i] = 
ValueConverter.convertStringToBoolean(stringValues[i]);
+          }
+          return boolValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT32.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final int[] intValues = new int[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            intValues[i] = 
ValueConverter.convertStringToInt32(stringValues[i]);
+          }
+          return intValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT64.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final long[] longValues = new long[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            longValues[i] = 
ValueConverter.convertStringToInt64(stringValues[i]);
+          }
+          return longValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.FLOAT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final float[] floatValues = new float[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            floatValues[i] = 
ValueConverter.convertStringToFloat(stringValues[i]);
+          }
+          return floatValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DOUBLE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final double[] doubleValues = new double[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            doubleValues[i] = 
ValueConverter.convertStringToDouble(stringValues[i]);
+          }
+          return doubleValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TEXT.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final Binary[] textValues = new Binary[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            textValues[i] = 
ValueConverter.convertStringToText(stringValues[i]);
+          }
+          return textValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.VECTOR.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.UNKNOWN.ordinal()] = 
DO_NOTHING_CONVERTER;
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final long[] timestampValues = new long[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            timestampValues[i] = 
ValueConverter.convertStringToTimestamp(stringValues[i]);
+          }
+          return timestampValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DATE.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final int[] dateValues = new int[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            dateValues[i] = 
ValueConverter.convertStringToDate(stringValues[i]);
+          }
+          return dateValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BLOB.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> {
+          final Binary[] stringValues = (Binary[]) sourceValues;
+          final Binary[] blobValues = new Binary[stringValues.length];
+          for (int i = 0; i < stringValues.length; i++) {
+            blobValues[i] = 
ValueConverter.convertStringToBlob(stringValues[i]);
+          }
+          return blobValues;
+        };
+    CONVERTER[TSDataType.STRING.ordinal()][TSDataType.STRING.ordinal()] =
+        (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+  }
+
+  /**
+   * Converts an array of values from {@code sourceDataType} to {@code targetDataType}.
+   *
+   * <p>The converter is looked up in the {@code CONVERTER} table using the ordinals of the two
+   * data types and is then applied to {@code sourceValues}. The registered converters cast {@code
+   * sourceValues} to the array type that corresponds to the source data type (for example {@code
+   * long[]} for INT64 or {@code Binary[]} for TEXT), so the caller must pass a matching array.
+   *
+   * @param sourceDataType data type of the elements in {@code sourceValues}
+   * @param targetDataType data type the elements should be converted to
+   * @param sourceValues array of source values, may be {@code null}
+   * @return the value produced by the registered converter, or {@code null} if {@code
+   *     sourceValues} is {@code null}
+   */
+  public static Object convert(
+      final TSDataType sourceDataType,
+      final TSDataType targetDataType,
+      final Object sourceValues) {
+    // Nothing to convert: propagate null without touching the converter table.
+    if (sourceValues == null) {
+      return null;
+    }
+    return CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()]
+        .convert(sourceDataType, targetDataType, sourceValues);
+  }
+
+  /** Not instantiable: the constructor is private and {@code convert} is static. */
+  private ArrayConverter() {
+    // Intentionally empty; instances of this class must not be constructed.
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
new file mode 100644
index 00000000000..098792fd0b3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
@@ -0,0 +1,787 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.transform.converter;
+
+import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+
+import java.time.ZoneId;
+
+public class ValueConverter {
+
+  /**
+   * Strategy that converts one value from a source {@link TSDataType} to a target
+   * {@link TSDataType}.
+   */
+  @FunctionalInterface
+  private interface Converter {
+    /**
+     * Converts a single value.
+     *
+     * @param sourceDataType data type of {@code sourceValue}
+     * @param targetDataType data type the result should have
+     * @param sourceValue the value to convert
+     * @return the converted value
+     */
+    Object convert(TSDataType sourceDataType, TSDataType targetDataType, Object sourceValue);
+  }
+
+  // Dispatch table indexed as [source type ordinal][target type ordinal]. Both dimensions are
+  // sized from TSDataType.values(), so every pair of declared types has a slot.
+  private static final Converter[][] CONVERTER =
+      new Converter[TSDataType.values().length][TSDataType.values().length];
+
+  // Identity conversion: hands the source value back unchanged.
+  private static final Converter DO_NOTHING_CONVERTER =
+      (sourceDataType, targetDataType, sourceValue) -> sourceValue;
+
+  // Fills the dispatch table. Every slot starts as the identity converter, so same-type pairs and
+  // any pair without a dedicated conversion hand the source value back unchanged; each remaining
+  // pair among the ten types below is then bound to its typed conversion method.
+  static {
+    for (final Converter[] convertersFromSource : CONVERTER) {
+      for (int target = 0; target < convertersFromSource.length; target++) {
+        convertersFromSource[target] = DO_NOTHING_CONVERTER;
+      }
+    }
+
+    // Row/column indices of the types that have dedicated conversions.
+    final int bool = TSDataType.BOOLEAN.ordinal();
+    final int int32 = TSDataType.INT32.ordinal();
+    final int int64 = TSDataType.INT64.ordinal();
+    final int float32 = TSDataType.FLOAT.ordinal();
+    final int float64 = TSDataType.DOUBLE.ordinal();
+    final int text = TSDataType.TEXT.ordinal();
+    final int timestamp = TSDataType.TIMESTAMP.ordinal();
+    final int date = TSDataType.DATE.ordinal();
+    final int blob = TSDataType.BLOB.ordinal();
+    final int string = TSDataType.STRING.ordinal();
+
+    // BOOLEAN (BOOLEAN -> BOOLEAN keeps the identity converter)
+    CONVERTER[bool][int32] = (src, dst, v) -> convertBooleanToInt32((boolean) v);
+    CONVERTER[bool][int64] = (src, dst, v) -> convertBooleanToInt64((boolean) v);
+    CONVERTER[bool][float32] = (src, dst, v) -> convertBooleanToFloat((boolean) v);
+    CONVERTER[bool][float64] = (src, dst, v) -> convertBooleanToDouble((boolean) v);
+    CONVERTER[bool][text] = (src, dst, v) -> convertBooleanToText((boolean) v);
+    CONVERTER[bool][timestamp] = (src, dst, v) -> convertBooleanToTimestamp((boolean) v);
+    CONVERTER[bool][date] = (src, dst, v) -> convertBooleanToDate((boolean) v);
+    CONVERTER[bool][blob] = (src, dst, v) -> convertBooleanToBlob((boolean) v);
+    CONVERTER[bool][string] = (src, dst, v) -> convertBooleanToString((boolean) v);
+
+    // INT32 (INT32 -> INT32 keeps the identity converter)
+    CONVERTER[int32][bool] = (src, dst, v) -> convertInt32ToBoolean((int) v);
+    CONVERTER[int32][int64] = (src, dst, v) -> convertInt32ToInt64((int) v);
+    CONVERTER[int32][float32] = (src, dst, v) -> convertInt32ToFloat((int) v);
+    CONVERTER[int32][float64] = (src, dst, v) -> convertInt32ToDouble((int) v);
+    CONVERTER[int32][text] = (src, dst, v) -> convertInt32ToText((int) v);
+    CONVERTER[int32][timestamp] = (src, dst, v) -> convertInt32ToTimestamp((int) v);
+    CONVERTER[int32][date] = (src, dst, v) -> convertInt32ToDate((int) v);
+    CONVERTER[int32][blob] = (src, dst, v) -> convertInt32ToBlob((int) v);
+    CONVERTER[int32][string] = (src, dst, v) -> convertInt32ToString((int) v);
+
+    // INT64 (INT64 -> INT64 keeps the identity converter)
+    CONVERTER[int64][bool] = (src, dst, v) -> convertInt64ToBoolean((long) v);
+    CONVERTER[int64][int32] = (src, dst, v) -> convertInt64ToInt32((long) v);
+    CONVERTER[int64][float32] = (src, dst, v) -> convertInt64ToFloat((long) v);
+    CONVERTER[int64][float64] = (src, dst, v) -> convertInt64ToDouble((long) v);
+    CONVERTER[int64][text] = (src, dst, v) -> convertInt64ToText((long) v);
+    CONVERTER[int64][timestamp] = (src, dst, v) -> convertInt64ToTimestamp((long) v);
+    CONVERTER[int64][date] = (src, dst, v) -> convertInt64ToDate((long) v);
+    CONVERTER[int64][blob] = (src, dst, v) -> convertInt64ToBlob((long) v);
+    CONVERTER[int64][string] = (src, dst, v) -> convertInt64ToString((long) v);
+
+    // FLOAT (FLOAT -> FLOAT keeps the identity converter)
+    CONVERTER[float32][bool] = (src, dst, v) -> convertFloatToBoolean((float) v);
+    CONVERTER[float32][int32] = (src, dst, v) -> convertFloatToInt32((float) v);
+    CONVERTER[float32][int64] = (src, dst, v) -> convertFloatToInt64((float) v);
+    CONVERTER[float32][float64] = (src, dst, v) -> convertFloatToDouble((float) v);
+    CONVERTER[float32][text] = (src, dst, v) -> convertFloatToText((float) v);
+    CONVERTER[float32][timestamp] = (src, dst, v) -> convertFloatToTimestamp((float) v);
+    CONVERTER[float32][date] = (src, dst, v) -> convertFloatToDate((float) v);
+    CONVERTER[float32][blob] = (src, dst, v) -> convertFloatToBlob((float) v);
+    CONVERTER[float32][string] = (src, dst, v) -> convertFloatToString((float) v);
+
+    // DOUBLE (DOUBLE -> DOUBLE keeps the identity converter)
+    CONVERTER[float64][bool] = (src, dst, v) -> convertDoubleToBoolean((double) v);
+    CONVERTER[float64][int32] = (src, dst, v) -> convertDoubleToInt32((double) v);
+    CONVERTER[float64][int64] = (src, dst, v) -> convertDoubleToInt64((double) v);
+    CONVERTER[float64][float32] = (src, dst, v) -> convertDoubleToFloat((double) v);
+    CONVERTER[float64][text] = (src, dst, v) -> convertDoubleToText((double) v);
+    CONVERTER[float64][timestamp] = (src, dst, v) -> convertDoubleToTimestamp((double) v);
+    CONVERTER[float64][date] = (src, dst, v) -> convertDoubleToDate((double) v);
+    CONVERTER[float64][blob] = (src, dst, v) -> convertDoubleToBlob((double) v);
+    CONVERTER[float64][string] = (src, dst, v) -> convertDoubleToString((double) v);
+
+    // TEXT (TEXT -> TEXT keeps the identity converter)
+    CONVERTER[text][bool] = (src, dst, v) -> convertTextToBoolean((Binary) v);
+    CONVERTER[text][int32] = (src, dst, v) -> convertTextToInt32((Binary) v);
+    CONVERTER[text][int64] = (src, dst, v) -> convertTextToInt64((Binary) v);
+    CONVERTER[text][float32] = (src, dst, v) -> convertTextToFloat((Binary) v);
+    CONVERTER[text][float64] = (src, dst, v) -> convertTextToDouble((Binary) v);
+    CONVERTER[text][timestamp] = (src, dst, v) -> convertTextToTimestamp((Binary) v);
+    CONVERTER[text][date] = (src, dst, v) -> convertTextToDate((Binary) v);
+    CONVERTER[text][blob] = (src, dst, v) -> convertTextToBlob((Binary) v);
+    CONVERTER[text][string] = (src, dst, v) -> convertTextToString((Binary) v);
+
+    // TIMESTAMP (TIMESTAMP -> TIMESTAMP keeps the identity converter)
+    CONVERTER[timestamp][bool] = (src, dst, v) -> convertTimestampToBoolean((long) v);
+    CONVERTER[timestamp][int32] = (src, dst, v) -> convertTimestampToInt32((long) v);
+    CONVERTER[timestamp][int64] = (src, dst, v) -> convertTimestampToInt64((long) v);
+    CONVERTER[timestamp][float32] = (src, dst, v) -> convertTimestampToFloat((long) v);
+    CONVERTER[timestamp][float64] = (src, dst, v) -> convertTimestampToDouble((long) v);
+    CONVERTER[timestamp][text] = (src, dst, v) -> convertTimestampToText((long) v);
+    CONVERTER[timestamp][date] = (src, dst, v) -> convertTimestampToDate((long) v);
+    CONVERTER[timestamp][blob] = (src, dst, v) -> convertTimestampToBlob((long) v);
+    CONVERTER[timestamp][string] = (src, dst, v) -> convertTimestampToString((long) v);
+
+    // DATE (DATE -> DATE keeps the identity converter)
+    CONVERTER[date][bool] = (src, dst, v) -> convertDateToBoolean((int) v);
+    CONVERTER[date][int32] = (src, dst, v) -> convertDateToInt32((int) v);
+    CONVERTER[date][int64] = (src, dst, v) -> convertDateToInt64((int) v);
+    CONVERTER[date][float32] = (src, dst, v) -> convertDateToFloat((int) v);
+    CONVERTER[date][float64] = (src, dst, v) -> convertDateToDouble((int) v);
+    CONVERTER[date][text] = (src, dst, v) -> convertDateToText((int) v);
+    CONVERTER[date][timestamp] = (src, dst, v) -> convertDateToTimestamp((int) v);
+    CONVERTER[date][blob] = (src, dst, v) -> convertDateToBlob((int) v);
+    CONVERTER[date][string] = (src, dst, v) -> convertDateToString((int) v);
+
+    // BLOB (BLOB -> BLOB keeps the identity converter)
+    CONVERTER[blob][bool] = (src, dst, v) -> convertBlobToBoolean((Binary) v);
+    CONVERTER[blob][int32] = (src, dst, v) -> convertBlobToInt32((Binary) v);
+    CONVERTER[blob][int64] = (src, dst, v) -> convertBlobToInt64((Binary) v);
+    CONVERTER[blob][float32] = (src, dst, v) -> convertBlobToFloat((Binary) v);
+    CONVERTER[blob][float64] = (src, dst, v) -> convertBlobToDouble((Binary) v);
+    CONVERTER[blob][text] = (src, dst, v) -> convertBlobToText((Binary) v);
+    CONVERTER[blob][timestamp] = (src, dst, v) -> convertBlobToTimestamp((Binary) v);
+    CONVERTER[blob][date] = (src, dst, v) -> convertBlobToDate((Binary) v);
+    CONVERTER[blob][string] = (src, dst, v) -> convertBlobToString((Binary) v);
+
+    // STRING (STRING -> STRING keeps the identity converter)
+    CONVERTER[string][bool] = (src, dst, v) -> convertStringToBoolean((Binary) v);
+    CONVERTER[string][int32] = (src, dst, v) -> convertStringToInt32((Binary) v);
+    CONVERTER[string][int64] = (src, dst, v) -> convertStringToInt64((Binary) v);
+    CONVERTER[string][float32] = (src, dst, v) -> convertStringToFloat((Binary) v);
+    CONVERTER[string][float64] = (src, dst, v) -> convertStringToDouble((Binary) v);
+    CONVERTER[string][text] = (src, dst, v) -> convertStringToText((Binary) v);
+    CONVERTER[string][timestamp] = (src, dst, v) -> convertStringToTimestamp((Binary) v);
+    CONVERTER[string][date] = (src, dst, v) -> convertStringToDate((Binary) v);
+    CONVERTER[string][blob] = (src, dst, v) -> convertStringToBlob((Binary) v);
+  }
+
+  /**
+   * Converts a single value from one data type to another by dispatching to the converter
+   * registered for the given (source, target) pair.
+   *
+   * @param sourceDataType data type {@code sourceValue} currently has
+   * @param targetDataType data type the result should have
+   * @param sourceValue the value to convert; may be {@code null}
+   * @return the converted value, or {@code null} when {@code sourceValue} is {@code null}
+   */
+  public static Object convert(
+      final TSDataType sourceDataType,
+      final TSDataType targetDataType,
+      final Object sourceValue) {
+    if (sourceValue == null) {
+      // Null stays null regardless of the requested types.
+      return null;
+    }
+    final int sourceIndex = sourceDataType.ordinal();
+    final int targetIndex = targetDataType.ordinal();
+    return CONVERTER[sourceIndex][targetIndex].convert(
+        sourceDataType, targetDataType, sourceValue);
+  }
+
+  ////////////// BOOLEAN //////////////
+
+  // Shared "true"/"false" payloads returned by every boolean-to-binary conversion below.
+  private static final Binary BINARY_TRUE = parseString(Boolean.TRUE.toString());
+  private static final Binary BINARY_FALSE = parseString(Boolean.FALSE.toString());
+
+  /** Maps {@code true} to 1 and {@code false} to 0. */
+  public static int convertBooleanToInt32(final boolean value) {
+    if (value) {
+      return 1;
+    }
+    return 0;
+  }
+
+  /** Maps {@code true} to 1L and {@code false} to 0L. */
+  public static long convertBooleanToInt64(final boolean value) {
+    if (value) {
+      return 1L;
+    }
+    return 0L;
+  }
+
+  /** Maps {@code true} to 1.0f and {@code false} to 0.0f. */
+  public static float convertBooleanToFloat(final boolean value) {
+    if (value) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  /** Maps {@code true} to 1.0 and {@code false} to 0.0. */
+  public static double convertBooleanToDouble(final boolean value) {
+    if (value) {
+      return 1.0;
+    }
+    return 0.0;
+  }
+
+  /** Returns the shared "true" or "false" binary for the given flag. */
+  public static Binary convertBooleanToText(final boolean value) {
+    if (value) {
+      return BINARY_TRUE;
+    }
+    return BINARY_FALSE;
+  }
+
+  /** Maps {@code true} to timestamp 1 and {@code false} to timestamp 0. */
+  public static long convertBooleanToTimestamp(final boolean value) {
+    if (value) {
+      return 1L;
+    }
+    return 0L;
+  }
+
+  /** Maps {@code true} to the int 1 and {@code false} to the int 0. */
+  public static int convertBooleanToDate(final boolean value) {
+    if (value) {
+      return 1;
+    }
+    return 0;
+  }
+
+  /** Returns the shared "true" or "false" binary for the given flag. */
+  public static Binary convertBooleanToBlob(final boolean value) {
+    if (value) {
+      return BINARY_TRUE;
+    }
+    return BINARY_FALSE;
+  }
+
+  /** Returns the shared "true" or "false" binary for the given flag. */
+  public static Binary convertBooleanToString(final boolean value) {
+    if (value) {
+      return BINARY_TRUE;
+    }
+    return BINARY_FALSE;
+  }
+
+  ///////////// INT32 //////////////
+
+  /** Treats any non-zero integer as {@code true}. */
+  public static boolean convertInt32ToBoolean(final int value) {
+    return 0 != value;
+  }
+
+  /** Widens the integer to a {@code long}; the numeric value is unchanged. */
+  public static long convertInt32ToInt64(final int value) {
+    return (long) value;
+  }
+
+  /** Widens the integer to a {@code float}. */
+  public static float convertInt32ToFloat(final int value) {
+    return (float) value;
+  }
+
+  /** Widens the integer to a {@code double}. */
+  public static double convertInt32ToDouble(final int value) {
+    return (double) value;
+  }
+
+  /** Renders the integer in decimal and wraps it as a binary. */
+  public static Binary convertInt32ToText(final int value) {
+    final String decimal = Integer.toString(value);
+    return parseText(decimal);
+  }
+
+  /** Uses the integer directly as the timestamp value, without scaling. */
+  public static long convertInt32ToTimestamp(final int value) {
+    return (long) value;
+  }
+
+  /** Passes the integer through unchanged as the date's int value; nothing is validated. */
+  public static int convertInt32ToDate(final int value) {
+    return value;
+  }
+
+  /** Renders the integer in decimal and wraps it as a binary. */
+  public static Binary convertInt32ToBlob(final int value) {
+    final String decimal = Integer.toString(value);
+    return parseBlob(decimal);
+  }
+
+  /** Renders the integer in decimal and wraps it as a binary. */
+  public static Binary convertInt32ToString(final int value) {
+    final String decimal = Integer.toString(value);
+    return parseString(decimal);
+  }
+
+  ///////////// INT64 //////////////
+
+  /** Treats any non-zero long as {@code true}. */
+  public static boolean convertInt64ToBoolean(final long value) {
+    return 0 != value;
+  }
+
+  /** Narrowing cast: keeps the low 32 bits, so values outside the int range wrap around. */
+  public static int convertInt64ToInt32(final long value) {
+    final int narrowed = (int) value;
+    return narrowed;
+  }
+
+  /** Widens the long to a {@code float}. */
+  public static float convertInt64ToFloat(final long value) {
+    return (float) value;
+  }
+
+  /** Widens the long to a {@code double}. */
+  public static double convertInt64ToDouble(final long value) {
+    return (double) value;
+  }
+
+  /** Renders the long in decimal and wraps it as a binary. */
+  public static Binary convertInt64ToText(final long value) {
+    final String decimal = Long.toString(value);
+    return parseText(decimal);
+  }
+
+  /** Passes the long through unchanged as the timestamp value. */
+  public static long convertInt64ToTimestamp(final long value) {
+    return value;
+  }
+
+  /** Narrowing cast: keeps the low 32 bits, so values outside the int range wrap around. */
+  public static int convertInt64ToDate(final long value) {
+    final int narrowed = (int) value;
+    return narrowed;
+  }
+
+  /** Renders the long in decimal and wraps it as a binary. */
+  public static Binary convertInt64ToBlob(final long value) {
+    final String decimal = Long.toString(value);
+    return parseBlob(decimal);
+  }
+
+  /** Renders the long in decimal and wraps it as a binary. */
+  public static Binary convertInt64ToString(final long value) {
+    final String decimal = Long.toString(value);
+    return parseString(decimal);
+  }
+
+  ///////////// FLOAT //////////////
+
+  /** Treats any value that does not compare equal to zero as {@code true}. */
+  public static boolean convertFloatToBoolean(final float value) {
+    return 0 != value;
+  }
+
+  /** Java float-to-int cast: the fraction is dropped (rounding toward zero). */
+  public static int convertFloatToInt32(final float value) {
+    final int truncated = (int) value;
+    return truncated;
+  }
+
+  /** Java float-to-long cast: the fraction is dropped (rounding toward zero). */
+  public static long convertFloatToInt64(final float value) {
+    final long truncated = (long) value;
+    return truncated;
+  }
+
+  /** Widens the float to a {@code double}. */
+  public static double convertFloatToDouble(final float value) {
+    return (double) value;
+  }
+
+  /** Renders the float with {@link Float#toString(float)} and wraps it as a binary. */
+  public static Binary convertFloatToText(final float value) {
+    final String rendered = Float.toString(value);
+    return parseText(rendered);
+  }
+
+  /** Java float-to-long cast: the fraction is dropped (rounding toward zero). */
+  public static long convertFloatToTimestamp(final float value) {
+    final long truncated = (long) value;
+    return truncated;
+  }
+
+  /** Java float-to-int cast: the fraction is dropped (rounding toward zero). */
+  public static int convertFloatToDate(final float value) {
+    final int truncated = (int) value;
+    return truncated;
+  }
+
+  /** Renders the float with {@link Float#toString(float)} and wraps it as a binary. */
+  public static Binary convertFloatToBlob(final float value) {
+    final String rendered = Float.toString(value);
+    return parseBlob(rendered);
+  }
+
+  /** Renders the float with {@link Float#toString(float)} and wraps it as a binary. */
+  public static Binary convertFloatToString(final float value) {
+    final String rendered = Float.toString(value);
+    return parseString(rendered);
+  }
+
+  ///////////// DOUBLE //////////////
+
+  /** Treats any value that does not compare equal to zero as {@code true}. */
+  public static boolean convertDoubleToBoolean(final double value) {
+    return 0 != value;
+  }
+
+  /** Java double-to-int cast: the fraction is dropped (rounding toward zero). */
+  public static int convertDoubleToInt32(final double value) {
+    final int truncated = (int) value;
+    return truncated;
+  }
+
+  /** Java double-to-long cast: the fraction is dropped (rounding toward zero). */
+  public static long convertDoubleToInt64(final double value) {
+    final long truncated = (long) value;
+    return truncated;
+  }
+
+  /** Narrows the double to a {@code float}; precision may be lost. */
+  public static float convertDoubleToFloat(final double value) {
+    final float narrowed = (float) value;
+    return narrowed;
+  }
+
+  /** Renders the double with {@link Double#toString(double)} and wraps it as a binary. */
+  public static Binary convertDoubleToText(final double value) {
+    final String rendered = Double.toString(value);
+    return parseText(rendered);
+  }
+
+  /** Java double-to-long cast: the fraction is dropped (rounding toward zero). */
+  public static long convertDoubleToTimestamp(final double value) {
+    final long truncated = (long) value;
+    return truncated;
+  }
+
+  /** Java double-to-int cast: the fraction is dropped (rounding toward zero). */
+  public static int convertDoubleToDate(final double value) {
+    final int truncated = (int) value;
+    return truncated;
+  }
+
+  /** Renders the double with {@link Double#toString(double)} and wraps it as a binary. */
+  public static Binary convertDoubleToBlob(final double value) {
+    final String rendered = Double.toString(value);
+    return parseBlob(rendered);
+  }
+
+  /** Renders the double with {@link Double#toString(double)} and wraps it as a binary. */
+  public static Binary convertDoubleToString(final double value) {
+    final String rendered = Double.toString(value);
+    return parseString(rendered);
+  }
+
+  ///////////// TEXT //////////////
+
+  /** Applies {@link Boolean#parseBoolean(String)} to the value's {@code toString()} text. */
+  public static boolean convertTextToBoolean(final Binary value) {
+    final String text = value.toString();
+    return Boolean.parseBoolean(text);
+  }
+
+  /** Parses the value's {@code toString()} text as an int; unparsable text yields 0. */
+  public static int convertTextToInt32(final Binary value) {
+    final String text = value.toString();
+    return parseInteger(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a long; unparsable text yields 0. */
+  public static long convertTextToInt64(final Binary value) {
+    final String text = value.toString();
+    return parseLong(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a float; unparsable text yields 0. */
+  public static float convertTextToFloat(final Binary value) {
+    final String text = value.toString();
+    return parseFloat(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a double; unparsable text yields 0. */
+  public static double convertTextToDouble(final Binary value) {
+    final String text = value.toString();
+    return parseDouble(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a timestamp; unparsable text yields 0. */
+  public static long convertTextToTimestamp(final Binary value) {
+    final String text = value.toString();
+    return parseTimestamp(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a date; unparsable text yields 0. */
+  public static int convertTextToDate(final Binary value) {
+    final String text = value.toString();
+    return parseDate(text);
+  }
+
+  /** Builds a new binary from the value's {@code toString()} text. */
+  public static Binary convertTextToBlob(final Binary value) {
+    final String text = value.toString();
+    return parseBlob(text);
+  }
+
+  /** Returns the very same binary instance; no copy is made. */
+  public static Binary convertTextToString(final Binary value) {
+    return value;
+  }
+
+  ///////////// TIMESTAMP //////////////
+
+  /** Treats any non-zero timestamp as {@code true}. */
+  public static boolean convertTimestampToBoolean(final long value) {
+    return 0 != value;
+  }
+
+  /** Narrowing cast: keeps the low 32 bits, so values outside the int range wrap around. */
+  public static int convertTimestampToInt32(final long value) {
+    final int narrowed = (int) value;
+    return narrowed;
+  }
+
+  /** Passes the timestamp through unchanged as a long. */
+  public static long convertTimestampToInt64(final long value) {
+    return value;
+  }
+
+  /** Widens the timestamp to a {@code float}. */
+  public static float convertTimestampToFloat(final long value) {
+    return (float) value;
+  }
+
+  /** Widens the timestamp to a {@code double}. */
+  public static double convertTimestampToDouble(final long value) {
+    return (double) value;
+  }
+
+  /** Renders the raw long in decimal (no date formatting) and wraps it as a binary. */
+  public static Binary convertTimestampToText(final long value) {
+    final String decimal = Long.toString(value);
+    return parseText(decimal);
+  }
+
+  /** Narrowing cast: keeps the low 32 bits, so values outside the int range wrap around. */
+  public static int convertTimestampToDate(final long value) {
+    final int narrowed = (int) value;
+    return narrowed;
+  }
+
+  /** Renders the raw long in decimal (no date formatting) and wraps it as a binary. */
+  public static Binary convertTimestampToBlob(final long value) {
+    final String decimal = Long.toString(value);
+    return parseBlob(decimal);
+  }
+
+  /** Renders the raw long in decimal (no date formatting) and wraps it as a binary. */
+  public static Binary convertTimestampToString(final long value) {
+    final String decimal = Long.toString(value);
+    return parseString(decimal);
+  }
+
+  ///////////// DATE //////////////
+
+  /** Treats any non-zero date int as {@code true}. */
+  public static boolean convertDateToBoolean(final int value) {
+    return 0 != value;
+  }
+
+  /** Passes the date's int value through unchanged. */
+  public static int convertDateToInt32(final int value) {
+    return value;
+  }
+
+  /** Widens the date's int value to a {@code long}. */
+  public static long convertDateToInt64(final int value) {
+    return (long) value;
+  }
+
+  /** Widens the date's int value to a {@code float}. */
+  public static float convertDateToFloat(final int value) {
+    return (float) value;
+  }
+
+  /** Widens the date's int value to a {@code double}. */
+  public static double convertDateToDouble(final int value) {
+    return (double) value;
+  }
+
+  /** Renders the raw int in decimal (no date formatting) and wraps it as a binary. */
+  public static Binary convertDateToText(final int value) {
+    final String decimal = Integer.toString(value);
+    return parseText(decimal);
+  }
+
+  /** Widens the date's int value to a {@code long}; no calendar conversion is applied. */
+  public static long convertDateToTimestamp(final int value) {
+    return (long) value;
+  }
+
+  /** Renders the raw int in decimal (no date formatting) and wraps it as a binary. */
+  public static Binary convertDateToBlob(final int value) {
+    final String decimal = Integer.toString(value);
+    return parseBlob(decimal);
+  }
+
+  /** Renders the raw int in decimal (no date formatting) and wraps it as a binary. */
+  public static Binary convertDateToString(final int value) {
+    final String decimal = Integer.toString(value);
+    return parseString(decimal);
+  }
+
+  ///////////// BLOB //////////////
+
+  /** Applies {@link Boolean#parseBoolean(String)} to the blob's {@code toString()} text. */
+  public static boolean convertBlobToBoolean(final Binary value) {
+    final String text = value.toString();
+    return Boolean.parseBoolean(text);
+  }
+
+  /** Parses the blob's {@code toString()} text as an int; unparsable text yields 0. */
+  public static int convertBlobToInt32(final Binary value) {
+    final String text = value.toString();
+    return parseInteger(text);
+  }
+
+  /** Parses the blob's {@code toString()} text as a long; unparsable text yields 0. */
+  public static long convertBlobToInt64(final Binary value) {
+    final String text = value.toString();
+    return parseLong(text);
+  }
+
+  /** Parses the blob's {@code toString()} text as a float; unparsable text yields 0. */
+  public static float convertBlobToFloat(final Binary value) {
+    final String text = value.toString();
+    return parseFloat(text);
+  }
+
+  /** Parses the blob's {@code toString()} text as a double; unparsable text yields 0. */
+  public static double convertBlobToDouble(final Binary value) {
+    final String text = value.toString();
+    return parseDouble(text);
+  }
+
+  /** Parses the blob's {@code toString()} text as a timestamp; unparsable text yields 0. */
+  public static long convertBlobToTimestamp(final Binary value) {
+    final String text = value.toString();
+    return parseTimestamp(text);
+  }
+
+  /** Parses the blob's {@code toString()} text as a date; unparsable text yields 0. */
+  public static int convertBlobToDate(final Binary value) {
+    final String text = value.toString();
+    return parseDate(text);
+  }
+
+  /** Returns the very same binary instance; no copy is made. */
+  public static Binary convertBlobToString(final Binary value) {
+    return value;
+  }
+
+  /** Returns the very same binary instance; no copy is made. */
+  public static Binary convertBlobToText(final Binary value) {
+    return value;
+  }
+
+  ///////////// STRING //////////////
+
+  /** Applies {@link Boolean#parseBoolean(String)} to the value's {@code toString()} text. */
+  public static boolean convertStringToBoolean(final Binary value) {
+    final String text = value.toString();
+    return Boolean.parseBoolean(text);
+  }
+
+  /** Parses the value's {@code toString()} text as an int; unparsable text yields 0. */
+  public static int convertStringToInt32(final Binary value) {
+    final String text = value.toString();
+    return parseInteger(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a long; unparsable text yields 0. */
+  public static long convertStringToInt64(final Binary value) {
+    final String text = value.toString();
+    return parseLong(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a float; unparsable text yields 0. */
+  public static float convertStringToFloat(final Binary value) {
+    final String text = value.toString();
+    return parseFloat(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a double; unparsable text yields 0. */
+  public static double convertStringToDouble(final Binary value) {
+    final String text = value.toString();
+    return parseDouble(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a timestamp; unparsable text yields 0. */
+  public static long convertStringToTimestamp(final Binary value) {
+    final String text = value.toString();
+    return parseTimestamp(text);
+  }
+
+  /** Parses the value's {@code toString()} text as a date; unparsable text yields 0. */
+  public static int convertStringToDate(final Binary value) {
+    final String text = value.toString();
+    return parseDate(text);
+  }
+
+  /** Builds a new binary from the value's {@code toString()} text. */
+  public static Binary convertStringToBlob(final Binary value) {
+    final String text = value.toString();
+    return parseBlob(text);
+  }
+
+  /** Returns the very same binary instance; no copy is made. */
+  public static Binary convertStringToText(final Binary value) {
+    return value;
+  }
+
+  ///////////// UTILS //////////////
+
+  /**
+   * Parses a string into the Java value used for the given data type, delegating to the
+   * per-type parse helpers of this class.
+   *
+   * @param value text to parse; may be {@code null}
+   * @param dataType type that selects the parser
+   * @return the parsed (boxed) value, or {@code null} when {@code value} is {@code null}
+   * @throws UnsupportedOperationException if {@code dataType} has no parser here
+   */
+  public static Object parse(final String value, final TSDataType dataType) {
+    if (value == null) {
+      return null;
+    }
+
+    final Object parsed;
+    switch (dataType) {
+      case BOOLEAN:
+        parsed = Boolean.parseBoolean(value);
+        break;
+      case INT32:
+        parsed = parseInteger(value);
+        break;
+      case INT64:
+        parsed = parseLong(value);
+        break;
+      case FLOAT:
+        parsed = parseFloat(value);
+        break;
+      case DOUBLE:
+        parsed = parseDouble(value);
+        break;
+      case TEXT:
+        parsed = parseText(value);
+        break;
+      case TIMESTAMP:
+        parsed = parseTimestamp(value);
+        break;
+      case DATE:
+        parsed = parseDate(value);
+        break;
+      case BLOB:
+        parsed = parseBlob(value);
+        break;
+      case STRING:
+        parsed = parseString(value);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported data type: " + dataType);
+    }
+    return parsed;
+  }
+
+  /** Wraps the text in a new {@link Binary} encoded with {@code TSFileConfig.STRING_CHARSET}. */
+  private static Binary parseBlob(final String value) {
+    final Binary blob = new Binary(value, TSFileConfig.STRING_CHARSET);
+    return blob;
+  }
+
+  /** Best-effort {@link Integer#parseInt(String)}: any failure yields 0. */
+  private static int parseInteger(final String value) {
+    int parsed = 0;
+    try {
+      parsed = Integer.parseInt(value);
+    } catch (Exception ignored) {
+      // Deliberate best effort: input that cannot be parsed keeps the 0 default.
+    }
+    return parsed;
+  }
+
+  /** Best-effort {@link Long#parseLong(String)}: any failure yields 0. */
+  private static long parseLong(final String value) {
+    long parsed = 0L;
+    try {
+      parsed = Long.parseLong(value);
+    } catch (Exception ignored) {
+      // Deliberate best effort: input that cannot be parsed keeps the 0 default.
+    }
+    return parsed;
+  }
+
+  /** Best-effort {@link Float#parseFloat(String)}: any failure yields 0. */
+  private static float parseFloat(final String value) {
+    float parsed = 0.0f;
+    try {
+      parsed = Float.parseFloat(value);
+    } catch (Exception ignored) {
+      // Deliberate best effort: input that cannot be parsed keeps the 0 default.
+    }
+    return parsed;
+  }
+
+  /** Best-effort {@link Double#parseDouble(String)}: any failure yields 0. */
+  private static double parseDouble(final String value) {
+    double parsed = 0.0d;
+    try {
+      parsed = Double.parseDouble(value);
+    } catch (Exception ignored) {
+      // Deliberate best effort: input that cannot be parsed keeps the 0 default.
+    }
+    return parsed;
+  }
+
+  /**
+   * Parses a textual timestamp on a best-effort basis.
+   *
+   * <p>The input is trimmed once. If {@code TypeInferenceUtils.isNumber} accepts the trimmed text
+   * it is parsed with {@link Long#parseLong(String)}; otherwise it is handed to
+   * {@code DateTimeUtils.parseDateTimeExpressionToLong} with the system default zone.
+   *
+   * @param value text to parse; may be {@code null}
+   * @return the parsed timestamp, or {@code 0L} when the input is null, blank or unparsable
+   */
+  private static long parseTimestamp(final String value) {
+    if (value == null) {
+      return 0L;
+    }
+    // Trim before the numeric check too: Long.parseLong rejects surrounding whitespace, so
+    // checking and parsing the raw text could silently turn a padded number into 0.
+    final String trimmed = StringUtils.trim(value);
+    if (trimmed.isEmpty()) {
+      return 0L;
+    }
+    try {
+      return TypeInferenceUtils.isNumber(trimmed)
+          ? Long.parseLong(trimmed)
+          : DateTimeUtils.parseDateTimeExpressionToLong(trimmed, ZoneId.systemDefault());
+    } catch (final Exception ignored) {
+      // Deliberate best effort: text that cannot be parsed maps to 0 instead of propagating.
+      return 0L;
+    }
+  }
+
+  /**
+   * Parses a textual date on a best-effort basis.
+   *
+   * <p>The input is trimmed once. If {@code TypeInferenceUtils.isNumber} accepts the trimmed text
+   * it is parsed with {@link Integer#parseInt(String)}; otherwise it is handed to
+   * {@code DateTimeUtils.parseDateExpressionToInt}.
+   *
+   * @param value text to parse; may be {@code null}
+   * @return the parsed date int, or {@code 0} when the input is null, blank or unparsable
+   */
+  private static int parseDate(final String value) {
+    if (value == null) {
+      return 0;
+    }
+    // Trim before the numeric check too: Integer.parseInt rejects surrounding whitespace, so
+    // checking and parsing the raw text could silently turn a padded number into 0.
+    final String trimmed = StringUtils.trim(value);
+    if (trimmed.isEmpty()) {
+      return 0;
+    }
+    try {
+      return TypeInferenceUtils.isNumber(trimmed)
+          ? Integer.parseInt(trimmed)
+          : DateTimeUtils.parseDateExpressionToInt(trimmed);
+    } catch (final Exception ignored) {
+      // Deliberate best effort: text that cannot be parsed maps to 0 instead of propagating.
+      return 0;
+    }
+  }
+
+  /** Wraps the text in a new {@link Binary} encoded with {@code TSFileConfig.STRING_CHARSET}. */
+  private static Binary parseString(final String value) {
+    final Binary string = new Binary(value, TSFileConfig.STRING_CHARSET);
+    return string;
+  }
+
+  /** Wraps the text in a new {@link Binary} encoded with {@code TSFileConfig.STRING_CHARSET}. */
+  private static Binary parseText(final String value) {
+    final Binary text = new Binary(value, TSFileConfig.STRING_CHARSET);
+    return text;
+  }
+
+  /** Private so the class cannot be instantiated; callers use its static methods. */
+  private ValueConverter() {
+    // forbidden to construct
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
new file mode 100644
index 00000000000..3df76b79ce4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.transform.statement;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+
+/**
+ * An {@link InsertRowStatement} created from an existing statement whose {@link
+ * #checkAndCastDataType(int, TSDataType)} converts the stored value to the requested type through
+ * {@link ValueConverter} and always reports success.
+ */
+public class PipeConvertedInsertRowStatement extends InsertRowStatement {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeConvertedInsertRowStatement.class);
+
+  /**
+   * Builds a converted statement from {@code insertRowStatement}.
+   *
+   * <p>No defensive copies are made here: whatever the source statement's getters return is stored
+   * as-is in this instance.
+   *
+   * @param insertRowStatement the statement to take the fields from
+   */
+  public PipeConvertedInsertRowStatement(final InsertRowStatement insertRowStatement) {
+    super();
+    // Fields declared on Statement
+    this.isDebug = insertRowStatement.isDebug();
+    // Fields declared on InsertBaseStatement
+    this.devicePath = insertRowStatement.getDevicePath();
+    this.isAligned = insertRowStatement.isAligned();
+    this.measurementSchemas = insertRowStatement.getMeasurementSchemas();
+    this.measurements = insertRowStatement.getMeasurements();
+    this.dataTypes = insertRowStatement.getDataTypes();
+    // Fields declared on InsertRowStatement
+    this.time = insertRowStatement.getTime();
+    this.values = insertRowStatement.getValues();
+    this.isNeedInferType = insertRowStatement.isNeedInferType();
+  }
+
+  /**
+   * Converts the value at {@code columnIndex} from its current data type to {@code dataType} and
+   * records the new type.
+   *
+   * @param columnIndex index of the measurement to convert
+   * @param dataType the type to convert to
+   * @return always {@code true}
+   */
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    final TSDataType sourceType = dataTypes[columnIndex];
+    LOGGER.info(
+        "Pipe: Inserting row to {}.{}. Casting type from {} to {}.",
+        devicePath,
+        measurements[columnIndex],
+        sourceType,
+        dataType);
+    values[columnIndex] = ValueConverter.convert(sourceType, dataType, values[columnIndex]);
+    dataTypes[columnIndex] = dataType;
+    return true;
+  }
+
+  /**
+   * Parses the string form of every value into the type of its measurement schema.
+   *
+   * <p>A measurement without a schema, or whose value fails to parse, is marked as failed when
+   * partial insert is enabled in the configuration; otherwise the failure is thrown.
+   *
+   * @param zoneId not used by this implementation
+   * @throws QueryProcessException if a measurement has no schema and partial insert is disabled
+   */
+  @Override
+  public void transferType(ZoneId zoneId) throws QueryProcessException {
+    for (int col = 0; col < measurementSchemas.length; col++) {
+      // null when time series doesn't exist
+      if (measurementSchemas[col] == null) {
+        final boolean partialInsert =
+            IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
+        final QueryProcessException pathNotExist =
+            new QueryProcessException(
+                new PathNotExistException(
+                    devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[col]));
+        if (!partialInsert) {
+          throw pathNotExist;
+        }
+        markFailedMeasurement(col, pathNotExist);
+        continue;
+      }
+
+      // parse string value to specific type
+      dataTypes[col] = measurementSchemas[col].getType();
+      try {
+        values[col] = ValueConverter.parse(values[col].toString(), dataTypes[col]);
+      } catch (Exception e) {
+        LOGGER.warn(
+            "data type of {}.{} is not consistent, "
+                + "registered type {}, inserting timestamp {}, value {}",
+            devicePath,
+            measurements[col],
+            dataTypes[col],
+            time,
+            values[col]);
+        final boolean partialInsert =
+            IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
+        if (!partialInsert) {
+          throw e;
+        }
+        markFailedMeasurement(col, e);
+      }
+    }
+
+    isNeedInferType = false;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
new file mode 100644
index 00000000000..2481034cbae
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.transform.statement;
+
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link InsertTabletStatement} created from an existing statement whose {@link
+ * #checkAndCastDataType(int, TSDataType)} converts the stored column to the requested type through
+ * {@link ArrayConverter} and always reports success.
+ */
+public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeConvertedInsertTabletStatement.class);
+
+  /**
+   * Builds a converted statement from {@code insertTabletStatement}.
+   *
+   * <p>No defensive copies are made here: whatever the source statement's getters return is stored
+   * as-is in this instance.
+   *
+   * @param insertTabletStatement the statement to take the fields from
+   */
+  public PipeConvertedInsertTabletStatement(final InsertTabletStatement insertTabletStatement) {
+    super();
+    // Fields declared on Statement
+    this.isDebug = insertTabletStatement.isDebug();
+    // Fields declared on InsertBaseStatement
+    this.devicePath = insertTabletStatement.getDevicePath();
+    this.isAligned = insertTabletStatement.isAligned();
+    this.measurementSchemas = insertTabletStatement.getMeasurementSchemas();
+    this.measurements = insertTabletStatement.getMeasurements();
+    this.dataTypes = insertTabletStatement.getDataTypes();
+    // Fields declared on InsertTabletStatement
+    this.times = insertTabletStatement.getTimes();
+    this.bitMaps = insertTabletStatement.getBitMaps();
+    this.columns = insertTabletStatement.getColumns();
+    this.rowCount = insertTabletStatement.getRowCount();
+  }
+
+  /**
+   * Converts the column at {@code columnIndex} from its current data type to {@code dataType} and
+   * records the new type.
+   *
+   * @param columnIndex index of the measurement column to convert
+   * @param dataType the type to convert to
+   * @return always {@code true}
+   */
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    final TSDataType sourceType = dataTypes[columnIndex];
+    LOGGER.info(
+        "Pipe: Inserting tablet to {}.{}. Casting type from {} to {}.",
+        devicePath,
+        measurements[columnIndex],
+        sourceType,
+        dataType);
+    columns[columnIndex] = ArrayConverter.convert(sourceType, dataType, columns[columnIndex]);
+    dataTypes[columnIndex] = dataType;
+    return true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..69f456552f7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.visitor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import 
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
+import 
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Visitor that is handed a statement together with the {@link TSStatus} of its execution. For
+ * insert statements whose status reports a data type mismatch, it wraps the statement(s) in their
+ * pipe-converted variants and executes them again through the configured {@link
+ * StatementExecutor}. The result is {@link Optional#empty()} when no retry is attempted or when
+ * the retry throws.
+ */
+public class PipeStatementDataTypeConvertExecutionVisitor
+    extends StatementVisitor<Optional<TSStatus>, TSStatus> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeStatementDataTypeConvertExecutionVisitor.class);
+
+  /** Callback used to execute a converted statement. */
+  @FunctionalInterface
+  public interface StatementExecutor {
+    TSStatus execute(final Statement statement);
+  }
+
+  private final StatementExecutor statementExecutor;
+
+  /**
+   * @param statementExecutor the callback that executes converted statements
+   */
+  public PipeStatementDataTypeConvertExecutionVisitor(final StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /**
+   * Runs {@code statement} through the executor.
+   *
+   * @return the resulting status, or empty if the execution threw
+   */
+  private Optional<TSStatus> tryExecute(final Statement statement) {
+    Optional<TSStatus> result;
+    try {
+      result = Optional.of(statementExecutor.execute(statement));
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to execute statement after data type conversion.", e);
+      result = Optional.empty();
+    }
+    return result;
+  }
+
+  /**
+   * Tells whether {@code status} is a metadata error whose own message mentions the registered
+   * type marker of {@link DataTypeMismatchException}.
+   */
+  private static boolean hasTypeMismatchMessage(final TSStatus status) {
+    if (status.getCode() != TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      return false;
+    }
+    final String message = status.getMessage();
+    return message != null && message.contains(DataTypeMismatchException.REGISTERED_TYPE_STRING);
+  }
+
+  /**
+   * Tells whether {@code status} is a metadata error or a multiple error whose string form
+   * mentions the registered type marker of {@link DataTypeMismatchException}.
+   */
+  private static boolean mentionsTypeMismatch(final TSStatus status) {
+    final boolean relevantCode =
+        status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+            || status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode();
+    return relevantCode
+        && status.toString().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING);
+  }
+
+  /** Default for statements without a dedicated handler: nothing is retried. */
+  @Override
+  public Optional<TSStatus> visitNode(final StatementNode statementNode, final TSStatus status) {
+    return Optional.empty();
+  }
+
+  /** No conversion is implemented for load statements yet; delegates to the generic handler. */
+  @Override
+  public Optional<TSStatus> visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
+    // TODO: judge if the exception is caused by data type mismatch
+    // TODO: convert the data type of the statement
+    return visitStatement(loadTsFileStatement, status);
+  }
+
+  /** Retries a single-row insert with a converted statement on a data type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertRow(
+      final InsertRowStatement insertRowStatement, final TSStatus status) {
+    if (!hasTypeMismatchMessage(status)) {
+      return Optional.empty();
+    }
+    return tryExecute(new PipeConvertedInsertRowStatement(insertRowStatement));
+  }
+
+  /** Retries a multi-row insert with every row wrapped in a converted statement. */
+  @Override
+  public Optional<TSStatus> visitInsertRows(
+      final InsertRowsStatement insertRowsStatement, final TSStatus status) {
+    if (!mentionsTypeMismatch(status)
+        || insertRowsStatement.getInsertRowStatementList() == null
+        || insertRowsStatement.getInsertRowStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertRowsStatement converted = new InsertRowsStatement();
+    converted.setInsertRowStatementList(
+        insertRowsStatement.getInsertRowStatementList().stream()
+            .map(row -> new PipeConvertedInsertRowStatement(row))
+            .collect(Collectors.toList()));
+    return tryExecute(converted);
+  }
+
+  /** Retries a one-device multi-row insert with every row wrapped in a converted statement. */
+  @Override
+  public Optional<TSStatus> visitInsertRowsOfOneDevice(
+      final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, final TSStatus status) {
+    if (!mentionsTypeMismatch(status)
+        || insertRowsOfOneDeviceStatement.getInsertRowStatementList() == null
+        || insertRowsOfOneDeviceStatement.getInsertRowStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertRowsOfOneDeviceStatement converted = new InsertRowsOfOneDeviceStatement();
+    converted.setInsertRowStatementList(
+        insertRowsOfOneDeviceStatement.getInsertRowStatementList().stream()
+            .map(row -> new PipeConvertedInsertRowStatement(row))
+            .collect(Collectors.toList()));
+    return tryExecute(converted);
+  }
+
+  /** Retries a tablet insert with a converted statement on a data type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final TSStatus status) {
+    if (!hasTypeMismatchMessage(status)) {
+      return Optional.empty();
+    }
+    return tryExecute(new PipeConvertedInsertTabletStatement(insertTabletStatement));
+  }
+
+  /** Retries a multi-tablet insert with every tablet wrapped in a converted statement. */
+  @Override
+  public Optional<TSStatus> visitInsertMultiTablets(
+      final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus status) {
+    if (!mentionsTypeMismatch(status)
+        || insertMultiTabletsStatement.getInsertTabletStatementList() == null
+        || insertMultiTabletsStatement.getInsertTabletStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertMultiTabletsStatement converted = new InsertMultiTabletsStatement();
+    converted.setInsertTabletStatementList(
+        insertMultiTabletsStatement.getInsertTabletStatementList().stream()
+            .map(tablet -> new PipeConvertedInsertTabletStatement(tablet))
+            .collect(Collectors.toList()));
+    return tryExecute(converted);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index bc31e71ec80..88e5b7735cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -71,7 +71,7 @@ public abstract class InsertBaseStatement extends Statement {
   // region params used by analyzing logical views.
 
   /** This param records the logical view schema appeared in this statement. */
-  List<LogicalViewSchema> logicalViewSchemaList;
+  protected List<LogicalViewSchema> logicalViewSchemaList;
 
   /**
    * This param records the index of the location where the source of this 
view should be placed.
@@ -79,13 +79,13 @@ public abstract class InsertBaseStatement extends Statement 
{
    * <p>For example, indexListOfLogicalViewPaths[alpha] = beta means source of
    * logicalViewSchemaList[alpha] should be filled into 
measurementSchemas[beta].
    */
-  List<Integer> indexOfSourcePathsOfLogicalViews;
+  protected List<Integer> indexOfSourcePathsOfLogicalViews;
 
   /** it is the end of last range, the beginning of current range. */
-  int recordedBeginOfLogicalViewSchemaList = 0;
+  protected int recordedBeginOfLogicalViewSchemaList = 0;
 
   /** it is the end of current range. */
-  int recordedEndOfLogicalViewSchemaList = 0;
+  protected int recordedEndOfLogicalViewSchemaList = 0;
 
   // endregion
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index a5b864e30f4..bb67086456a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -59,18 +59,18 @@ public class InsertRowStatement extends InsertBaseStatement 
implements ISchemaVa
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertRowStatement.class);
 
-  private static final byte TYPE_RAW_STRING = -1;
-  private static final byte TYPE_NULL = -2;
+  protected static final byte TYPE_RAW_STRING = -1;
+  protected static final byte TYPE_NULL = -2;
 
-  private long time;
-  private Object[] values;
-  private boolean isNeedInferType = false;
+  protected long time;
+  protected Object[] values;
+  protected boolean isNeedInferType = false;
 
   /**
    * This param record whether the source of logical view is aligned. Only 
used when there are
    * views.
    */
-  private boolean[] measurementIsAligned;
+  protected boolean[] measurementIsAligned;
 
   public InsertRowStatement() {
     super();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index b38997ef9c0..329fe77203b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -56,17 +56,17 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
 
   private static final String DATATYPE_UNSUPPORTED = "Data type %s is not 
supported.";
 
-  private long[] times; // times should be sorted. It is done in the session 
API.
-  private BitMap[] bitMaps;
-  private Object[] columns;
+  protected long[] times; // times should be sorted. It is done in the session 
API.
+  protected BitMap[] bitMaps;
+  protected Object[] columns;
 
-  private int rowCount = 0;
+  protected int rowCount = 0;
 
   /**
    * This param record whether the source of logical view is aligned. Only 
used when there are
    * views.
    */
-  private boolean[] measurementIsAligned;
+  protected boolean[] measurementIsAligned;
 
   public InsertTabletStatement() {
     super();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index e2772cd08e4..b7ce69136fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -49,7 +49,7 @@ public class TypeInferenceUtils {
     return s.length() >= 3 && s.startsWith("X'") && s.endsWith("'");
   }
 
-  static boolean isNumber(String s) {
+  public static boolean isNumber(String s) {
     if (s == null || s.equals("NaN")) {
       return false;
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 62af58eece4..87ec0aaaeae 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -81,6 +81,13 @@ public class PipeConnectorConstant {
   public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password";
   public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
 
+  public static final String 
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY =
+      "connector.exception.data.convert-on-type-mismatch";
+  public static final String SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY =
+      "sink.exception.data.convert-on-type-mismatch";
+  public static final boolean 
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE =
+      true;
+
   public static final String CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY 
=
       "connector.exception.conflict.resolve-strategy";
   public static final String SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 667313e1976..57b9a4fede4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -60,13 +60,16 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
 
   private final LoadBalancer loadBalancer;
 
+  private final boolean shouldReceiverConvertOnTypeMismatch;
+
   protected IoTDBSyncClientManager(
       List<TEndPoint> endPoints,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
       boolean useLeaderCache,
-      String loadBalanceStrategy) {
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch) {
     super(endPoints, useLeaderCache);
 
     this.useSSL = useSSL;
@@ -93,6 +96,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
             loadBalanceStrategy);
         loadBalancer = new RoundRobinLoadBalancer();
     }
+
+    this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
   }
 
   public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -169,6 +174,9 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
           CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
       params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, 
getClusterId());
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+          Boolean.toString(shouldReceiverConvertOnTypeMismatch));
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = 
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 07c7bb46390..b6864706aaa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -23,6 +23,7 @@ public class PipeTransferHandshakeConstant {
 
   public static final String HANDSHAKE_KEY_TIME_PRECISION = 
"timestampPrecision";
   public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
+  public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH = 
"convertOnTypeMismatch";
 
   private PipeTransferHandshakeConstant() {
     // Utility class
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 6ac6f9432df..9dc08f93402 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -62,6 +62,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
@@ -88,6 +90,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
@@ -124,6 +127,8 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   protected boolean isTabletBatchModeEnabled = true;
 
   protected PipeReceiverStatusHandler receiverStatusHandler;
+  protected boolean shouldReceiverConvertOnTypeMismatch =
+      CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -312,6 +317,16 @@ public abstract class IoTDBConnector implements 
PipeConnector {
                     CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY,
                     SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY),
                 CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
+    shouldReceiverConvertOnTypeMismatch =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(
+                CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
+                SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
+            CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
+    LOGGER.info(
+        "IoTDBConnector {} = {}",
+        CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
+        shouldReceiverConvertOnTypeMismatch);
   }
 
   protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters 
parameters)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index be93018bf9c..271596cf549 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -116,7 +116,13 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
 
     clientManager =
         constructClient(
-            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
+            nodeUrls,
+            useSSL,
+            trustStorePath,
+            trustStorePwd,
+            useLeaderCache,
+            loadBalanceStrategy,
+            shouldReceiverConvertOnTypeMismatch);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
@@ -125,7 +131,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
       String trustStorePath,
       String trustStorePwd,
       boolean useLeaderCache,
-      String loadBalanceStrategy);
+      String loadBalanceStrategy,
+      boolean shouldReceiverConvertOnTypeMismatch);
 
   @Override
   public void handshake() throws Exception {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index b60dd1d5e1c..7197e112526 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+
 /**
  * {@link IoTDBFileReceiver} is the parent class of receiver on both 
configNode and DataNode,
  * handling all the logic of parallel file receiving.
@@ -66,6 +68,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   private File writingFile;
   private RandomAccessFile writingFileWriter;
 
+  protected boolean shouldConvertDataTypeOnTypeMismatch =
+      CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+
   @Override
   public IoTDBConnectorRequestVersion getVersion() {
     return IoTDBConnectorRequestVersion.VERSION_1;
@@ -216,6 +221,13 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       return new TPipeTransferResp(status);
     }
 
+    final String shouldConvertDataTypeOnTypeMismatchString =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH);
+    if (shouldConvertDataTypeOnTypeMismatchString != null) {
+      shouldConvertDataTypeOnTypeMismatch =
+          Boolean.parseBoolean(shouldConvertDataTypeOnTypeMismatchString);
+    }
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.

Reply via email to