This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5b31438b0b7 Pipe: Support converting data type on data sync receiver
metadata mismatch (#13110)
5b31438b0b7 is described below
commit 5b31438b0b7fb77492f69bdc5948e0788c5191cd
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Aug 9 11:17:00 2024 +0800
Pipe: Support converting data type on data sync receiver metadata mismatch
(#13110)
---
.../client/IoTDBConfigNodeSyncClientManager.java | 12 +-
.../protocol/IoTDBConfigRegionAirGapConnector.java | 3 +
.../protocol/IoTDBConfigRegionConnector.java | 10 +-
.../client/IoTDBDataNodeAsyncClientManager.java | 12 +-
.../client/IoTDBDataNodeSyncClientManager.java | 12 +-
.../airgap/IoTDBDataNodeAirGapConnector.java | 3 +
.../async/IoTDBDataRegionAsyncConnector.java | 3 +-
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 11 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 40 +-
.../transform/converter/ArrayConverter.java | 940 +++++++++++++++++++++
.../transform/converter/ValueConverter.java | 787 +++++++++++++++++
.../statement/PipeConvertedInsertRowStatement.java | 112 +++
.../PipeConvertedInsertTabletStatement.java | 64 ++
...peStatementDataTypeConvertExecutionVisitor.java | 173 ++++
.../plan/statement/crud/InsertBaseStatement.java | 8 +-
.../plan/statement/crud/InsertRowStatement.java | 12 +-
.../plan/statement/crud/InsertTabletStatement.java | 10 +-
.../apache/iotdb/db/utils/TypeInferenceUtils.java | 2 +-
.../config/constant/PipeConnectorConstant.java | 7 +
.../connector/client/IoTDBSyncClientManager.java | 10 +-
.../common/PipeTransferHandshakeConstant.java | 1 +
.../pipe/connector/protocol/IoTDBConnector.java | 15 +
.../connector/protocol/IoTDBSslSyncConnector.java | 11 +-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 12 +
24 files changed, 2225 insertions(+), 45 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index e420a6b5c6d..fd5b3c9ddb9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -38,8 +38,16 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
boolean useSSL,
String trustStorePath,
String trustStorePwd,
- String loadBalanceStrategy) {
- super(endPoints, useSSL, trustStorePath, trustStorePwd, false,
loadBalanceStrategy);
+ String loadBalanceStrategy,
+ boolean shouldReceiverConvertOnTypeMismatch) {
+ super(
+ endPoints,
+ useSSL,
+ trustStorePath,
+ trustStorePwd,
+ false,
+ loadBalanceStrategy,
+ shouldReceiverConvertOnTypeMismatch);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index 1c5febcb4bf..ba151351c39 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -68,6 +68,9 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+ Boolean.toString(shouldReceiverConvertOnTypeMismatch));
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 9ba081018b9..96059980c38 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -62,9 +62,15 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final String trustStorePath,
final String trustStorePwd,
final boolean useLeaderCache,
- final String loadBalanceStrategy) {
+ final String loadBalanceStrategy,
+ final boolean shouldReceiverConvertOnTypeMismatch) {
return new IoTDBConfigNodeSyncClientManager(
- nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy);
+ nodeUrls,
+ useSSL,
+ trustStorePath,
+ trustStorePwd,
+ loadBalanceStrategy,
+ shouldReceiverConvertOnTypeMismatch);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index eff84ab5a5a..6814f8151b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -67,8 +67,13 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
private final LoadBalancer loadBalancer;
+ private final boolean shouldReceiverConvertOnTypeMismatch;
+
public IoTDBDataNodeAsyncClientManager(
- List<TEndPoint> endPoints, boolean useLeaderCache, String
loadBalanceStrategy) {
+ List<TEndPoint> endPoints,
+ boolean useLeaderCache,
+ String loadBalanceStrategy,
+ boolean shouldReceiverConvertOnTypeMismatch) {
super(endPoints, useLeaderCache);
endPointSet = new HashSet<>(endPoints);
@@ -101,6 +106,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
loadBalanceStrategy);
loadBalancer = new RoundRobinLoadBalancer();
}
+
+ this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
}
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
@@ -209,6 +216,9 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+ Boolean.toString(shouldReceiverConvertOnTypeMismatch));
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 2c23cba4542..df9ae2a0e29 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -48,8 +48,16 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
String trustStorePath,
String trustStorePwd,
boolean useLeaderCache,
- String loadBalanceStrategy) {
- super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache,
loadBalanceStrategy);
+ String loadBalanceStrategy,
+ boolean shouldReceiverConvertOnTypeMismatch) {
+ super(
+ endPoints,
+ useSSL,
+ trustStorePath,
+ trustStorePwd,
+ useLeaderCache,
+ loadBalanceStrategy,
+ shouldReceiverConvertOnTypeMismatch);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index b66c0012700..f0491431196 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -100,6 +100,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+ Boolean.toString(shouldReceiverConvertOnTypeMismatch));
return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 255e5adec76..43c5fa6eff2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -124,7 +124,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
parameters.getBooleanOrDefault(
Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
- loadBalanceStrategy);
+ loadBalanceStrategy,
+ shouldReceiverConvertOnTypeMismatch);
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 9ad83ee84e2..32c886fbe56 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -86,10 +86,17 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
final String trustStorePath,
final String trustStorePwd,
final boolean useLeaderCache,
- final String loadBalanceStrategy) {
+ final String loadBalanceStrategy,
+ final boolean shouldReceiverConvertOnTypeMismatch) {
clientManager =
new IoTDBDataNodeSyncClientManager(
- nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache,
loadBalanceStrategy);
+ nodeUrls,
+ useSSL,
+ trustStorePath,
+ trustStorePwd,
+ useLeaderCache,
+ loadBalanceStrategy,
+ shouldReceiverConvertOnTypeMismatch);
return clientManager;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 12916ba4506..b70f833a7c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -51,6 +51,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics;
import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
+import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementDataTypeConvertExecutionVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
@@ -61,7 +62,6 @@ import
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
-import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -117,6 +117,9 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
new PipeStatementExceptionVisitor();
private static final PipeStatementPatternParseVisitor
STATEMENT_PATTERN_PARSE_VISITOR =
new PipeStatementPatternParseVisitor();
+ private static final PipeStatementDataTypeConvertExecutionVisitor
+ STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR =
+ new
PipeStatementDataTypeConvertExecutionVisitor(IoTDBDataNodeReceiver::executeStatement);
private final PipeStatementToBatchVisitor batchVisitor = new
PipeStatementToBatchVisitor();
// Used for data transfer: confignode (cluster A) -> datanode (cluster B) ->
confignode (cluster
@@ -461,7 +464,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus executeStatementAndClassifyExceptions(final Statement
statement) {
try {
- final TSStatus result = executeStatement(statement);
+ final TSStatus result =
executeStatementWithRetryOnDataTypeMismatch(statement);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return result;
@@ -483,25 +486,30 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
}
- private TSStatus executeStatement(Statement statement) {
+ private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement
statement) {
if (statement == null) {
return RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null
statement.");
}
- statement = new PipeEnrichedStatement(statement);
-
- final ExecutionResult result =
- Coordinator.getInstance()
- .executeForTreeModel(
- statement,
- SessionManager.getInstance().requestQueryId(),
- new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
- "",
- ClusterPartitionFetcher.getInstance(),
- ClusterSchemaFetcher.getInstance(),
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
- return result.status;
+ final TSStatus status = executeStatement(statement);
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || !shouldConvertDataTypeOnTypeMismatch
+ ? status
+ : statement.accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR,
status).orElse(status);
+ }
+
+ private static TSStatus executeStatement(final Statement statement) {
+ return Coordinator.getInstance()
+ .executeForTreeModel(
+ new PipeEnrichedStatement(statement),
+ SessionManager.getInstance().requestQueryId(),
+ new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance(),
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+ .status;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java
new file mode 100644
index 00000000000..3d5a9b0cfb6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java
@@ -0,0 +1,940 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.transform.converter;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+
+public class ArrayConverter {
+
+ @FunctionalInterface
+ private interface Converter {
+ Object convert(
+ final TSDataType sourceDataType,
+ final TSDataType targetDataType,
+ final Object sourceValues);
+ }
+
+ private static final Converter[][] CONVERTER =
+ new Converter[TSDataType.values().length][TSDataType.values().length];
+
+ private static final Converter DO_NOTHING_CONVERTER =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+
+ static {
+ for (final TSDataType sourceDataType : TSDataType.values()) {
+ for (final TSDataType targetDataType : TSDataType.values()) {
+ CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()] =
DO_NOTHING_CONVERTER;
+ }
+ }
+
+ // BOOLEAN
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final int[] intValues = new int[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ intValues[i] = ValueConverter.convertBooleanToInt32(boolValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final long[] longValues = new long[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ longValues[i] =
ValueConverter.convertBooleanToInt64(boolValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final float[] floatValues = new float[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ floatValues[i] =
ValueConverter.convertBooleanToFloat(boolValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final double[] doubleValues = new double[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertBooleanToDouble(boolValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final Binary[] textValues = new Binary[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ textValues[i] = ValueConverter.convertBooleanToText(boolValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final long[] timestampValues = new long[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertBooleanToTimestamp(boolValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final int[] dateValues = new int[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ dateValues[i] = ValueConverter.convertBooleanToDate(boolValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final Binary[] blobValues = new Binary[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ blobValues[i] = ValueConverter.convertBooleanToBlob(boolValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final boolean[] boolValues = (boolean[]) sourceValues;
+ final Binary[] stringValues = new Binary[boolValues.length];
+ for (int i = 0; i < boolValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertBooleanToString(boolValues[i]);
+ }
+ return stringValues;
+ };
+
+ // INT32
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final boolean[] boolValues = new boolean[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ boolValues[i] = ValueConverter.convertInt32ToBoolean(intValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final long[] longValues = new long[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ longValues[i] = ValueConverter.convertInt32ToInt64(intValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final float[] floatValues = new float[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ floatValues[i] = ValueConverter.convertInt32ToFloat(intValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final double[] doubleValues = new double[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertInt32ToDouble(intValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final Binary[] textValues = new Binary[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ textValues[i] = ValueConverter.convertInt32ToText(intValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final long[] timestampValues = new long[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertInt32ToTimestamp(intValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final int[] dateValues = new int[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ dateValues[i] = ValueConverter.convertInt32ToDate(intValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final Binary[] blobValues = new Binary[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ blobValues[i] = ValueConverter.convertInt32ToBlob(intValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] intValues = (int[]) sourceValues;
+ final Binary[] stringValues = new Binary[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertInt32ToString(intValues[i]);
+ }
+ return stringValues;
+ };
+
+ // INT64
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final boolean[] boolValues = new boolean[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ boolValues[i] =
ValueConverter.convertInt64ToBoolean(longValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final int[] intValues = new int[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ intValues[i] = ValueConverter.convertInt64ToInt32(longValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final float[] floatValues = new float[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ floatValues[i] = ValueConverter.convertInt64ToFloat(longValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final double[] doubleValues = new double[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertInt64ToDouble(longValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final Binary[] textValues = new Binary[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ textValues[i] = ValueConverter.convertInt64ToText(longValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final long[] timestampValues = new long[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertInt64ToTimestamp(longValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final int[] dateValues = new int[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ dateValues[i] = ValueConverter.convertInt64ToDate(longValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final Binary[] blobValues = new Binary[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ blobValues[i] = ValueConverter.convertInt64ToBlob(longValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] longValues = (long[]) sourceValues;
+ final Binary[] stringValues = new Binary[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertInt64ToString(longValues[i]);
+ }
+ return stringValues;
+ };
+
+ // FLOAT
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final boolean[] boolValues = new boolean[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ boolValues[i] =
ValueConverter.convertFloatToBoolean(floatValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final int[] intValues = new int[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ intValues[i] = ValueConverter.convertFloatToInt32(floatValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final long[] longValues = new long[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ longValues[i] = ValueConverter.convertFloatToInt64(floatValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final double[] doubleValues = new double[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertFloatToDouble(floatValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final Binary[] textValues = new Binary[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ textValues[i] = ValueConverter.convertFloatToText(floatValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final long[] timestampValues = new long[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertFloatToTimestamp(floatValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final int[] dateValues = new int[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ dateValues[i] = ValueConverter.convertFloatToDate(floatValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final Binary[] blobValues = new Binary[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ blobValues[i] = ValueConverter.convertFloatToBlob(floatValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final float[] floatValues = (float[]) sourceValues;
+ final Binary[] stringValues = new Binary[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertFloatToString(floatValues[i]);
+ }
+ return stringValues;
+ };
+
+ // DOUBLE
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final boolean[] boolValues = new boolean[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ boolValues[i] =
ValueConverter.convertDoubleToBoolean(doubleValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final int[] intValues = new int[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ intValues[i] =
ValueConverter.convertDoubleToInt32(doubleValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final long[] longValues = new long[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ longValues[i] =
ValueConverter.convertDoubleToInt64(doubleValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final float[] floatValues = new float[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ floatValues[i] =
ValueConverter.convertDoubleToFloat(doubleValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final Binary[] textValues = new Binary[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ textValues[i] =
ValueConverter.convertDoubleToText(doubleValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final long[] timestampValues = new long[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertDoubleToTimestamp(doubleValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final int[] dateValues = new int[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ dateValues[i] =
ValueConverter.convertDoubleToDate(doubleValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final Binary[] blobValues = new Binary[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ blobValues[i] =
ValueConverter.convertDoubleToBlob(doubleValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final double[] doubleValues = (double[]) sourceValues;
+ final Binary[] stringValues = new Binary[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertDoubleToString(doubleValues[i]);
+ }
+ return stringValues;
+ };
+
+ // TEXT
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final boolean[] boolValues = new boolean[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ boolValues[i] = ValueConverter.convertTextToBoolean(textValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final int[] intValues = new int[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ intValues[i] = ValueConverter.convertTextToInt32(textValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final long[] longValues = new long[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ longValues[i] = ValueConverter.convertTextToInt64(textValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final float[] floatValues = new float[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ floatValues[i] = ValueConverter.convertTextToFloat(textValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final double[] doubleValues = new double[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertTextToDouble(textValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final long[] timestampValues = new long[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertTextToTimestamp(textValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final int[] dateValues = new int[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ dateValues[i] = ValueConverter.convertTextToDate(textValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final Binary[] blobValues = new Binary[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ blobValues[i] = ValueConverter.convertTextToBlob(textValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] textValues = (Binary[]) sourceValues;
+ final Binary[] stringValues = new Binary[textValues.length];
+ for (int i = 0; i < textValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertTextToString(textValues[i]);
+ }
+ return stringValues;
+ };
+
+ // VECTOR
+ for (int i = 0; i < TSDataType.values().length; i++) {
+ CONVERTER[TSDataType.VECTOR.ordinal()][i] = DO_NOTHING_CONVERTER;
+ }
+
+ // UNKNOWN
+ for (int i = 0; i < TSDataType.values().length; i++) {
+ CONVERTER[TSDataType.UNKNOWN.ordinal()][i] = DO_NOTHING_CONVERTER;
+ }
+
+ // TIMESTAMP
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final boolean[] boolValues = new boolean[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ boolValues[i] =
ValueConverter.convertTimestampToBoolean(timestampValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final int[] intValues = new int[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ intValues[i] =
ValueConverter.convertTimestampToInt32(timestampValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final long[] longValues = new long[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ longValues[i] =
ValueConverter.convertTimestampToInt64(timestampValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final float[] floatValues = new float[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ floatValues[i] =
ValueConverter.convertTimestampToFloat(timestampValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final double[] doubleValues = new double[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertTimestampToDouble(timestampValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final Binary[] textValues = new Binary[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ textValues[i] =
ValueConverter.convertTimestampToText(timestampValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final int[] dateValues = new int[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ dateValues[i] =
ValueConverter.convertTimestampToDate(timestampValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final Binary[] blobValues = new Binary[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ blobValues[i] =
ValueConverter.convertTimestampToBlob(timestampValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final long[] timestampValues = (long[]) sourceValues;
+ final Binary[] stringValues = new Binary[timestampValues.length];
+ for (int i = 0; i < timestampValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertTimestampToString(timestampValues[i]);
+ }
+ return stringValues;
+ };
+
+ // DATE
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final boolean[] boolValues = new boolean[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ boolValues[i] = ValueConverter.convertDateToBoolean(dateValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final int[] intValues = new int[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ intValues[i] = ValueConverter.convertDateToInt32(dateValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final long[] longValues = new long[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ longValues[i] = ValueConverter.convertDateToInt64(dateValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final float[] floatValues = new float[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ floatValues[i] = ValueConverter.convertDateToFloat(dateValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final double[] doubleValues = new double[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertDateToDouble(dateValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final Binary[] textValues = new Binary[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ textValues[i] = ValueConverter.convertDateToText(dateValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final long[] timestampValues = new long[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertDateToTimestamp(dateValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final Binary[] blobValues = new Binary[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ blobValues[i] = ValueConverter.convertDateToBlob(dateValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final int[] dateValues = (int[]) sourceValues;
+ final Binary[] stringValues = new Binary[dateValues.length];
+ for (int i = 0; i < dateValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertDateToString(dateValues[i]);
+ }
+ return stringValues;
+ };
+
+ // BLOB
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final boolean[] boolValues = new boolean[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ boolValues[i] = ValueConverter.convertBlobToBoolean(blobValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final int[] intValues = new int[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ intValues[i] = ValueConverter.convertBlobToInt32(blobValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final long[] longValues = new long[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ longValues[i] = ValueConverter.convertBlobToInt64(blobValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final float[] floatValues = new float[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ floatValues[i] = ValueConverter.convertBlobToFloat(blobValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final double[] doubleValues = new double[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertBlobToDouble(blobValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final Binary[] textValues = new Binary[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ textValues[i] = ValueConverter.convertBlobToText(blobValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final long[] timestampValues = new long[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertBlobToTimestamp(blobValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final int[] dateValues = new int[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ dateValues[i] = ValueConverter.convertBlobToDate(blobValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] blobValues = (Binary[]) sourceValues;
+ final Binary[] stringValues = new Binary[blobValues.length];
+ for (int i = 0; i < blobValues.length; i++) {
+ stringValues[i] =
ValueConverter.convertBlobToString(blobValues[i]);
+ }
+ return stringValues;
+ };
+
+ // STRING
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final boolean[] boolValues = new boolean[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ boolValues[i] =
ValueConverter.convertStringToBoolean(stringValues[i]);
+ }
+ return boolValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final int[] intValues = new int[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ intValues[i] =
ValueConverter.convertStringToInt32(stringValues[i]);
+ }
+ return intValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final long[] longValues = new long[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ longValues[i] =
ValueConverter.convertStringToInt64(stringValues[i]);
+ }
+ return longValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final float[] floatValues = new float[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ floatValues[i] =
ValueConverter.convertStringToFloat(stringValues[i]);
+ }
+ return floatValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final double[] doubleValues = new double[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ doubleValues[i] =
ValueConverter.convertStringToDouble(stringValues[i]);
+ }
+ return doubleValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final Binary[] textValues = new Binary[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ textValues[i] =
ValueConverter.convertStringToText(stringValues[i]);
+ }
+ return textValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.VECTOR.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.UNKNOWN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final long[] timestampValues = new long[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ timestampValues[i] =
ValueConverter.convertStringToTimestamp(stringValues[i]);
+ }
+ return timestampValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final int[] dateValues = new int[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ dateValues[i] =
ValueConverter.convertStringToDate(stringValues[i]);
+ }
+ return dateValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> {
+ final Binary[] stringValues = (Binary[]) sourceValues;
+ final Binary[] blobValues = new Binary[stringValues.length];
+ for (int i = 0; i < stringValues.length; i++) {
+ blobValues[i] =
ValueConverter.convertStringToBlob(stringValues[i]);
+ }
+ return blobValues;
+ };
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValues) -> sourceValues;
+ }
+
+  /**
+   * Converts a batch of values from one data type to another by dispatching to the converter
+   * registered in {@code CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()]}.
+   *
+   * <p>The registered converters cast {@code sourceValues} to the array type matching the source
+   * data type (for example {@code float[]}, {@code long[]} or {@code Binary[]}), so the caller is
+   * responsible for passing an array that agrees with {@code sourceDataType}.
+   *
+   * @param sourceDataType data type of the incoming values; selects the row of the table
+   * @param targetDataType data type to convert to; selects the column of the table
+   * @param sourceValues the values to convert, may be {@code null}
+   * @return {@code null} if {@code sourceValues} is {@code null}; otherwise the result produced by
+   *     the selected converter
+   */
+  public static Object convert(
+      final TSDataType sourceDataType,
+      final TSDataType targetDataType,
+      final Object sourceValues) {
+    // Nothing to convert: short-circuit before touching the data types at all.
+    if (sourceValues == null) {
+      return null;
+    }
+
+    final int sourceIndex = sourceDataType.ordinal();
+    final int targetIndex = targetDataType.ordinal();
+    return CONVERTER[sourceIndex][targetIndex].convert(
+        sourceDataType, targetDataType, sourceValues);
+  }
+
+ private ArrayConverter() {
+ // Private on purpose: conversion is exposed through the static convert(...) method, so no instance is ever needed.
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
new file mode 100644
index 00000000000..098792fd0b3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
@@ -0,0 +1,787 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.transform.converter;
+
+import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+
+import java.time.ZoneId;
+
+public class ValueConverter {
+
+ @FunctionalInterface
+ private interface Converter { // one cell of the CONVERTER table: converts a single value
+ Object convert( // result is returned as Object, so primitive results are boxed
+ final TSDataType sourceDataType, final TSDataType targetDataType,
final Object sourceValue);
+ }
+
+ private static final Converter[][] CONVERTER = // indexed as [source ordinal][target ordinal]
+ new Converter[TSDataType.values().length][TSDataType.values().length];
+
+ private static final Converter DO_NOTHING_CONVERTER = // identity: hands the value back unchanged
+ (sourceDataType, targetDataType, sourceValue) -> sourceValue;
+
+ static {
+ for (final TSDataType sourceDataType : TSDataType.values()) {
+ for (final TSDataType targetDataType : TSDataType.values()) {
+ CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()] =
DO_NOTHING_CONVERTER;
+ }
+ }
+
+ // BOOLEAN
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BOOLEAN.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToInt32((boolean) sourceValue);
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToInt64((boolean) sourceValue);
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToFloat((boolean) sourceValue);
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToDouble((boolean) sourceValue);
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToText((boolean) sourceValue);
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToTimestamp((boolean) sourceValue);
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToDate((boolean) sourceValue);
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToBlob((boolean) sourceValue);
+ CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBooleanToString((boolean) sourceValue);
+
+ // INT32
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToBoolean((int) sourceValue);
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT32.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToInt64((int) sourceValue);
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToFloat((int) sourceValue);
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToDouble((int) sourceValue);
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToText((int) sourceValue);
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToTimestamp((int) sourceValue);
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToDate((int) sourceValue);
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToBlob((int) sourceValue);
+ CONVERTER[TSDataType.INT32.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt32ToString((int) sourceValue);
+
+ // INT64
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt64ToBoolean((long) sourceValue);
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt64ToInt32((long) sourceValue);
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT64.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt64ToFloat((long) sourceValue);
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt64ToDouble((long) sourceValue);
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt64ToText((long) sourceValue);
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertInt64ToTimestamp((long) sourceValue);
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt64ToDate((long) sourceValue);
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt64ToBlob((long) sourceValue);
+ CONVERTER[TSDataType.INT64.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertInt64ToString((long) sourceValue);
+
+ // FLOAT
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertFloatToBoolean((float) sourceValue);
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertFloatToInt32((float) sourceValue);
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertFloatToInt64((float) sourceValue);
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.FLOAT.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertFloatToDouble((float) sourceValue);
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertFloatToText((float) sourceValue);
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertFloatToTimestamp((float) sourceValue);
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertFloatToDate((float) sourceValue);
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertFloatToBlob((float) sourceValue);
+ CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertFloatToString((float) sourceValue);
+
+ // DOUBLE
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertDoubleToBoolean((double) sourceValue);
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDoubleToInt32((double) sourceValue);
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDoubleToInt64((double) sourceValue);
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDoubleToFloat((double) sourceValue);
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DOUBLE.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDoubleToText((double) sourceValue);
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertDoubleToTimestamp((double) sourceValue);
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDoubleToDate((double) sourceValue);
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDoubleToBlob((double) sourceValue);
+ CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertDoubleToString((double) sourceValue);
+
+ // TEXT
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTextToBoolean((Binary) sourceValue);
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTextToInt32((Binary) sourceValue);
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTextToInt64((Binary) sourceValue);
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTextToFloat((Binary) sourceValue);
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTextToDouble((Binary) sourceValue);
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TEXT.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertTextToTimestamp((Binary) sourceValue);
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTextToDate((Binary) sourceValue);
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTextToBlob((Binary) sourceValue);
+ CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTextToString((Binary) sourceValue);
+
+ // TIMESTAMP
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertTimestampToBoolean((long) sourceValue);
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertTimestampToInt32((long) sourceValue);
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertTimestampToInt64((long) sourceValue);
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertTimestampToFloat((long) sourceValue);
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertTimestampToDouble((long) sourceValue);
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTimestampToText((long) sourceValue);
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTimestampToDate((long) sourceValue);
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertTimestampToBlob((long) sourceValue);
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertTimestampToString((long) sourceValue);
+
+ // DATE
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToBoolean((int) sourceValue);
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToInt32((int) sourceValue);
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToInt64((int) sourceValue);
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToFloat((int) sourceValue);
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToDouble((int) sourceValue);
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToText((int) sourceValue);
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToTimestamp((int) sourceValue);
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DATE.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToBlob((int) sourceValue);
+ CONVERTER[TSDataType.DATE.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertDateToString((int) sourceValue);
+
+ // BLOB
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertBlobToBoolean((Binary) sourceValue);
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertBlobToInt32((Binary) sourceValue);
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertBlobToInt64((Binary) sourceValue);
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertBlobToFloat((Binary) sourceValue);
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertBlobToDouble((Binary) sourceValue);
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertBlobToText((Binary) sourceValue);
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertBlobToTimestamp((Binary) sourceValue);
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertBlobToDate((Binary) sourceValue);
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BLOB.ordinal()] =
DO_NOTHING_CONVERTER;
+ CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.STRING.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertBlobToString((Binary) sourceValue);
+
+ // STRING
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BOOLEAN.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertStringToBoolean((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT32.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertStringToInt32((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT64.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertStringToInt64((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.FLOAT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertStringToFloat((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DOUBLE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertStringToDouble((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TEXT.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertStringToText((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TIMESTAMP.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
+ convertStringToTimestamp((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DATE.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertStringToDate((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BLOB.ordinal()] =
+ (sourceDataType, targetDataType, sourceValue) ->
convertStringToBlob((Binary) sourceValue);
+ CONVERTER[TSDataType.STRING.ordinal()][TSDataType.STRING.ordinal()] =
DO_NOTHING_CONVERTER;
+ }
+
+ /**
+  * Converts a single value from one data type to another by dispatching to the converter
+  * registered for the (source, target) pair.
+  *
+  * @param sourceDataType data type of {@code sourceValue}
+  * @param targetDataType data type the result should be expressed in
+  * @param sourceValue the value to convert, may be {@code null}
+  * @return the converted value (boxed when primitive), or {@code null} when {@code sourceValue}
+  *     is {@code null}
+  */
+ public static Object convert(
+     final TSDataType sourceDataType, final TSDataType targetDataType, final Object sourceValue) {
+   // A missing value stays missing; the table is not consulted for it.
+   if (sourceValue == null) {
+     return null;
+   }
+   return CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()].convert(
+       sourceDataType, targetDataType, sourceValue);
+ }
+
+ ////////////// BOOLEAN //////////////
+
+ private static final Binary BINARY_TRUE = // "true", built once and returned by the boolean -> TEXT/BLOB/STRING converters
parseString(Boolean.TRUE.toString());
+ private static final Binary BINARY_FALSE = // "false", built once and returned by the same converters
parseString(Boolean.FALSE.toString());
+
+ public static int convertBooleanToInt32(final boolean value) {
+ return value ? 1 : 0; // true -> 1, false -> 0
+ }
+
+ public static long convertBooleanToInt64(final boolean value) {
+ return value ? 1L : 0L; // true -> 1, false -> 0
+ }
+
+ public static float convertBooleanToFloat(final boolean value) {
+ return value ? 1.0f : 0.0f; // true -> 1.0, false -> 0.0
+ }
+
+ public static double convertBooleanToDouble(final boolean value) {
+ return value ? 1.0 : 0.0; // true -> 1.0, false -> 0.0
+ }
+
+ public static Binary convertBooleanToText(final boolean value) {
+ return value ? BINARY_TRUE : BINARY_FALSE; // returns the shared constant, no allocation per call
+ }
+
+ public static long convertBooleanToTimestamp(final boolean value) {
+ return value ? 1L : 0L; // same 1/0 mapping as the INT64 conversion
+ }
+
+ public static int convertBooleanToDate(final boolean value) {
+ return value ? 1 : 0; // NOTE(review): 1/0 is used as the raw DATE int - confirm it is a meaningful date downstream
+ }
+
+ public static Binary convertBooleanToBlob(final boolean value) {
+ return value ? BINARY_TRUE : BINARY_FALSE; // same shared constants as the TEXT conversion
+ }
+
+ public static Binary convertBooleanToString(final boolean value) {
+ return value ? BINARY_TRUE : BINARY_FALSE; // same shared constants as the TEXT conversion
+ }
+
+ ///////////// INT32 //////////////
+
+ public static boolean convertInt32ToBoolean(final int value) {
+ return value != 0; // any non-zero value (including negatives) maps to true
+ }
+
+ public static long convertInt32ToInt64(final int value) {
+ return value; // implicit int -> long widening, always exact
+ }
+
+ public static float convertInt32ToFloat(final int value) {
+ return value; // implicit int -> float widening; large magnitudes may be rounded
+ }
+
+ public static double convertInt32ToDouble(final int value) {
+ return value; // implicit int -> double widening, always exact
+ }
+
+ public static Binary convertInt32ToText(final int value) {
+ return parseText(Integer.toString(value)); // decimal representation of the int
+ }
+
+ public static long convertInt32ToTimestamp(final int value) {
+ return value; // the int is widened and used as the timestamp as-is
+ }
+
+ public static int convertInt32ToDate(final int value) {
+ return value; // passed through unchanged; the int is not validated as a date here
+ }
+
+ public static Binary convertInt32ToBlob(final int value) {
+ return parseBlob(Integer.toString(value)); // wraps the decimal text, not the raw 4 bytes of the int
+ }
+
+ public static Binary convertInt32ToString(final int value) {
+ return parseString(Integer.toString(value)); // decimal representation of the int
+ }
+
+ ///////////// INT64 //////////////
+
+ public static boolean convertInt64ToBoolean(final long value) {
+ return value != 0; // any non-zero value (including negatives) maps to true
+ }
+
+ public static int convertInt64ToInt32(final long value) {
+ return (int) value; // narrowing cast keeps the low 32 bits: out-of-range longs wrap silently
+ }
+
+ public static float convertInt64ToFloat(final long value) {
+ return value; // implicit long -> float widening; large magnitudes are rounded
+ }
+
+ public static double convertInt64ToDouble(final long value) {
+ return value; // implicit long -> double widening; very large magnitudes may be rounded
+ }
+
+ public static Binary convertInt64ToText(final long value) {
+ return parseText(Long.toString(value)); // decimal representation of the long
+ }
+
+ public static long convertInt64ToTimestamp(final long value) {
+ return value; // the long is used as the timestamp as-is
+ }
+
+ public static int convertInt64ToDate(final long value) {
+ return (int) value; // narrowing cast: wraps silently when the long does not fit in an int
+ }
+
+ public static Binary convertInt64ToBlob(final long value) {
+ return parseBlob(Long.toString(value)); // wraps the decimal text, not the raw 8 bytes of the long
+ }
+
+ public static Binary convertInt64ToString(final long value) {
+ return parseString(Long.toString(value)); // decimal representation of the long
+ }
+
+ ///////////// FLOAT //////////////
+
+ public static boolean convertFloatToBoolean(final float value) {
+ return value != 0; // NaN compares unequal to 0 and therefore maps to true; -0.0f maps to false
+ }
+
+ public static int convertFloatToInt32(final float value) {
+ return (int) value; // truncates toward zero; NaN -> 0; out-of-range saturates at Integer.MIN/MAX_VALUE
+ }
+
+ public static long convertFloatToInt64(final float value) {
+ return (long) value; // truncates toward zero; NaN -> 0; out-of-range saturates at Long.MIN/MAX_VALUE
+ }
+
+ public static double convertFloatToDouble(final float value) {
+ return value; // implicit float -> double widening, always exact
+ }
+
+ public static Binary convertFloatToText(final float value) {
+ return parseText(Float.toString(value)); // uses Float.toString, e.g. "1.0", "NaN", "Infinity"
+ }
+
+ public static long convertFloatToTimestamp(final float value) {
+ return (long) value; // same truncating cast as the INT64 conversion
+ }
+
+ public static int convertFloatToDate(final float value) {
+ return (int) value; // same truncating cast as the INT32 conversion; not validated as a date here
+ }
+
+ public static Binary convertFloatToBlob(final float value) {
+ return parseBlob(Float.toString(value)); // wraps the Float.toString text, not the IEEE 754 bytes
+ }
+
+ public static Binary convertFloatToString(final float value) {
+ return parseString(Float.toString(value)); // uses Float.toString
+ }
+
+ ///////////// DOUBLE //////////////
+
+ public static boolean convertDoubleToBoolean(final double value) {
+ return value != 0; // NaN compares unequal to 0 and therefore maps to true; -0.0 maps to false
+ }
+
+ public static int convertDoubleToInt32(final double value) {
+ return (int) value; // truncates toward zero; NaN -> 0; out-of-range saturates at Integer.MIN/MAX_VALUE
+ }
+
+ public static long convertDoubleToInt64(final double value) {
+ return (long) value; // truncates toward zero; NaN -> 0; out-of-range saturates at Long.MIN/MAX_VALUE
+ }
+
+ public static float convertDoubleToFloat(final double value) {
+ return (float) value; // rounds to the nearest float; magnitudes beyond float range become +/-Infinity
+ }
+
+ public static Binary convertDoubleToText(final double value) {
+ return parseText(Double.toString(value)); // uses Double.toString, e.g. "1.0", "NaN", "Infinity"
+ }
+
+ public static long convertDoubleToTimestamp(final double value) {
+ return (long) value; // same truncating cast as the INT64 conversion
+ }
+
+ public static int convertDoubleToDate(final double value) {
+ return (int) value; // same truncating cast as the INT32 conversion; not validated as a date here
+ }
+
+ public static Binary convertDoubleToBlob(final double value) {
+ return parseBlob(Double.toString(value)); // wraps the Double.toString text, not the IEEE 754 bytes
+ }
+
+ public static Binary convertDoubleToString(final double value) {
+ return parseString(Double.toString(value)); // uses Double.toString
+ }
+
+ ///////////// TEXT //////////////
+
+ public static boolean convertTextToBoolean(final Binary value) {
+ return Boolean.parseBoolean(value.toString()); // true only for "true" (case-insensitive); anything else is false
+ }
+
+ public static int convertTextToInt32(final Binary value) {
+ return parseInteger(value.toString()); // falls back to 0 when the text is not a valid int
+ }
+
+ public static long convertTextToInt64(final Binary value) {
+ return parseLong(value.toString()); // falls back to 0 when the text is not a valid long
+ }
+
+ public static float convertTextToFloat(final Binary value) {
+ return parseFloat(value.toString()); // falls back to 0.0 when the text is not a valid float
+ }
+
+ public static double convertTextToDouble(final Binary value) {
+ return parseDouble(value.toString()); // falls back to 0.0 when the text is not a valid double
+ }
+
+ public static long convertTextToTimestamp(final Binary value) {
+ return parseTimestamp(value.toString()); // numeric text or a date-time expression; 0 when neither parses
+ }
+
+ public static int convertTextToDate(final Binary value) {
+ return parseDate(value.toString()); // numeric text or a date expression; 0 when neither parses
+ }
+
+ public static Binary convertTextToBlob(final Binary value) {
+ return parseBlob(value.toString()); // round-trips through String and returns a new Binary
+ }
+
+ public static Binary convertTextToString(final Binary value) {
+ return value; // same Binary instance is returned, no copy
+ }
+
+ ///////////// TIMESTAMP //////////////
+
+ public static boolean convertTimestampToBoolean(final long value) {
+ return value != 0; // any non-zero timestamp maps to true
+ }
+
+ public static int convertTimestampToInt32(final long value) {
+ return (int) value; // narrowing cast keeps the low 32 bits: out-of-range timestamps wrap silently
+ }
+
+ public static long convertTimestampToInt64(final long value) {
+ return value; // the raw long is returned unchanged
+ }
+
+ public static float convertTimestampToFloat(final long value) {
+ return value; // implicit long -> float widening; large magnitudes are rounded
+ }
+
+ public static double convertTimestampToDouble(final long value) {
+ return value; // implicit long -> double widening; very large magnitudes may be rounded
+ }
+
+ public static Binary convertTimestampToText(final long value) {
+ return parseText(Long.toString(value)); // decimal digits of the raw long, not a formatted date-time
+ }
+
+ public static int convertTimestampToDate(final long value) {
+ return (int) value; // NOTE(review): plain narrowing cast, no calendar conversion - confirm this is intended
+ }
+
+ public static Binary convertTimestampToBlob(final long value) {
+ return parseBlob(Long.toString(value)); // wraps the decimal text of the raw long
+ }
+
+ public static Binary convertTimestampToString(final long value) {
+ return parseString(Long.toString(value)); // decimal digits of the raw long, not a formatted date-time
+ }
+
+ ///////////// DATE //////////////
+
+ public static boolean convertDateToBoolean(final int value) {
+ return value != 0; // any non-zero date int maps to true
+ }
+
+ public static int convertDateToInt32(final int value) {
+ return value; // the raw date int is returned unchanged
+ }
+
+ public static long convertDateToInt64(final int value) {
+ return value; // implicit int -> long widening of the raw date int
+ }
+
+ public static float convertDateToFloat(final int value) {
+ return value; // implicit int -> float widening; large magnitudes may be rounded
+ }
+
+ public static double convertDateToDouble(final int value) {
+ return value; // implicit int -> double widening, always exact
+ }
+
+ public static Binary convertDateToText(final int value) {
+ return parseText(Integer.toString(value)); // decimal digits of the raw date int, not a formatted date
+ }
+
+ public static long convertDateToTimestamp(final int value) {
+ return value; // NOTE(review): raw date int widened to long, no calendar conversion - confirm this is intended
+ }
+
+ public static Binary convertDateToBlob(final int value) {
+ return parseBlob(Integer.toString(value)); // wraps the decimal text of the raw date int
+ }
+
+ public static Binary convertDateToString(final int value) {
+ return parseString(Integer.toString(value)); // decimal digits of the raw date int, not a formatted date
+ }
+
+ ///////////// BLOB //////////////
+
+ public static boolean convertBlobToBoolean(final Binary value) {
+ return Boolean.parseBoolean(value.toString()); // relies on Binary.toString(); presumably decodes the bytes as text - confirm
+ }
+
+ public static int convertBlobToInt32(final Binary value) {
+ return parseInteger(value.toString()); // falls back to 0 when the text form is not a valid int
+ }
+
+ public static long convertBlobToInt64(final Binary value) {
+ return parseLong(value.toString()); // falls back to 0 when the text form is not a valid long
+ }
+
+ public static float convertBlobToFloat(final Binary value) {
+ return parseFloat(value.toString()); // falls back to 0.0 when the text form is not a valid float
+ }
+
+ public static double convertBlobToDouble(final Binary value) {
+ return parseDouble(value.toString()); // falls back to 0.0 when the text form is not a valid double
+ }
+
+ public static long convertBlobToTimestamp(final Binary value) {
+ return parseTimestamp(value.toString()); // numeric text or a date-time expression; 0 when neither parses
+ }
+
+ public static int convertBlobToDate(final Binary value) {
+ return parseDate(value.toString()); // numeric text or a date expression; 0 when neither parses
+ }
+
+ public static Binary convertBlobToString(final Binary value) {
+ return value; // same Binary instance is returned; the bytes are not re-validated as text
+ }
+
+ public static Binary convertBlobToText(final Binary value) {
+ return value; // same Binary instance is returned; the bytes are not re-validated as text
+ }
+
+ ///////////// STRING //////////////
+
+ public static boolean convertStringToBoolean(final Binary value) {
+ return Boolean.parseBoolean(value.toString()); // true only for "true" (case-insensitive); anything else is false
+ }
+
+ public static int convertStringToInt32(final Binary value) {
+ return parseInteger(value.toString()); // falls back to 0 when the text is not a valid int
+ }
+
+ public static long convertStringToInt64(final Binary value) {
+ return parseLong(value.toString()); // falls back to 0 when the text is not a valid long
+ }
+
+ public static float convertStringToFloat(final Binary value) {
+ return parseFloat(value.toString()); // falls back to 0.0 when the text is not a valid float
+ }
+
+ public static double convertStringToDouble(final Binary value) {
+ return parseDouble(value.toString()); // falls back to 0.0 when the text is not a valid double
+ }
+
+ public static long convertStringToTimestamp(final Binary value) {
+ return parseTimestamp(value.toString()); // numeric text or a date-time expression; 0 when neither parses
+ }
+
+ public static int convertStringToDate(final Binary value) {
+ return parseDate(value.toString()); // numeric text or a date expression; 0 when neither parses
+ }
+
+ public static Binary convertStringToBlob(final Binary value) {
+ return parseBlob(value.toString()); // round-trips through String and returns a new Binary
+ }
+
+ public static Binary convertStringToText(final Binary value) {
+ return value; // same Binary instance is returned, no copy
+ }
+
+ ///////////// UTILS //////////////
+
+ public static Object parse(final String value, final TSDataType dataType) { // parses text into a boxed value of dataType
+ if (value == null) {
+ return null; // null stays null for every target type
+ }
+ switch (dataType) {
+ case BOOLEAN:
+ return Boolean.parseBoolean(value); // true only for "true" (case-insensitive)
+ case INT32:
+ return parseInteger(value);
+ case INT64:
+ return parseLong(value);
+ case FLOAT:
+ return parseFloat(value);
+ case DOUBLE:
+ return parseDouble(value);
+ case TEXT:
+ return parseText(value);
+ case TIMESTAMP:
+ return parseTimestamp(value);
+ case DATE:
+ return parseDate(value);
+ case BLOB:
+ return parseBlob(value);
+ case STRING:
+ return parseString(value);
+ default: // any data type without a case above is rejected rather than passed through
+ throw new UnsupportedOperationException("Unsupported data type: " +
dataType);
+ }
+ }
+
+ private static Binary parseBlob(final String value) { // same construction as parseString/parseText
+ return new Binary(value, TSFileConfig.STRING_CHARSET); // wraps the text itself; no hex/base64 decoding happens here
+ }
+
+ /**
+  * Leniently parses an {@code int} from text.
+  *
+  * <p>Surrounding whitespace is ignored so that padded input is treated the same way as by
+  * {@link Float#parseFloat(String)} and {@link Double#parseDouble(String)}, which trim their
+  * argument, whereas {@link Integer#parseInt(String)} does not.
+  *
+  * @param value the text to parse, may be {@code null}
+  * @return the parsed value, or {@code 0} when {@code value} is {@code null} or is not a valid
+  *     decimal {@code int}
+  */
+ private static int parseInteger(final String value) {
+   if (value == null) {
+     return 0;
+   }
+   try {
+     return Integer.parseInt(value.trim());
+   } catch (final NumberFormatException ignored) {
+     // Lenient conversion: unparsable text maps to zero instead of failing the transfer.
+     return 0;
+   }
+ }
+
+ /**
+  * Leniently parses a {@code long} from text.
+  *
+  * <p>Surrounding whitespace is ignored so that padded input is treated the same way as by
+  * {@link Float#parseFloat(String)} and {@link Double#parseDouble(String)}, which trim their
+  * argument, whereas {@link Long#parseLong(String)} does not.
+  *
+  * @param value the text to parse, may be {@code null}
+  * @return the parsed value, or {@code 0} when {@code value} is {@code null} or is not a valid
+  *     decimal {@code long}
+  */
+ private static long parseLong(final String value) {
+   if (value == null) {
+     return 0L;
+   }
+   try {
+     return Long.parseLong(value.trim());
+   } catch (final NumberFormatException ignored) {
+     // Lenient conversion: unparsable text maps to zero instead of failing the transfer.
+     return 0L;
+   }
+ }
+
+ /**
+  * Leniently parses a {@code float} from text.
+  *
+  * @param value the text to parse, may be {@code null}
+  * @return the parsed value, or {@code 0.0f} when {@code value} is {@code null} or is not a
+  *     valid float literal
+  */
+ private static float parseFloat(final String value) {
+   if (value == null) {
+     return 0.0f;
+   }
+   try {
+     return Float.parseFloat(value);
+   } catch (final NumberFormatException ignored) {
+     // Lenient conversion: unparsable text maps to zero.
+     return 0.0f;
+   }
+ }
+
+ /**
+  * Leniently parses a {@code double} from text.
+  *
+  * @param value the text to parse, may be {@code null}
+  * @return the parsed value, or {@code 0.0d} when {@code value} is {@code null} or is not a
+  *     valid double literal
+  */
+ private static double parseDouble(final String value) {
+   if (value == null) {
+     return 0.0d;
+   }
+   try {
+     return Double.parseDouble(value);
+   } catch (final NumberFormatException ignored) {
+     // Lenient conversion: unparsable text maps to zero.
+     return 0.0d;
+   }
+ }
+
+ private static long parseTimestamp(final String value) {
+ if (value == null || value.isEmpty()) {
+ return 0L;
+ }
+ try {
+ return TypeInferenceUtils.isNumber(value)
+ ? Long.parseLong(value)
+ : DateTimeUtils.parseDateTimeExpressionToLong(
+ StringUtils.trim(value), ZoneId.systemDefault());
+ } catch (final Exception e) {
+ return 0L;
+ }
+ }
+
+ private static int parseDate(final String value) {
+ if (value == null || value.isEmpty()) {
+ return 0;
+ }
+ try {
+ return TypeInferenceUtils.isNumber(value)
+ ? Integer.parseInt(value)
+ : DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value));
+ } catch (final Exception e) {
+ return 0;
+ }
+ }
+
+ private static Binary parseString(final String value) { // same construction as parseText/parseBlob
+ return new Binary(value, TSFileConfig.STRING_CHARSET); // wraps the text using the TsFile string charset
+ }
+
+ private static Binary parseText(final String value) { // same construction as parseString/parseBlob
+ return new Binary(value, TSFileConfig.STRING_CHARSET); // wraps the text using the TsFile string charset
+ }
+
+ private ValueConverter() {
+ // forbidden to construct: every member of this class is static
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
new file mode 100644
index 00000000000..3df76b79ce4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.transform.statement;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+
+/**
+ * An {@link InsertRowStatement} built from an existing one for the pipe receiver. Whenever a
+ * column's data type is checked, the value is converted to the requested type through {@link
+ * ValueConverter} and the check reports success.
+ */
+public class PipeConvertedInsertRowStatement extends InsertRowStatement {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeConvertedInsertRowStatement.class);
+
+  /**
+   * Creates a converted statement that takes over the state exposed by the getters of {@code
+   * insertRowStatement}.
+   *
+   * @param insertRowStatement the statement whose fields are carried over
+   */
+  public PipeConvertedInsertRowStatement(final InsertRowStatement insertRowStatement) {
+    super();
+    // Field inherited from Statement
+    this.isDebug = insertRowStatement.isDebug();
+    // Fields inherited from InsertBaseStatement
+    this.devicePath = insertRowStatement.getDevicePath();
+    this.isAligned = insertRowStatement.isAligned();
+    this.measurementSchemas = insertRowStatement.getMeasurementSchemas();
+    this.measurements = insertRowStatement.getMeasurements();
+    this.dataTypes = insertRowStatement.getDataTypes();
+    // Fields inherited from InsertRowStatement
+    this.time = insertRowStatement.getTime();
+    this.values = insertRowStatement.getValues();
+    this.isNeedInferType = insertRowStatement.isNeedInferType();
+  }
+
+  /**
+   * Converts the value of one column from its current data type to {@code dataType} and records
+   * {@code dataType} as the column's new type.
+   *
+   * @param columnIndex index of the column to convert
+   * @param dataType the data type to convert to
+   * @return always {@code true}
+   */
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    final TSDataType sourceType = dataTypes[columnIndex];
+    LOGGER.info(
+        "Pipe: Inserting row to {}.{}. Casting type from {} to {}.",
+        devicePath,
+        measurements[columnIndex],
+        sourceType,
+        dataType);
+    values[columnIndex] = ValueConverter.convert(sourceType, dataType, values[columnIndex]);
+    dataTypes[columnIndex] = dataType;
+    return true;
+  }
+
+  /**
+   * Parses the string form of every value into the type of its measurement schema, then clears
+   * the "need infer type" flag.
+   *
+   * <p>A column without a schema, or whose value cannot be parsed, is marked as a failed
+   * measurement when partial insert is enabled; otherwise the failure is thrown.
+   *
+   * @param zoneId not used by this implementation
+   * @throws QueryProcessException if a measurement has no schema and partial insert is disabled
+   */
+  @Override
+  public void transferType(ZoneId zoneId) throws QueryProcessException {
+    for (int column = 0; column < measurementSchemas.length; column++) {
+      // The schema is null when the time series doesn't exist.
+      if (measurementSchemas[column] == null) {
+        final QueryProcessException missingSeries =
+            new QueryProcessException(
+                new PathNotExistException(
+                    devicePath.getFullPath()
+                        + IoTDBConstant.PATH_SEPARATOR
+                        + measurements[column]));
+        if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          markFailedMeasurement(column, missingSeries);
+          continue;
+        }
+        throw missingSeries;
+      }
+
+      // Parse the string value into the registered type of the series.
+      final TSDataType registeredType = measurementSchemas[column].getType();
+      dataTypes[column] = registeredType;
+      try {
+        values[column] = ValueConverter.parse(values[column].toString(), registeredType);
+      } catch (Exception e) {
+        LOGGER.warn(
+            "data type of {}.{} is not consistent, "
+                + "registered type {}, inserting timestamp {}, value {}",
+            devicePath,
+            measurements[column],
+            dataTypes[column],
+            time,
+            values[column]);
+        if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          markFailedMeasurement(column, e);
+        } else {
+          throw e;
+        }
+      }
+    }
+
+    isNeedInferType = false;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
new file mode 100644
index 00000000000..2481034cbae
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.transform.statement;
+
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link InsertTabletStatement} built from an existing one for the pipe receiver. Whenever a
+ * column's data type is checked, the whole column is converted to the requested type through
+ * {@link ArrayConverter} and the check reports success.
+ */
+public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeConvertedInsertTabletStatement.class);
+
+  /**
+   * Creates a converted statement that takes over the state exposed by the getters of {@code
+   * insertTabletStatement}.
+   *
+   * @param insertTabletStatement the statement whose fields are carried over
+   */
+  public PipeConvertedInsertTabletStatement(final InsertTabletStatement insertTabletStatement) {
+    super();
+    // Field inherited from Statement
+    this.isDebug = insertTabletStatement.isDebug();
+    // Fields inherited from InsertBaseStatement
+    this.devicePath = insertTabletStatement.getDevicePath();
+    this.isAligned = insertTabletStatement.isAligned();
+    this.measurementSchemas = insertTabletStatement.getMeasurementSchemas();
+    this.measurements = insertTabletStatement.getMeasurements();
+    this.dataTypes = insertTabletStatement.getDataTypes();
+    // Fields inherited from InsertTabletStatement
+    this.times = insertTabletStatement.getTimes();
+    this.bitMaps = insertTabletStatement.getBitMaps();
+    this.columns = insertTabletStatement.getColumns();
+    this.rowCount = insertTabletStatement.getRowCount();
+  }
+
+  /**
+   * Converts one column from its current data type to {@code dataType} and records {@code
+   * dataType} as the column's new type.
+   *
+   * @param columnIndex index of the column to convert
+   * @param dataType the data type to convert to
+   * @return always {@code true}
+   */
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    final TSDataType sourceType = dataTypes[columnIndex];
+    LOGGER.info(
+        "Pipe: Inserting tablet to {}.{}. Casting type from {} to {}.",
+        devicePath,
+        measurements[columnIndex],
+        sourceType,
+        dataType);
+    columns[columnIndex] = ArrayConverter.convert(sourceType, dataType, columns[columnIndex]);
+    dataTypes[columnIndex] = dataType;
+    return true;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..69f456552f7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.visitor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
+import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Visitor that receives a statement together with the status of its failed execution. When the
+ * status points at a data type mismatch, insert statements are wrapped in their pipe-converted
+ * variants and executed again through the configured {@link StatementExecutor}.
+ *
+ * <p>Each visit method yields the status of the second execution, or an empty {@link Optional}
+ * when no conversion was attempted or the second execution threw.
+ */
+public class PipeStatementDataTypeConvertExecutionVisitor
+    extends StatementVisitor<Optional<TSStatus>, TSStatus> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeStatementDataTypeConvertExecutionVisitor.class);
+
+  /** Callback that executes a statement and reports the resulting status. */
+  @FunctionalInterface
+  public interface StatementExecutor {
+    TSStatus execute(final Statement statement);
+  }
+
+  private final StatementExecutor statementExecutor;
+
+  /**
+   * @param statementExecutor the callback used to execute converted statements
+   */
+  public PipeStatementDataTypeConvertExecutionVisitor(final StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /**
+   * Runs {@code statement} through the executor.
+   *
+   * @return the resulting status, or empty if anything in the execution threw (the exception is
+   *     logged)
+   */
+  private Optional<TSStatus> tryExecute(final Statement statement) {
+    Optional<TSStatus> outcome;
+    try {
+      outcome = Optional.of(statementExecutor.execute(statement));
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to execute statement after data type conversion.", e);
+      outcome = Optional.empty();
+    }
+    return outcome;
+  }
+
+  /**
+   * Tells whether the status of a single-statement insertion reports a data type mismatch: a
+   * metadata error whose message contains the mismatch marker.
+   */
+  private static boolean isTypeMismatchOfSingleInsertion(final TSStatus status) {
+    if (status.getCode() != TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      return false;
+    }
+    final String message = status.getMessage();
+    return message != null && message.contains(DataTypeMismatchException.REGISTERED_TYPE_STRING);
+  }
+
+  /**
+   * Tells whether the status of a batched insertion reports a data type mismatch: a metadata
+   * error or multiple error whose string form contains the mismatch marker.
+   */
+  private static boolean isTypeMismatchOfBatchInsertion(final TSStatus status) {
+    final int code = status.getCode();
+    if (code != TSStatusCode.METADATA_ERROR.getStatusCode()
+        && code != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      return false;
+    }
+    return status.toString().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING);
+  }
+
+  /** Default handling: no conversion is attempted. */
+  @Override
+  public Optional<TSStatus> visitNode(final StatementNode statementNode, final TSStatus status) {
+    return Optional.empty();
+  }
+
+  /** Load-file statements are not converted yet; they are passed on to {@code visitStatement}. */
+  @Override
+  public Optional<TSStatus> visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
+    // TODO: judge if the exception is caused by data type mismatch
+    // TODO: convert the data type of the statement
+    return visitStatement(loadTsFileStatement, status);
+  }
+
+  /** Re-executes a single row insertion in converted form on a data type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertRow(
+      final InsertRowStatement insertRowStatement, final TSStatus status) {
+    if (!isTypeMismatchOfSingleInsertion(status)) {
+      return Optional.empty();
+    }
+    return tryExecute(new PipeConvertedInsertRowStatement(insertRowStatement));
+  }
+
+  /** Re-executes all rows of a multi-row insertion in converted form on a data type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertRows(
+      final InsertRowsStatement insertRowsStatement, final TSStatus status) {
+    // Nothing to do unless the failure is a type mismatch and there are rows to convert.
+    if (!isTypeMismatchOfBatchInsertion(status)
+        || insertRowsStatement.getInsertRowStatementList() == null
+        || insertRowsStatement.getInsertRowStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertRowsStatement converted = new InsertRowsStatement();
+    converted.setInsertRowStatementList(
+        insertRowsStatement.getInsertRowStatementList().stream()
+            .map(PipeConvertedInsertRowStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(converted);
+  }
+
+  /** Re-executes all rows of a one-device insertion in converted form on a data type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertRowsOfOneDevice(
+      final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, final TSStatus status) {
+    // Nothing to do unless the failure is a type mismatch and there are rows to convert.
+    if (!isTypeMismatchOfBatchInsertion(status)
+        || insertRowsOfOneDeviceStatement.getInsertRowStatementList() == null
+        || insertRowsOfOneDeviceStatement.getInsertRowStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertRowsOfOneDeviceStatement converted = new InsertRowsOfOneDeviceStatement();
+    converted.setInsertRowStatementList(
+        insertRowsOfOneDeviceStatement.getInsertRowStatementList().stream()
+            .map(PipeConvertedInsertRowStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(converted);
+  }
+
+  /** Re-executes a single tablet insertion in converted form on a data type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final TSStatus status) {
+    if (!isTypeMismatchOfSingleInsertion(status)) {
+      return Optional.empty();
+    }
+    return tryExecute(new PipeConvertedInsertTabletStatement(insertTabletStatement));
+  }
+
+  /** Re-executes all tablets of a multi-tablet insertion in converted form on a type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertMultiTablets(
+      final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus status) {
+    // Nothing to do unless the failure is a type mismatch and there are tablets to convert.
+    if (!isTypeMismatchOfBatchInsertion(status)
+        || insertMultiTabletsStatement.getInsertTabletStatementList() == null
+        || insertMultiTabletsStatement.getInsertTabletStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertMultiTabletsStatement converted = new InsertMultiTabletsStatement();
+    converted.setInsertTabletStatementList(
+        insertMultiTabletsStatement.getInsertTabletStatementList().stream()
+            .map(PipeConvertedInsertTabletStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(converted);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index bc31e71ec80..88e5b7735cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -71,7 +71,7 @@ public abstract class InsertBaseStatement extends Statement {
// region params used by analyzing logical views.
/** This param records the logical view schema appeared in this statement. */
- List<LogicalViewSchema> logicalViewSchemaList;
+ protected List<LogicalViewSchema> logicalViewSchemaList;
/**
* This param records the index of the location where the source of this
view should be placed.
@@ -79,13 +79,13 @@ public abstract class InsertBaseStatement extends Statement
{
* <p>For example, indexListOfLogicalViewPaths[alpha] = beta means source of
* logicalViewSchemaList[alpha] should be filled into
measurementSchemas[beta].
*/
- List<Integer> indexOfSourcePathsOfLogicalViews;
+ protected List<Integer> indexOfSourcePathsOfLogicalViews;
/** it is the end of last range, the beginning of current range. */
- int recordedBeginOfLogicalViewSchemaList = 0;
+ protected int recordedBeginOfLogicalViewSchemaList = 0;
/** it is the end of current range. */
- int recordedEndOfLogicalViewSchemaList = 0;
+ protected int recordedEndOfLogicalViewSchemaList = 0;
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index a5b864e30f4..bb67086456a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -59,18 +59,18 @@ public class InsertRowStatement extends InsertBaseStatement
implements ISchemaVa
private static final Logger LOGGER =
LoggerFactory.getLogger(InsertRowStatement.class);
- private static final byte TYPE_RAW_STRING = -1;
- private static final byte TYPE_NULL = -2;
+ protected static final byte TYPE_RAW_STRING = -1;
+ protected static final byte TYPE_NULL = -2;
- private long time;
- private Object[] values;
- private boolean isNeedInferType = false;
+ protected long time;
+ protected Object[] values;
+ protected boolean isNeedInferType = false;
/**
* This param record whether the source of logical view is aligned. Only
used when there are
* views.
*/
- private boolean[] measurementIsAligned;
+ protected boolean[] measurementIsAligned;
public InsertRowStatement() {
super();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index b38997ef9c0..329fe77203b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -56,17 +56,17 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
private static final String DATATYPE_UNSUPPORTED = "Data type %s is not
supported.";
- private long[] times; // times should be sorted. It is done in the session
API.
- private BitMap[] bitMaps;
- private Object[] columns;
+ protected long[] times; // times should be sorted. It is done in the session
API.
+ protected BitMap[] bitMaps;
+ protected Object[] columns;
- private int rowCount = 0;
+ protected int rowCount = 0;
/**
* This param record whether the source of logical view is aligned. Only
used when there are
* views.
*/
- private boolean[] measurementIsAligned;
+ protected boolean[] measurementIsAligned;
public InsertTabletStatement() {
super();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index e2772cd08e4..b7ce69136fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -49,7 +49,7 @@ public class TypeInferenceUtils {
return s.length() >= 3 && s.startsWith("X'") && s.endsWith("'");
}
- static boolean isNumber(String s) {
+ public static boolean isNumber(String s) {
if (s == null || s.equals("NaN")) {
return false;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 62af58eece4..87ec0aaaeae 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -81,6 +81,13 @@ public class PipeConnectorConstant {
public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password";
public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
+ public static final String
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY =
+ "connector.exception.data.convert-on-type-mismatch";
+ public static final String SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY =
+ "sink.exception.data.convert-on-type-mismatch";
+ public static final boolean
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE =
+ true;
+
public static final String CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY
=
"connector.exception.conflict.resolve-strategy";
public static final String SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 667313e1976..57b9a4fede4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -60,13 +60,16 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
private final LoadBalancer loadBalancer;
+ private final boolean shouldReceiverConvertOnTypeMismatch;
+
protected IoTDBSyncClientManager(
List<TEndPoint> endPoints,
boolean useSSL,
String trustStorePath,
String trustStorePwd,
boolean useLeaderCache,
- String loadBalanceStrategy) {
+ String loadBalanceStrategy,
+ boolean shouldReceiverConvertOnTypeMismatch) {
super(endPoints, useLeaderCache);
this.useSSL = useSSL;
@@ -93,6 +96,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
loadBalanceStrategy);
loadBalancer = new RoundRobinLoadBalancer();
}
+
+ this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
}
public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -169,6 +174,9 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
getClusterId());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+ Boolean.toString(shouldReceiverConvertOnTypeMismatch));
// Try to handshake by PipeTransferHandshakeV2Req.
TPipeTransferResp resp =
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 07c7bb46390..b6864706aaa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -23,6 +23,7 @@ public class PipeTransferHandshakeConstant {
public static final String HANDSHAKE_KEY_TIME_PRECISION =
"timestampPrecision";
public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
+ public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH =
"convertOnTypeMismatch";
private PipeTransferHandshakeConstant() {
// Utility class
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 6ac6f9432df..9dc08f93402 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -62,6 +62,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
@@ -88,6 +90,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
@@ -124,6 +127,8 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected boolean isTabletBatchModeEnabled = true;
protected PipeReceiverStatusHandler receiverStatusHandler;
+ protected boolean shouldReceiverConvertOnTypeMismatch =
+ CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -312,6 +317,16 @@ public abstract class IoTDBConnector implements
PipeConnector {
CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY,
SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY),
CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
+ shouldReceiverConvertOnTypeMismatch =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
+ SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
+ CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
+ LOGGER.info(
+ "IoTDBConnector {} = {}",
+ CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
+ shouldReceiverConvertOnTypeMismatch);
}
protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters
parameters)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index be93018bf9c..271596cf549 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -116,7 +116,13 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
clientManager =
constructClient(
- nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache,
loadBalanceStrategy);
+ nodeUrls,
+ useSSL,
+ trustStorePath,
+ trustStorePwd,
+ useLeaderCache,
+ loadBalanceStrategy,
+ shouldReceiverConvertOnTypeMismatch);
}
protected abstract IoTDBSyncClientManager constructClient(
@@ -125,7 +131,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
String trustStorePath,
String trustStorePwd,
boolean useLeaderCache,
- String loadBalanceStrategy);
+ String loadBalanceStrategy,
+ boolean shouldReceiverConvertOnTypeMismatch);
@Override
public void handshake() throws Exception {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index b60dd1d5e1c..7197e112526 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+
/**
* {@link IoTDBFileReceiver} is the parent class of receiver on both
configNode and DataNode,
* handling all the logic of parallel file receiving.
@@ -66,6 +68,9 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
private File writingFile;
private RandomAccessFile writingFileWriter;
+ protected boolean shouldConvertDataTypeOnTypeMismatch =
+ CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+
@Override
public IoTDBConnectorRequestVersion getVersion() {
return IoTDBConnectorRequestVersion.VERSION_1;
@@ -216,6 +221,13 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
return new TPipeTransferResp(status);
}
+ final String shouldConvertDataTypeOnTypeMismatchString =
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH);
+ if (shouldConvertDataTypeOnTypeMismatchString != null) {
+ shouldConvertDataTypeOnTypeMismatch =
+ Boolean.parseBoolean(shouldConvertDataTypeOnTypeMismatchString);
+ }
+
// Handle the handshake request as a v1 request.
// Here we construct a fake "dataNode" request to valid from v1 validation
logic, though
// it may not require the actual type of the v1 request.