This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TableModelV1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cc6e54f8b2133e26a43ef256fa5d08dcce04103d Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Aug 9 11:17:00 2024 +0800 Pipe: Support converting data type on data sync receiver metadata mismatch (#13110) (cherry picked from commit 5b31438b0b7fb77492f69bdc5948e0788c5191cd) --- .../client/IoTDBConfigNodeSyncClientManager.java | 12 +- .../protocol/IoTDBConfigRegionAirGapConnector.java | 3 + .../protocol/IoTDBConfigRegionConnector.java | 10 +- .../client/IoTDBDataNodeAsyncClientManager.java | 12 +- .../client/IoTDBDataNodeSyncClientManager.java | 12 +- .../airgap/IoTDBDataNodeAirGapConnector.java | 3 + .../async/IoTDBDataRegionAsyncConnector.java | 3 +- .../thrift/sync/IoTDBDataNodeSyncConnector.java | 11 +- .../protocol/thrift/IoTDBDataNodeReceiver.java | 40 +- .../transform/converter/ArrayConverter.java | 940 +++++++++++++++++++++ .../transform/converter/ValueConverter.java | 787 +++++++++++++++++ .../statement/PipeConvertedInsertRowStatement.java | 112 +++ .../PipeConvertedInsertTabletStatement.java | 64 ++ ...peStatementDataTypeConvertExecutionVisitor.java | 173 ++++ .../plan/statement/crud/InsertBaseStatement.java | 8 +- .../plan/statement/crud/InsertRowStatement.java | 12 +- .../plan/statement/crud/InsertTabletStatement.java | 10 +- .../apache/iotdb/db/utils/TypeInferenceUtils.java | 2 +- .../config/constant/PipeConnectorConstant.java | 7 + .../connector/client/IoTDBSyncClientManager.java | 10 +- .../common/PipeTransferHandshakeConstant.java | 1 + .../pipe/connector/protocol/IoTDBConnector.java | 15 + .../connector/protocol/IoTDBSslSyncConnector.java | 11 +- .../commons/pipe/receiver/IoTDBFileReceiver.java | 12 + 24 files changed, 2225 insertions(+), 45 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java index 
e420a6b5c6d..fd5b3c9ddb9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java @@ -38,8 +38,16 @@ public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager { boolean useSSL, String trustStorePath, String trustStorePwd, - String loadBalanceStrategy) { - super(endPoints, useSSL, trustStorePath, trustStorePwd, false, loadBalanceStrategy); + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { + super( + endPoints, + useSSL, + trustStorePath, + trustStorePwd, + false, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java index 1c5febcb4bf..ba151351c39 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java @@ -68,6 +68,9 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector { params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, + Boolean.toString(shouldReceiverConvertOnTypeMismatch)); return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index 9ba081018b9..96059980c38 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -62,9 +62,15 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { final String trustStorePath, final String trustStorePwd, final boolean useLeaderCache, - final String loadBalanceStrategy) { + final String loadBalanceStrategy, + final boolean shouldReceiverConvertOnTypeMismatch) { return new IoTDBConfigNodeSyncClientManager( - nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy); + nodeUrls, + useSSL, + trustStorePath, + trustStorePwd, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index eff84ab5a5a..6814f8151b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -67,8 +67,13 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager private final LoadBalancer loadBalancer; + private final boolean shouldReceiverConvertOnTypeMismatch; + public IoTDBDataNodeAsyncClientManager( - List<TEndPoint> endPoints, boolean useLeaderCache, String loadBalanceStrategy) { + List<TEndPoint> endPoints, + boolean 
useLeaderCache, + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { super(endPoints, useLeaderCache); endPointSet = new HashSet<>(endPoints); @@ -101,6 +106,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager loadBalanceStrategy); loadBalancer = new RoundRobinLoadBalancer(); } + + this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch; } public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { @@ -209,6 +216,9 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, + Boolean.toString(shouldReceiverConvertOnTypeMismatch)); client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs()); client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java index 2c23cba4542..df9ae2a0e29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java @@ -48,8 +48,16 @@ public class IoTDBDataNodeSyncClientManager extends IoTDBSyncClientManager String trustStorePath, String trustStorePwd, boolean useLeaderCache, - String loadBalanceStrategy) { - super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { + super( + endPoints, + useSSL, + trustStorePath, + 
trustStorePwd, + useLeaderCache, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index b66c0012700..f0491431196 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ -100,6 +100,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, + Boolean.toString(shouldReceiverConvertOnTypeMismatch)); return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 255e5adec76..43c5fa6eff2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -124,7 +124,8 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { parameters.getBooleanOrDefault( Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, CONNECTOR_LEADER_CACHE_ENABLE_KEY), CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE), - loadBalanceStrategy); + loadBalanceStrategy, + 
shouldReceiverConvertOnTypeMismatch); if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index 9ad83ee84e2..32c886fbe56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -86,10 +86,17 @@ public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { final String trustStorePath, final String trustStorePwd, final boolean useLeaderCache, - final String loadBalanceStrategy) { + final String loadBalanceStrategy, + final boolean shouldReceiverConvertOnTypeMismatch) { clientManager = new IoTDBDataNodeSyncClientManager( - nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); + nodeUrls, + useSSL, + trustStorePath, + trustStorePwd, + useLeaderCache, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); return clientManager; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 762af561bc8..bb4e315f77c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -51,6 +51,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; 
import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics; import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor; +import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementDataTypeConvertExecutionVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor; @@ -61,7 +62,6 @@ import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; -import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.AlterLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.statement.Statement; @@ -117,6 +117,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { new PipeStatementExceptionVisitor(); private static final PipeStatementPatternParseVisitor STATEMENT_PATTERN_PARSE_VISITOR = new PipeStatementPatternParseVisitor(); + private static final PipeStatementDataTypeConvertExecutionVisitor + STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR = + new PipeStatementDataTypeConvertExecutionVisitor(IoTDBDataNodeReceiver::executeStatement); private final PipeStatementToBatchVisitor batchVisitor = new PipeStatementToBatchVisitor(); // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster @@ -461,7 +464,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { private TSStatus executeStatementAndClassifyExceptions(final Statement statement) { try { - final TSStatus result = 
executeStatement(statement); + final TSStatus result = executeStatementWithRetryOnDataTypeMismatch(statement); if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return result; @@ -483,25 +486,30 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { } } - private TSStatus executeStatement(Statement statement) { + private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement statement) { if (statement == null) { return RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement."); } - statement = new PipeEnrichedStatement(statement); - - final ExecutionResult result = - Coordinator.getInstance() - .executeForTreeModel( - statement, - SessionManager.getInstance().requestQueryId(), - new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), - "", - ClusterPartitionFetcher.getInstance(), - ClusterSchemaFetcher.getInstance(), - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); - return result.status; + final TSStatus status = executeStatement(statement); + return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || !shouldConvertDataTypeOnTypeMismatch + ? 
status + : statement.accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR, status).orElse(status); + } + + private static TSStatus executeStatement(final Statement statement) { + return Coordinator.getInstance() + .executeForTreeModel( + new PipeEnrichedStatement(statement), + SessionManager.getInstance().requestQueryId(), + new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), + "", + ClusterPartitionFetcher.getInstance(), + ClusterSchemaFetcher.getInstance(), + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) + .status; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java new file mode 100644 index 00000000000..3d5a9b0cfb6 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ArrayConverter.java @@ -0,0 +1,940 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.receiver.transform.converter; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; + +public class ArrayConverter { + + @FunctionalInterface + private interface Converter { + Object convert( + final TSDataType sourceDataType, + final TSDataType targetDataType, + final Object sourceValues); + } + + private static final Converter[][] CONVERTER = + new Converter[TSDataType.values().length][TSDataType.values().length]; + + private static final Converter DO_NOTHING_CONVERTER = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + + static { + for (final TSDataType sourceDataType : TSDataType.values()) { + for (final TSDataType targetDataType : TSDataType.values()) { + CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()] = DO_NOTHING_CONVERTER; + } + } + + // BOOLEAN + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final int[] intValues = new int[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + intValues[i] = ValueConverter.convertBooleanToInt32(boolValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final long[] longValues = new long[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + longValues[i] = ValueConverter.convertBooleanToInt64(boolValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final float[] floatValues = new 
float[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + floatValues[i] = ValueConverter.convertBooleanToFloat(boolValues[i]); + } + return floatValues; + }; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final double[] doubleValues = new double[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + doubleValues[i] = ValueConverter.convertBooleanToDouble(boolValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final Binary[] textValues = new Binary[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + textValues[i] = ValueConverter.convertBooleanToText(boolValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final long[] timestampValues = new long[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + timestampValues[i] = ValueConverter.convertBooleanToTimestamp(boolValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final int[] dateValues = new int[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + dateValues[i] = ValueConverter.convertBooleanToDate(boolValues[i]); + } + return dateValues; + }; + 
CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final Binary[] blobValues = new Binary[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + blobValues[i] = ValueConverter.convertBooleanToBlob(boolValues[i]); + } + return blobValues; + }; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final boolean[] boolValues = (boolean[]) sourceValues; + final Binary[] stringValues = new Binary[boolValues.length]; + for (int i = 0; i < boolValues.length; i++) { + stringValues[i] = ValueConverter.convertBooleanToString(boolValues[i]); + } + return stringValues; + }; + + // INT32 + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final boolean[] boolValues = new boolean[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + boolValues[i] = ValueConverter.convertInt32ToBoolean(intValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final long[] longValues = new long[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + longValues[i] = ValueConverter.convertInt32ToInt64(intValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final float[] floatValues = new float[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + floatValues[i] = 
ValueConverter.convertInt32ToFloat(intValues[i]); + } + return floatValues; + }; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final double[] doubleValues = new double[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + doubleValues[i] = ValueConverter.convertInt32ToDouble(intValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final Binary[] textValues = new Binary[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + textValues[i] = ValueConverter.convertInt32ToText(intValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final long[] timestampValues = new long[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + timestampValues[i] = ValueConverter.convertInt32ToTimestamp(intValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final int[] dateValues = new int[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + dateValues[i] = ValueConverter.convertInt32ToDate(intValues[i]); + } + return dateValues; + }; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final Binary[] blobValues = new 
Binary[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + blobValues[i] = ValueConverter.convertInt32ToBlob(intValues[i]); + } + return blobValues; + }; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] intValues = (int[]) sourceValues; + final Binary[] stringValues = new Binary[intValues.length]; + for (int i = 0; i < intValues.length; i++) { + stringValues[i] = ValueConverter.convertInt32ToString(intValues[i]); + } + return stringValues; + }; + + // INT64 + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] longValues = (long[]) sourceValues; + final boolean[] boolValues = new boolean[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + boolValues[i] = ValueConverter.convertInt64ToBoolean(longValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] longValues = (long[]) sourceValues; + final int[] intValues = new int[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + intValues[i] = ValueConverter.convertInt64ToInt32(longValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] longValues = (long[]) sourceValues; + final float[] floatValues = new float[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + floatValues[i] = ValueConverter.convertInt64ToFloat(longValues[i]); + } + return floatValues; + }; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] 
longValues = (long[]) sourceValues; + final double[] doubleValues = new double[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + doubleValues[i] = ValueConverter.convertInt64ToDouble(longValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] longValues = (long[]) sourceValues; + final Binary[] textValues = new Binary[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + textValues[i] = ValueConverter.convertInt64ToText(longValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] longValues = (long[]) sourceValues; + final long[] timestampValues = new long[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + timestampValues[i] = ValueConverter.convertInt64ToTimestamp(longValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] longValues = (long[]) sourceValues; + final int[] dateValues = new int[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + dateValues[i] = ValueConverter.convertInt64ToDate(longValues[i]); + } + return dateValues; + }; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] longValues = (long[]) sourceValues; + final Binary[] blobValues = new Binary[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + blobValues[i] = ValueConverter.convertInt64ToBlob(longValues[i]); + } + return blobValues; + }; + 
CONVERTER[TSDataType.INT64.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] longValues = (long[]) sourceValues; + final Binary[] stringValues = new Binary[longValues.length]; + for (int i = 0; i < longValues.length; i++) { + stringValues[i] = ValueConverter.convertInt64ToString(longValues[i]); + } + return stringValues; + }; + + // FLOAT + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] floatValues = (float[]) sourceValues; + final boolean[] boolValues = new boolean[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + boolValues[i] = ValueConverter.convertFloatToBoolean(floatValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] floatValues = (float[]) sourceValues; + final int[] intValues = new int[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + intValues[i] = ValueConverter.convertFloatToInt32(floatValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] floatValues = (float[]) sourceValues; + final long[] longValues = new long[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + longValues[i] = ValueConverter.convertFloatToInt64(floatValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] floatValues = (float[]) sourceValues; + final double[] doubleValues = new double[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + 
doubleValues[i] = ValueConverter.convertFloatToDouble(floatValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] floatValues = (float[]) sourceValues; + final Binary[] textValues = new Binary[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + textValues[i] = ValueConverter.convertFloatToText(floatValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] floatValues = (float[]) sourceValues; + final long[] timestampValues = new long[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + timestampValues[i] = ValueConverter.convertFloatToTimestamp(floatValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] floatValues = (float[]) sourceValues; + final int[] dateValues = new int[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + dateValues[i] = ValueConverter.convertFloatToDate(floatValues[i]); + } + return dateValues; + }; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] floatValues = (float[]) sourceValues; + final Binary[] blobValues = new Binary[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + blobValues[i] = ValueConverter.convertFloatToBlob(floatValues[i]); + } + return blobValues; + }; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final float[] 
floatValues = (float[]) sourceValues; + final Binary[] stringValues = new Binary[floatValues.length]; + for (int i = 0; i < floatValues.length; i++) { + stringValues[i] = ValueConverter.convertFloatToString(floatValues[i]); + } + return stringValues; + }; + + // DOUBLE + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final boolean[] boolValues = new boolean[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + boolValues[i] = ValueConverter.convertDoubleToBoolean(doubleValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final int[] intValues = new int[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + intValues[i] = ValueConverter.convertDoubleToInt32(doubleValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final long[] longValues = new long[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + longValues[i] = ValueConverter.convertDoubleToInt64(doubleValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final float[] floatValues = new float[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + floatValues[i] = ValueConverter.convertDoubleToFloat(doubleValues[i]); + } + return floatValues; + }; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + 
CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final Binary[] textValues = new Binary[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + textValues[i] = ValueConverter.convertDoubleToText(doubleValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final long[] timestampValues = new long[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + timestampValues[i] = ValueConverter.convertDoubleToTimestamp(doubleValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final int[] dateValues = new int[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + dateValues[i] = ValueConverter.convertDoubleToDate(doubleValues[i]); + } + return dateValues; + }; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final Binary[] blobValues = new Binary[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + blobValues[i] = ValueConverter.convertDoubleToBlob(doubleValues[i]); + } + return blobValues; + }; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final double[] doubleValues = (double[]) sourceValues; + final Binary[] stringValues = 
new Binary[doubleValues.length]; + for (int i = 0; i < doubleValues.length; i++) { + stringValues[i] = ValueConverter.convertDoubleToString(doubleValues[i]); + } + return stringValues; + }; + + // TEXT + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final boolean[] boolValues = new boolean[textValues.length]; + for (int i = 0; i < textValues.length; i++) { + boolValues[i] = ValueConverter.convertTextToBoolean(textValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final int[] intValues = new int[textValues.length]; + for (int i = 0; i < textValues.length; i++) { + intValues[i] = ValueConverter.convertTextToInt32(textValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final long[] longValues = new long[textValues.length]; + for (int i = 0; i < textValues.length; i++) { + longValues[i] = ValueConverter.convertTextToInt64(textValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final float[] floatValues = new float[textValues.length]; + for (int i = 0; i < textValues.length; i++) { + floatValues[i] = ValueConverter.convertTextToFloat(textValues[i]); + } + return floatValues; + }; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final double[] doubleValues = new double[textValues.length]; + for (int i = 0; i < 
textValues.length; i++) { + doubleValues[i] = ValueConverter.convertTextToDouble(textValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final long[] timestampValues = new long[textValues.length]; + for (int i = 0; i < textValues.length; i++) { + timestampValues[i] = ValueConverter.convertTextToTimestamp(textValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final int[] dateValues = new int[textValues.length]; + for (int i = 0; i < textValues.length; i++) { + dateValues[i] = ValueConverter.convertTextToDate(textValues[i]); + } + return dateValues; + }; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final Binary[] blobValues = new Binary[textValues.length]; + for (int i = 0; i < textValues.length; i++) { + blobValues[i] = ValueConverter.convertTextToBlob(textValues[i]); + } + return blobValues; + }; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] textValues = (Binary[]) sourceValues; + final Binary[] stringValues = new Binary[textValues.length]; + for (int i = 0; i < textValues.length; i++) { + stringValues[i] = ValueConverter.convertTextToString(textValues[i]); + } + return stringValues; 
+ }; + + // VECTOR + for (int i = 0; i < TSDataType.values().length; i++) { + CONVERTER[TSDataType.VECTOR.ordinal()][i] = DO_NOTHING_CONVERTER; + } + + // UNKNOWN + for (int i = 0; i < TSDataType.values().length; i++) { + CONVERTER[TSDataType.UNKNOWN.ordinal()][i] = DO_NOTHING_CONVERTER; + } + + // TIMESTAMP + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final boolean[] boolValues = new boolean[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + boolValues[i] = ValueConverter.convertTimestampToBoolean(timestampValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final int[] intValues = new int[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + intValues[i] = ValueConverter.convertTimestampToInt32(timestampValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final long[] longValues = new long[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + longValues[i] = ValueConverter.convertTimestampToInt64(timestampValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final float[] floatValues = new float[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + floatValues[i] = ValueConverter.convertTimestampToFloat(timestampValues[i]); + } + return floatValues; + }; + 
CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final double[] doubleValues = new double[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + doubleValues[i] = ValueConverter.convertTimestampToDouble(timestampValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final Binary[] textValues = new Binary[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + textValues[i] = ValueConverter.convertTimestampToText(timestampValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final int[] dateValues = new int[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + dateValues[i] = ValueConverter.convertTimestampToDate(timestampValues[i]); + } + return dateValues; + }; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final Binary[] blobValues = new Binary[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + blobValues[i] = ValueConverter.convertTimestampToBlob(timestampValues[i]); + } + return blobValues; + }; + 
CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final long[] timestampValues = (long[]) sourceValues; + final Binary[] stringValues = new Binary[timestampValues.length]; + for (int i = 0; i < timestampValues.length; i++) { + stringValues[i] = ValueConverter.convertTimestampToString(timestampValues[i]); + } + return stringValues; + }; + + // DATE + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] dateValues = (int[]) sourceValues; + final boolean[] boolValues = new boolean[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + boolValues[i] = ValueConverter.convertDateToBoolean(dateValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] dateValues = (int[]) sourceValues; + final int[] intValues = new int[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + intValues[i] = ValueConverter.convertDateToInt32(dateValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] dateValues = (int[]) sourceValues; + final long[] longValues = new long[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + longValues[i] = ValueConverter.convertDateToInt64(dateValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] dateValues = (int[]) sourceValues; + final float[] floatValues = new float[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + floatValues[i] = ValueConverter.convertDateToFloat(dateValues[i]); + } + return floatValues; + }; + 
CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] dateValues = (int[]) sourceValues; + final double[] doubleValues = new double[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + doubleValues[i] = ValueConverter.convertDateToDouble(dateValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] dateValues = (int[]) sourceValues; + final Binary[] textValues = new Binary[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + textValues[i] = ValueConverter.convertDateToText(dateValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] dateValues = (int[]) sourceValues; + final long[] timestampValues = new long[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + timestampValues[i] = ValueConverter.convertDateToTimestamp(dateValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] dateValues = (int[]) sourceValues; + final Binary[] blobValues = new Binary[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + blobValues[i] = ValueConverter.convertDateToBlob(dateValues[i]); + } + return blobValues; + }; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final int[] 
dateValues = (int[]) sourceValues; + final Binary[] stringValues = new Binary[dateValues.length]; + for (int i = 0; i < dateValues.length; i++) { + stringValues[i] = ValueConverter.convertDateToString(dateValues[i]); + } + return stringValues; + }; + + // BLOB + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final boolean[] boolValues = new boolean[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + boolValues[i] = ValueConverter.convertBlobToBoolean(blobValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final int[] intValues = new int[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + intValues[i] = ValueConverter.convertBlobToInt32(blobValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final long[] longValues = new long[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + longValues[i] = ValueConverter.convertBlobToInt64(blobValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final float[] floatValues = new float[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + floatValues[i] = ValueConverter.convertBlobToFloat(blobValues[i]); + } + return floatValues; + }; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final double[] 
doubleValues = new double[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + doubleValues[i] = ValueConverter.convertBlobToDouble(blobValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final Binary[] textValues = new Binary[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + textValues[i] = ValueConverter.convertBlobToText(blobValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final long[] timestampValues = new long[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + timestampValues[i] = ValueConverter.convertBlobToTimestamp(blobValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final int[] dateValues = new int[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + dateValues[i] = ValueConverter.convertBlobToDate(blobValues[i]); + } + return dateValues; + }; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> sourceValues; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] blobValues = (Binary[]) sourceValues; + final Binary[] stringValues = new Binary[blobValues.length]; + for (int i = 0; i < blobValues.length; i++) { + stringValues[i] = 
ValueConverter.convertBlobToString(blobValues[i]); + } + return stringValues; + }; + + // STRING + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final boolean[] boolValues = new boolean[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + boolValues[i] = ValueConverter.convertStringToBoolean(stringValues[i]); + } + return boolValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final int[] intValues = new int[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + intValues[i] = ValueConverter.convertStringToInt32(stringValues[i]); + } + return intValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final long[] longValues = new long[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + longValues[i] = ValueConverter.convertStringToInt64(stringValues[i]); + } + return longValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final float[] floatValues = new float[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + floatValues[i] = ValueConverter.convertStringToFloat(stringValues[i]); + } + return floatValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final double[] doubleValues = new double[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + doubleValues[i] = 
ValueConverter.convertStringToDouble(stringValues[i]); + } + return doubleValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final Binary[] textValues = new Binary[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + textValues[i] = ValueConverter.convertStringToText(stringValues[i]); + } + return textValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.VECTOR.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.UNKNOWN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final long[] timestampValues = new long[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + timestampValues[i] = ValueConverter.convertStringToTimestamp(stringValues[i]); + } + return timestampValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final int[] dateValues = new int[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + dateValues[i] = ValueConverter.convertStringToDate(stringValues[i]); + } + return dateValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> { + final Binary[] stringValues = (Binary[]) sourceValues; + final Binary[] blobValues = new Binary[stringValues.length]; + for (int i = 0; i < stringValues.length; i++) { + blobValues[i] = ValueConverter.convertStringToBlob(stringValues[i]); + } + return blobValues; + }; + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValues) -> 
sourceValues; + } + + /** + * Converts {@code sourceValues} from {@code sourceDataType} to {@code targetDataType} by + * delegating to the entry of the {@code CONVERTER} table selected by the two type ordinals. + * + * @param sourceDataType data type the given values currently have + * @param targetDataType data type the values should be converted to + * @param sourceValues values to convert, may be {@code null} + * @return {@code null} when {@code sourceValues} is {@code null}, otherwise whatever the + * selected converter returns + */ + public static Object convert( + final TSDataType sourceDataType, final TSDataType targetDataType, final Object sourceValues) { + if (sourceValues == null) { + return null; + } + return CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()].convert( + sourceDataType, targetDataType, sourceValues); + } + + private ArrayConverter() { + // static utility holder, not meant to be instantiated + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java new file mode 100644 index 00000000000..098792fd0b3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java @@ -0,0 +1,787 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.receiver.transform.converter; + +import org.apache.iotdb.db.utils.DateTimeUtils; +import org.apache.iotdb.db.utils.TypeInferenceUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; + +import java.time.ZoneId; + +public class ValueConverter { + + /** Converts one value of {@code sourceDataType} into a value of {@code targetDataType}; instances are stored in the {@code CONVERTER} table. */ @FunctionalInterface + private interface Converter { + Object convert( + final TSDataType sourceDataType, final TSDataType targetDataType, final Object sourceValue); + } + + /** Lookup table of converters indexed as [source type ordinal][target type ordinal], with one row and one column per {@code TSDataType} constant. */ private static final Converter[][] CONVERTER = + new Converter[TSDataType.values().length][TSDataType.values().length]; + + /** Identity converter: hands the source value back unchanged. */ private static final Converter DO_NOTHING_CONVERTER = + (sourceDataType, targetDataType, sourceValue) -> sourceValue; + + static { /* First default every (source, target) pair to the identity converter, so any pair not overridden below passes values through untouched. */ + for (final TSDataType sourceDataType : TSDataType.values()) { + for (final TSDataType targetDataType : TSDataType.values()) { + CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()] = DO_NOTHING_CONVERTER; + } + } + + // BOOLEAN + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BOOLEAN.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBooleanToInt32((boolean) sourceValue); + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBooleanToInt64((boolean) sourceValue); + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBooleanToFloat((boolean) sourceValue); + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBooleanToDouble((boolean) sourceValue); + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + 
convertBooleanToText((boolean) sourceValue); + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBooleanToTimestamp((boolean) sourceValue); + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBooleanToDate((boolean) sourceValue); + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBooleanToBlob((boolean) sourceValue); + CONVERTER[TSDataType.BOOLEAN.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBooleanToString((boolean) sourceValue); + + // INT32 + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToBoolean((int) sourceValue); + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT32.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToInt64((int) sourceValue); + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToFloat((int) sourceValue); + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToDouble((int) sourceValue); + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToText((int) sourceValue); + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToTimestamp((int) sourceValue); + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToDate((int) sourceValue); + 
CONVERTER[TSDataType.INT32.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToBlob((int) sourceValue); + CONVERTER[TSDataType.INT32.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt32ToString((int) sourceValue); + + // INT64 + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt64ToBoolean((long) sourceValue); + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt64ToInt32((long) sourceValue); + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.INT64.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt64ToFloat((long) sourceValue); + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt64ToDouble((long) sourceValue); + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt64ToText((long) sourceValue); + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertInt64ToTimestamp((long) sourceValue); + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt64ToDate((long) sourceValue); + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt64ToBlob((long) sourceValue); + CONVERTER[TSDataType.INT64.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertInt64ToString((long) sourceValue); + + // FLOAT + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, 
targetDataType, sourceValue) -> convertFloatToBoolean((float) sourceValue); + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertFloatToInt32((float) sourceValue); + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertFloatToInt64((float) sourceValue); + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.FLOAT.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertFloatToDouble((float) sourceValue); + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertFloatToText((float) sourceValue); + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertFloatToTimestamp((float) sourceValue); + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertFloatToDate((float) sourceValue); + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertFloatToBlob((float) sourceValue); + CONVERTER[TSDataType.FLOAT.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertFloatToString((float) sourceValue); + + // DOUBLE + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertDoubleToBoolean((double) sourceValue); + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDoubleToInt32((double) sourceValue); + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDoubleToInt64((double) sourceValue); + 
CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDoubleToFloat((double) sourceValue); + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DOUBLE.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDoubleToText((double) sourceValue); + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertDoubleToTimestamp((double) sourceValue); + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDoubleToDate((double) sourceValue); + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDoubleToBlob((double) sourceValue); + CONVERTER[TSDataType.DOUBLE.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertDoubleToString((double) sourceValue); + + // TEXT + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTextToBoolean((Binary) sourceValue); + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTextToInt32((Binary) sourceValue); + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTextToInt64((Binary) sourceValue); + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTextToFloat((Binary) sourceValue); + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTextToDouble((Binary) sourceValue); + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TEXT.ordinal()] = 
DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertTextToTimestamp((Binary) sourceValue); + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTextToDate((Binary) sourceValue); + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTextToBlob((Binary) sourceValue); + CONVERTER[TSDataType.TEXT.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTextToString((Binary) sourceValue); + + // TIMESTAMP + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertTimestampToBoolean((long) sourceValue); + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertTimestampToInt32((long) sourceValue); + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertTimestampToInt64((long) sourceValue); + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertTimestampToFloat((long) sourceValue); + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertTimestampToDouble((long) sourceValue); + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTimestampToText((long) sourceValue); + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTimestampToDate((long) sourceValue); 
+ CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertTimestampToBlob((long) sourceValue); + CONVERTER[TSDataType.TIMESTAMP.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertTimestampToString((long) sourceValue); + + // DATE + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToBoolean((int) sourceValue); + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToInt32((int) sourceValue); + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToInt64((int) sourceValue); + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToFloat((int) sourceValue); + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToDouble((int) sourceValue); + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToText((int) sourceValue); + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToTimestamp((int) sourceValue); + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.DATE.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToBlob((int) sourceValue); + CONVERTER[TSDataType.DATE.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertDateToString((int) sourceValue); + + // BLOB + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, 
targetDataType, sourceValue) -> convertBlobToBoolean((Binary) sourceValue); + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertBlobToInt32((Binary) sourceValue); + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertBlobToInt64((Binary) sourceValue); + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertBlobToFloat((Binary) sourceValue); + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertBlobToDouble((Binary) sourceValue); + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertBlobToText((Binary) sourceValue); + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertBlobToTimestamp((Binary) sourceValue); + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertBlobToDate((Binary) sourceValue); + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.BLOB.ordinal()] = DO_NOTHING_CONVERTER; + CONVERTER[TSDataType.BLOB.ordinal()][TSDataType.STRING.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertBlobToString((Binary) sourceValue); + + // STRING + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BOOLEAN.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertStringToBoolean((Binary) sourceValue); + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT32.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertStringToInt32((Binary) sourceValue); + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.INT64.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertStringToInt64((Binary) sourceValue); + 
CONVERTER[TSDataType.STRING.ordinal()][TSDataType.FLOAT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertStringToFloat((Binary) sourceValue); + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DOUBLE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertStringToDouble((Binary) sourceValue); + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TEXT.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertStringToText((Binary) sourceValue); + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.TIMESTAMP.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> + convertStringToTimestamp((Binary) sourceValue); + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.DATE.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertStringToDate((Binary) sourceValue); + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.BLOB.ordinal()] = + (sourceDataType, targetDataType, sourceValue) -> convertStringToBlob((Binary) sourceValue); + CONVERTER[TSDataType.STRING.ordinal()][TSDataType.STRING.ordinal()] = DO_NOTHING_CONVERTER; + } + + public static Object convert( + final TSDataType sourceDataType, final TSDataType targetDataType, final Object sourceValue) { + return sourceValue == null + ? null + : CONVERTER[sourceDataType.ordinal()][targetDataType.ordinal()].convert( + sourceDataType, targetDataType, sourceValue); + } + + ////////////// BOOLEAN ////////////// + + private static final Binary BINARY_TRUE = parseString(Boolean.TRUE.toString()); + private static final Binary BINARY_FALSE = parseString(Boolean.FALSE.toString()); + + public static int convertBooleanToInt32(final boolean value) { + return value ? 1 : 0; + } + + public static long convertBooleanToInt64(final boolean value) { + return value ? 1L : 0L; + } + + public static float convertBooleanToFloat(final boolean value) { + return value ? 1.0f : 0.0f; + } + + public static double convertBooleanToDouble(final boolean value) { + return value ? 
1.0 : 0.0; + } + + public static Binary convertBooleanToText(final boolean value) { + return value ? BINARY_TRUE : BINARY_FALSE; + } + + public static long convertBooleanToTimestamp(final boolean value) { + return value ? 1L : 0L; + } + + public static int convertBooleanToDate(final boolean value) { + return value ? 1 : 0; + } + + public static Binary convertBooleanToBlob(final boolean value) { + return value ? BINARY_TRUE : BINARY_FALSE; + } + + public static Binary convertBooleanToString(final boolean value) { + return value ? BINARY_TRUE : BINARY_FALSE; + } + + ///////////// INT32 ////////////// + + public static boolean convertInt32ToBoolean(final int value) { + return value != 0; + } + + public static long convertInt32ToInt64(final int value) { + return value; + } + + public static float convertInt32ToFloat(final int value) { + return value; + } + + public static double convertInt32ToDouble(final int value) { + return value; + } + + public static Binary convertInt32ToText(final int value) { + return parseText(Integer.toString(value)); + } + + public static long convertInt32ToTimestamp(final int value) { + return value; + } + + public static int convertInt32ToDate(final int value) { + return value; + } + + public static Binary convertInt32ToBlob(final int value) { + return parseBlob(Integer.toString(value)); + } + + public static Binary convertInt32ToString(final int value) { + return parseString(Integer.toString(value)); + } + + ///////////// INT64 ////////////// + + public static boolean convertInt64ToBoolean(final long value) { + return value != 0; + } + + public static int convertInt64ToInt32(final long value) { + return (int) value; + } + + public static float convertInt64ToFloat(final long value) { + return value; + } + + public static double convertInt64ToDouble(final long value) { + return value; + } + + public static Binary convertInt64ToText(final long value) { + return parseText(Long.toString(value)); + } + + public static long 
convertInt64ToTimestamp(final long value) { + return value; + } + + public static int convertInt64ToDate(final long value) { + return (int) value; + } + + public static Binary convertInt64ToBlob(final long value) { + return parseBlob(Long.toString(value)); + } + + public static Binary convertInt64ToString(final long value) { + return parseString(Long.toString(value)); + } + + ///////////// FLOAT ////////////// + + public static boolean convertFloatToBoolean(final float value) { + return value != 0; + } + + public static int convertFloatToInt32(final float value) { + return (int) value; + } + + public static long convertFloatToInt64(final float value) { + return (long) value; + } + + public static double convertFloatToDouble(final float value) { + return value; + } + + public static Binary convertFloatToText(final float value) { + return parseText(Float.toString(value)); + } + + public static long convertFloatToTimestamp(final float value) { + return (long) value; + } + + public static int convertFloatToDate(final float value) { + return (int) value; + } + + public static Binary convertFloatToBlob(final float value) { + return parseBlob(Float.toString(value)); + } + + public static Binary convertFloatToString(final float value) { + return parseString(Float.toString(value)); + } + + ///////////// DOUBLE ////////////// + + public static boolean convertDoubleToBoolean(final double value) { + return value != 0; + } + + public static int convertDoubleToInt32(final double value) { + return (int) value; + } + + public static long convertDoubleToInt64(final double value) { + return (long) value; + } + + public static float convertDoubleToFloat(final double value) { + return (float) value; + } + + public static Binary convertDoubleToText(final double value) { + return parseText(Double.toString(value)); + } + + public static long convertDoubleToTimestamp(final double value) { + return (long) value; + } + + public static int convertDoubleToDate(final double value) { + return 
(int) value; + } + + public static Binary convertDoubleToBlob(final double value) { + return parseBlob(Double.toString(value)); + } + + public static Binary convertDoubleToString(final double value) { + return parseString(Double.toString(value)); + } + + ///////////// TEXT ////////////// + + public static boolean convertTextToBoolean(final Binary value) { + return Boolean.parseBoolean(value.toString()); + } + + public static int convertTextToInt32(final Binary value) { + return parseInteger(value.toString()); + } + + public static long convertTextToInt64(final Binary value) { + return parseLong(value.toString()); + } + + public static float convertTextToFloat(final Binary value) { + return parseFloat(value.toString()); + } + + public static double convertTextToDouble(final Binary value) { + return parseDouble(value.toString()); + } + + public static long convertTextToTimestamp(final Binary value) { + return parseTimestamp(value.toString()); + } + + public static int convertTextToDate(final Binary value) { + return parseDate(value.toString()); + } + + public static Binary convertTextToBlob(final Binary value) { + return parseBlob(value.toString()); + } + + public static Binary convertTextToString(final Binary value) { + return value; + } + + ///////////// TIMESTAMP ////////////// + + public static boolean convertTimestampToBoolean(final long value) { + return value != 0; + } + + public static int convertTimestampToInt32(final long value) { + return (int) value; + } + + public static long convertTimestampToInt64(final long value) { + return value; + } + + public static float convertTimestampToFloat(final long value) { + return value; + } + + public static double convertTimestampToDouble(final long value) { + return value; + } + + public static Binary convertTimestampToText(final long value) { + return parseText(Long.toString(value)); + } + + public static int convertTimestampToDate(final long value) { + return (int) value; + } + + public static Binary 
convertTimestampToBlob(final long value) { + return parseBlob(Long.toString(value)); + } + + public static Binary convertTimestampToString(final long value) { + return parseString(Long.toString(value)); + } + + ///////////// DATE ////////////// + + public static boolean convertDateToBoolean(final int value) { + return value != 0; + } + + public static int convertDateToInt32(final int value) { + return value; + } + + public static long convertDateToInt64(final int value) { + return value; + } + + public static float convertDateToFloat(final int value) { + return value; + } + + public static double convertDateToDouble(final int value) { + return value; + } + + public static Binary convertDateToText(final int value) { + return parseText(Integer.toString(value)); + } + + public static long convertDateToTimestamp(final int value) { + return value; + } + + public static Binary convertDateToBlob(final int value) { + return parseBlob(Integer.toString(value)); + } + + public static Binary convertDateToString(final int value) { + return parseString(Integer.toString(value)); + } + + ///////////// BLOB ////////////// + + public static boolean convertBlobToBoolean(final Binary value) { + return Boolean.parseBoolean(value.toString()); + } + + public static int convertBlobToInt32(final Binary value) { + return parseInteger(value.toString()); + } + + public static long convertBlobToInt64(final Binary value) { + return parseLong(value.toString()); + } + + public static float convertBlobToFloat(final Binary value) { + return parseFloat(value.toString()); + } + + public static double convertBlobToDouble(final Binary value) { + return parseDouble(value.toString()); + } + + public static long convertBlobToTimestamp(final Binary value) { + return parseTimestamp(value.toString()); + } + + public static int convertBlobToDate(final Binary value) { + return parseDate(value.toString()); + } + + public static Binary convertBlobToString(final Binary value) { + return value; + } + + public 
static Binary convertBlobToText(final Binary value) { + return value; + } + + ///////////// STRING ////////////// + + public static boolean convertStringToBoolean(final Binary value) { + return Boolean.parseBoolean(value.toString()); + } + + public static int convertStringToInt32(final Binary value) { + return parseInteger(value.toString()); + } + + public static long convertStringToInt64(final Binary value) { + return parseLong(value.toString()); + } + + public static float convertStringToFloat(final Binary value) { + return parseFloat(value.toString()); + } + + public static double convertStringToDouble(final Binary value) { + return parseDouble(value.toString()); + } + + public static long convertStringToTimestamp(final Binary value) { + return parseTimestamp(value.toString()); + } + + public static int convertStringToDate(final Binary value) { + return parseDate(value.toString()); + } + + public static Binary convertStringToBlob(final Binary value) { + return parseBlob(value.toString()); + } + + public static Binary convertStringToText(final Binary value) { + return value; + } + + ///////////// UTILS ////////////// + + public static Object parse(final String value, final TSDataType dataType) { + if (value == null) { + return null; + } + switch (dataType) { + case BOOLEAN: + return Boolean.parseBoolean(value); + case INT32: + return parseInteger(value); + case INT64: + return parseLong(value); + case FLOAT: + return parseFloat(value); + case DOUBLE: + return parseDouble(value); + case TEXT: + return parseText(value); + case TIMESTAMP: + return parseTimestamp(value); + case DATE: + return parseDate(value); + case BLOB: + return parseBlob(value); + case STRING: + return parseString(value); + default: + throw new UnsupportedOperationException("Unsupported data type: " + dataType); + } + } + + private static Binary parseBlob(final String value) { + return new Binary(value, TSFileConfig.STRING_CHARSET); + } + + private static int parseInteger(final String value) { + 
try { + return Integer.parseInt(value); + } catch (Exception e) { + return 0; + } + } + + private static long parseLong(final String value) { + try { + return Long.parseLong(value); + } catch (Exception e) { + return 0L; + } + } + + private static float parseFloat(final String value) { + try { + return Float.parseFloat(value); + } catch (Exception e) { + return 0.0f; + } + } + + private static double parseDouble(final String value) { + try { + return Double.parseDouble(value); + } catch (Exception e) { + return 0.0d; + } + } + + private static long parseTimestamp(final String value) { + if (value == null || value.isEmpty()) { + return 0L; + } + try { + return TypeInferenceUtils.isNumber(value) + ? Long.parseLong(value) + : DateTimeUtils.parseDateTimeExpressionToLong( + StringUtils.trim(value), ZoneId.systemDefault()); + } catch (final Exception e) { + return 0L; + } + } + + private static int parseDate(final String value) { + if (value == null || value.isEmpty()) { + return 0; + } + try { + return TypeInferenceUtils.isNumber(value) + ? 
Integer.parseInt(value) + : DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value)); + } catch (final Exception e) { + return 0; + } + } + + private static Binary parseString(final String value) { + return new Binary(value, TSFileConfig.STRING_CHARSET); + } + + private static Binary parseText(final String value) { + return new Binary(value, TSFileConfig.STRING_CHARSET); + } + + private ValueConverter() { + // forbidden to construct + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java new file mode 100644 index 00000000000..3df76b79ce4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.receiver.transform.statement; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; + +import org.apache.tsfile.enums.TSDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.ZoneId; + +public class PipeConvertedInsertRowStatement extends InsertRowStatement { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeConvertedInsertRowStatement.class); + + public PipeConvertedInsertRowStatement(final InsertRowStatement insertRowStatement) { + super(); + // Statement + isDebug = insertRowStatement.isDebug(); + // InsertBaseStatement + devicePath = insertRowStatement.getDevicePath(); + isAligned = insertRowStatement.isAligned(); + measurementSchemas = insertRowStatement.getMeasurementSchemas(); + measurements = insertRowStatement.getMeasurements(); + dataTypes = insertRowStatement.getDataTypes(); + // InsertRowStatement + time = insertRowStatement.getTime(); + values = insertRowStatement.getValues(); + isNeedInferType = insertRowStatement.isNeedInferType(); + } + + @Override + protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { + LOGGER.info( + "Pipe: Inserting row to {}.{}. 
Casting type from {} to {}.", + devicePath, + measurements[columnIndex], + dataTypes[columnIndex], + dataType); + values[columnIndex] = + ValueConverter.convert(dataTypes[columnIndex], dataType, values[columnIndex]); + dataTypes[columnIndex] = dataType; + return true; + } + + @Override + public void transferType(ZoneId zoneId) throws QueryProcessException { + for (int i = 0; i < measurementSchemas.length; i++) { + // null when time series doesn't exist + if (measurementSchemas[i] == null) { + if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { + throw new QueryProcessException( + new PathNotExistException( + devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])); + } else { + markFailedMeasurement( + i, + new QueryProcessException( + new PathNotExistException( + devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]))); + } + continue; + } + + // parse string value to specific type + dataTypes[i] = measurementSchemas[i].getType(); + try { + values[i] = ValueConverter.parse(values[i].toString(), dataTypes[i]); + } catch (Exception e) { + LOGGER.warn( + "data type of {}.{} is not consistent, " + + "registered type {}, inserting timestamp {}, value {}", + devicePath, + measurements[i], + dataTypes[i], + time, + values[i]); + if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { + throw e; + } else { + markFailedMeasurement(i, e); + } + } + } + + isNeedInferType = false; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java new file mode 100644 index 00000000000..2481034cbae --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.transform.statement; + +import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.enums.TSDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PipeConvertedInsertTabletStatement extends InsertTabletStatement { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeConvertedInsertTabletStatement.class); + + public PipeConvertedInsertTabletStatement(final InsertTabletStatement insertTabletStatement) { + super(); + // Statement + isDebug = insertTabletStatement.isDebug(); + // InsertBaseStatement + devicePath = insertTabletStatement.getDevicePath(); + isAligned = insertTabletStatement.isAligned(); + measurementSchemas = insertTabletStatement.getMeasurementSchemas(); + measurements = insertTabletStatement.getMeasurements(); + dataTypes = insertTabletStatement.getDataTypes(); + // InsertTabletStatement + times = insertTabletStatement.getTimes(); + bitMaps = insertTabletStatement.getBitMaps(); + columns = insertTabletStatement.getColumns(); + rowCount = 
insertTabletStatement.getRowCount(); + } + + @Override + protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { + LOGGER.info( + "Pipe: Inserting tablet to {}.{}. Casting type from {} to {}.", + devicePath, + measurements[columnIndex], + dataTypes[columnIndex], + dataType); + columns[columnIndex] = + ArrayConverter.convert(dataTypes[columnIndex], dataType, columns[columnIndex]); + dataTypes[columnIndex] = dataType; + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java new file mode 100644 index 00000000000..69f456552f7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.pipe.receiver.visitor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
+import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Retries a failed insert statement after wrapping it in a data-type-converting statement.
+ *
+ * <p>Each {@code visitXxx} method receives the original statement together with the {@link
+ * TSStatus} produced by its failed execution. When that status describes a data type mismatch,
+ * the statement is wrapped in its {@code PipeConverted*} counterpart and executed again through
+ * the {@link StatementExecutor} supplied at construction time.
+ *
+ * <p>The visitor returns the {@link TSStatus} of that second execution, not the converted
+ * statement itself. {@link Optional#empty()} means that no conversion was attempted or that the
+ * second execution threw an exception.
+ */
+public class PipeStatementDataTypeConvertExecutionVisitor
+    extends StatementVisitor<Optional<TSStatus>, TSStatus> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeStatementDataTypeConvertExecutionVisitor.class);
+
+  /** Callback used to run a (converted) statement and obtain its resulting status. */
+  @FunctionalInterface
+  public interface StatementExecutor {
+    TSStatus execute(final Statement statement);
+  }
+
+  private final StatementExecutor statementExecutor;
+
+  /**
+   * @param statementExecutor executor invoked with the converted statement on every retry
+   */
+  public PipeStatementDataTypeConvertExecutionVisitor(final StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /**
+   * Runs the converted statement through the executor.
+   *
+   * @return the status reported by the executor, or {@link Optional#empty()} if the executor threw
+   */
+  private Optional<TSStatus> tryExecute(final Statement statement) {
+    try {
+      return Optional.of(statementExecutor.execute(statement));
+    } catch (final Exception e) {
+      // Deliberately best-effort: a failed retry is reported as "nothing converted".
+      LOGGER.warn("Failed to execute statement after data type conversion.", e);
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Mismatch test for single-row / single-tablet statements: the status code must be {@code
+   * METADATA_ERROR} and the (non-null) message must contain the registered-type marker.
+   */
+  private static boolean isSingleStatementTypeMismatch(final TSStatus status) {
+    return status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && status.getMessage() != null
+        && status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING);
+  }
+
+  /**
+   * Mismatch test for batch statements: the status code must be {@code METADATA_ERROR} or {@code
+   * MULTIPLE_ERROR} and the string form of the whole status must contain the registered-type
+   * marker.
+   */
+  private static boolean isBatchStatementTypeMismatch(final TSStatus status) {
+    final int code = status.getCode();
+    // NOTE(review): toString() rather than getMessage() is presumably used so that text carried
+    // by nested sub-statuses of a MULTIPLE_ERROR is searched too -- confirm against TSStatus.
+    return (code == TSStatusCode.METADATA_ERROR.getStatusCode()
+            || code == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+        && status.toString().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING);
+  }
+
+  /** Fallback for statement kinds that have no conversion: never retries. */
+  @Override
+  public Optional<TSStatus> visitNode(final StatementNode statementNode, final TSStatus status) {
+    return Optional.empty();
+  }
+
+  /** Load-file statements are not converted yet; the call is forwarded to the generic handler. */
+  @Override
+  public Optional<TSStatus> visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
+    // TODO: judge if the exception is caused by data type mismatch
+    // TODO: convert the data type of the statement
+    return visitStatement(loadTsFileStatement, status);
+  }
+
+  /** Retries a single-row insert as a {@link PipeConvertedInsertRowStatement} on type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertRow(
+      final InsertRowStatement insertRowStatement, final TSStatus status) {
+    if (!isSingleStatementTypeMismatch(status)) {
+      return Optional.empty();
+    }
+    return tryExecute(new PipeConvertedInsertRowStatement(insertRowStatement));
+  }
+
+  /**
+   * Retries a multi-row insert on type mismatch, wrapping every contained row in a {@link
+   * PipeConvertedInsertRowStatement}. A null or empty row list is not retried.
+   */
+  @Override
+  public Optional<TSStatus> visitInsertRows(
+      final InsertRowsStatement insertRowsStatement, final TSStatus status) {
+    if (!isBatchStatementTypeMismatch(status)) {
+      return Optional.empty();
+    }
+
+    if (insertRowsStatement.getInsertRowStatementList() == null
+        || insertRowsStatement.getInsertRowStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertRowsStatement convertedInsertRowsStatement = new InsertRowsStatement();
+    convertedInsertRowsStatement.setInsertRowStatementList(
+        insertRowsStatement.getInsertRowStatementList().stream()
+            .map(PipeConvertedInsertRowStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(convertedInsertRowsStatement);
+  }
+
+  /**
+   * Retries a rows-of-one-device insert on type mismatch, wrapping every contained row in a {@link
+   * PipeConvertedInsertRowStatement}. A null or empty row list is not retried.
+   */
+  @Override
+  public Optional<TSStatus> visitInsertRowsOfOneDevice(
+      final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, final TSStatus status) {
+    if (!isBatchStatementTypeMismatch(status)) {
+      return Optional.empty();
+    }
+
+    if (insertRowsOfOneDeviceStatement.getInsertRowStatementList() == null
+        || insertRowsOfOneDeviceStatement.getInsertRowStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertRowsOfOneDeviceStatement convertedInsertRowsOfOneDeviceStatement =
+        new InsertRowsOfOneDeviceStatement();
+    convertedInsertRowsOfOneDeviceStatement.setInsertRowStatementList(
+        insertRowsOfOneDeviceStatement.getInsertRowStatementList().stream()
+            .map(PipeConvertedInsertRowStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(convertedInsertRowsOfOneDeviceStatement);
+  }
+
+  /** Retries a tablet insert as a {@link PipeConvertedInsertTabletStatement} on type mismatch. */
+  @Override
+  public Optional<TSStatus> visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final TSStatus status) {
+    if (!isSingleStatementTypeMismatch(status)) {
+      return Optional.empty();
+    }
+    return tryExecute(new PipeConvertedInsertTabletStatement(insertTabletStatement));
+  }
+
+  /**
+   * Retries a multi-tablet insert on type mismatch, wrapping every contained tablet in a {@link
+   * PipeConvertedInsertTabletStatement}. A null or empty tablet list is not retried.
+   */
+  @Override
+  public Optional<TSStatus> visitInsertMultiTablets(
+      final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus status) {
+    if (!isBatchStatementTypeMismatch(status)) {
+      return Optional.empty();
+    }
+
+    if (insertMultiTabletsStatement.getInsertTabletStatementList() == null
+        || insertMultiTabletsStatement.getInsertTabletStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertMultiTabletsStatement convertedInsertMultiTabletsStatement =
+        new InsertMultiTabletsStatement();
+    convertedInsertMultiTabletsStatement.setInsertTabletStatementList(
+        insertMultiTabletsStatement.getInsertTabletStatementList().stream()
+            .map(PipeConvertedInsertTabletStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(convertedInsertMultiTabletsStatement);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index d0aa848504b..8bccc30f493 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -82,7 +82,7 @@ public abstract class 
InsertBaseStatement extends Statement { // region params used by analyzing logical views. /** This param records the logical view schema appeared in this statement. */ - List<LogicalViewSchema> logicalViewSchemaList; + protected List<LogicalViewSchema> logicalViewSchemaList; /** * This param records the index of the location where the source of this view should be placed. @@ -90,13 +90,13 @@ public abstract class InsertBaseStatement extends Statement { * <p>For example, indexListOfLogicalViewPaths[alpha] = beta means source of * logicalViewSchemaList[alpha] should be filled into measurementSchemas[beta]. */ - List<Integer> indexOfSourcePathsOfLogicalViews; + protected List<Integer> indexOfSourcePathsOfLogicalViews; /** it is the end of last range, the beginning of current range. */ - int recordedBeginOfLogicalViewSchemaList = 0; + protected int recordedBeginOfLogicalViewSchemaList = 0; /** it is the end of current range. */ - int recordedEndOfLogicalViewSchemaList = 0; + protected int recordedEndOfLogicalViewSchemaList = 0; @TableModel private String databaseName; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java index bec3192c0a1..0ac735450f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java @@ -66,18 +66,18 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa private static final Logger LOGGER = LoggerFactory.getLogger(InsertRowStatement.class); - private static final byte TYPE_RAW_STRING = -1; - private static final byte TYPE_NULL = -2; + protected static final byte TYPE_RAW_STRING = -1; + protected static final byte TYPE_NULL = -2; - private long time; - private 
Object[] values; - private boolean isNeedInferType = false; + protected long time; + protected Object[] values; + protected boolean isNeedInferType = false; /** * This param record whether the source of logical view is aligned. Only used when there are * views. */ - private boolean[] measurementIsAligned; + protected boolean[] measurementIsAligned; private IDeviceID deviceID; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index dc0bad4c885..988abea1add 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -65,19 +65,19 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported."; - private long[] times; // times should be sorted. It is done in the session API. - private BitMap[] bitMaps; - private Object[] columns; + protected long[] times; // times should be sorted. It is done in the session API. + protected BitMap[] bitMaps; + protected Object[] columns; private IDeviceID[] deviceIDs; - private int rowCount = 0; + protected int rowCount = 0; /** * This param record whether the source of logical view is aligned. Only used when there are * views. 
*/ - private boolean[] measurementIsAligned; + protected boolean[] measurementIsAligned; public InsertTabletStatement() { super(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java index b8721a59d8c..8fc1d647dc3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java @@ -48,7 +48,7 @@ public class TypeInferenceUtils { return s.length() >= 3 && s.startsWith("X'") && s.endsWith("'"); } - static boolean isNumber(String s) { + public static boolean isNumber(String s) { if (s == null || s.equals("NaN")) { return false; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index 62af58eece4..87ec0aaaeae 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -81,6 +81,13 @@ public class PipeConnectorConstant { public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password"; public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root"; + public static final String CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY = + "connector.exception.data.convert-on-type-mismatch"; + public static final String SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY = + "sink.exception.data.convert-on-type-mismatch"; + public static final boolean CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE = + true; + public static final String CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY = 
"connector.exception.conflict.resolve-strategy"; public static final String SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java index 667313e1976..57b9a4fede4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java @@ -60,13 +60,16 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen private final LoadBalancer loadBalancer; + private final boolean shouldReceiverConvertOnTypeMismatch; + protected IoTDBSyncClientManager( List<TEndPoint> endPoints, boolean useSSL, String trustStorePath, String trustStorePwd, boolean useLeaderCache, - String loadBalanceStrategy) { + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { super(endPoints, useLeaderCache); this.useSSL = useSSL; @@ -93,6 +96,8 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen loadBalanceStrategy); loadBalancer = new RoundRobinLoadBalancer(); } + + this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch; } public void checkClientStatusAndTryReconstructIfNecessary() { @@ -169,6 +174,9 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, getClusterId()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, + Boolean.toString(shouldReceiverConvertOnTypeMismatch)); // Try to handshake by PipeTransferHandshakeV2Req. 
TPipeTransferResp resp = client.pipeTransfer(buildHandshakeV2Req(params)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java index 07c7bb46390..b6864706aaa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java @@ -23,6 +23,7 @@ public class PipeTransferHandshakeConstant { public static final String HANDSHAKE_KEY_TIME_PRECISION = "timestampPrecision"; public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID"; + public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH = "convertOnTypeMismatch"; private PipeTransferHandshakeConstant() { // Utility class diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index 6ac6f9432df..9dc08f93402 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -62,6 +62,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE; import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE; @@ -88,6 +90,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY; @@ -124,6 +127,8 @@ public abstract class IoTDBConnector implements PipeConnector { protected boolean isTabletBatchModeEnabled = true; protected PipeReceiverStatusHandler 
receiverStatusHandler; + protected boolean shouldReceiverConvertOnTypeMismatch = + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; @Override public void validate(final PipeParameterValidator validator) throws Exception { @@ -312,6 +317,16 @@ public abstract class IoTDBConnector implements PipeConnector { CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY, SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY), CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE)); + shouldReceiverConvertOnTypeMismatch = + parameters.getBooleanOrDefault( + Arrays.asList( + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY, + SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY), + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE); + LOGGER.info( + "IoTDBConnector {} = {}", + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY, + shouldReceiverConvertOnTypeMismatch); } protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters parameters) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index be93018bf9c..271596cf549 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -116,7 +116,13 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { clientManager = constructClient( - nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); + nodeUrls, + useSSL, + trustStorePath, + trustStorePwd, + useLeaderCache, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); } protected abstract IoTDBSyncClientManager constructClient( @@ -125,7 +131,8 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector 
{ String trustStorePath, String trustStorePwd, boolean useLeaderCache, - String loadBalanceStrategy); + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch); @Override public void handshake() throws Exception { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index b60dd1d5e1c..7197e112526 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; + /** * {@link IoTDBFileReceiver} is the parent class of receiver on both configNode and DataNode, * handling all the logic of parallel file receiving. 
@@ -66,6 +68,9 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { private File writingFile; private RandomAccessFile writingFileWriter; + protected boolean shouldConvertDataTypeOnTypeMismatch = + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; + @Override public IoTDBConnectorRequestVersion getVersion() { return IoTDBConnectorRequestVersion.VERSION_1; @@ -216,6 +221,13 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { return new TPipeTransferResp(status); } + final String shouldConvertDataTypeOnTypeMismatchString = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH); + if (shouldConvertDataTypeOnTypeMismatchString != null) { + shouldConvertDataTypeOnTypeMismatch = + Boolean.parseBoolean(shouldConvertDataTypeOnTypeMismatchString); + } + // Handle the handshake request as a v1 request. // Here we construct a fake "dataNode" request to valid from v1 validation logic, though // it may not require the actual type of the v1 request.
