This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch convert-on-type-mismatch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 868f030615d4e04ec506f41ab711bc8b67745e4e Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Aug 6 12:11:59 2024 +0800 sender: convert on data type mismatch --- .../client/IoTDBConfigNodeSyncClientManager.java | 12 ++++++++++-- .../protocol/IoTDBConfigRegionAirGapConnector.java | 3 +++ .../connector/protocol/IoTDBConfigRegionConnector.java | 10 ++++++++-- .../connector/client/IoTDBDataNodeAsyncClientManager.java | 12 +++++++++++- .../connector/client/IoTDBDataNodeSyncClientManager.java | 12 ++++++++++-- .../protocol/airgap/IoTDBDataNodeAirGapConnector.java | 3 +++ .../thrift/async/IoTDBDataRegionAsyncConnector.java | 3 ++- .../protocol/thrift/sync/IoTDBDataNodeSyncConnector.java | 11 +++++++++-- .../pipe/config/constant/PipeConnectorConstant.java | 7 +++++++ .../pipe/connector/client/IoTDBSyncClientManager.java | 10 +++++++++- .../thrift/common/PipeTransferHandshakeConstant.java | 1 + .../commons/pipe/connector/protocol/IoTDBConnector.java | 15 +++++++++++++++ .../pipe/connector/protocol/IoTDBSslSyncConnector.java | 11 +++++++++-- .../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 12 ++++++++++++ 14 files changed, 109 insertions(+), 13 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java index e420a6b5c6d..fd5b3c9ddb9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java @@ -38,8 +38,16 @@ public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager { boolean useSSL, String trustStorePath, String trustStorePwd, - String loadBalanceStrategy) { - super(endPoints, useSSL, trustStorePath, trustStorePwd, false, loadBalanceStrategy); + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { + super( + endPoints, + useSSL, + trustStorePath, + trustStorePwd, + false, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java index 1c5febcb4bf..ba151351c39 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java @@ -68,6 +68,9 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector { params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, + Boolean.toString(shouldReceiverConvertOnTypeMismatch)); return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index 9ba081018b9..96059980c38 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -62,9 +62,15 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { final String trustStorePath, final String trustStorePwd, final boolean useLeaderCache, - final String loadBalanceStrategy) { + final String loadBalanceStrategy, + final boolean shouldReceiverConvertOnTypeMismatch) { return new IoTDBConfigNodeSyncClientManager( - nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy); + nodeUrls, + useSSL, + trustStorePath, + trustStorePwd, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index eff84ab5a5a..6814f8151b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -67,8 +67,13 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager private final LoadBalancer loadBalancer; + private final boolean shouldReceiverConvertOnTypeMismatch; + public IoTDBDataNodeAsyncClientManager( - List<TEndPoint> endPoints, boolean useLeaderCache, String loadBalanceStrategy) { + List<TEndPoint> endPoints, + boolean useLeaderCache, + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { super(endPoints, useLeaderCache); endPointSet = new HashSet<>(endPoints); @@ -101,6 +106,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager loadBalanceStrategy); loadBalancer = new RoundRobinLoadBalancer(); } + + this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch; } public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { @@ -209,6 +216,9 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, + Boolean.toString(shouldReceiverConvertOnTypeMismatch)); client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs()); client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java index 2c23cba4542..df9ae2a0e29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java @@ -48,8 +48,16 @@ public class IoTDBDataNodeSyncClientManager extends IoTDBSyncClientManager String trustStorePath, String trustStorePwd, boolean useLeaderCache, - String loadBalanceStrategy) { - super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { + super( + endPoints, + useSSL, + trustStorePath, + trustStorePwd, + useLeaderCache, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index b66c0012700..f0491431196 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ -100,6 +100,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, + Boolean.toString(shouldReceiverConvertOnTypeMismatch)); return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 255e5adec76..43c5fa6eff2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -124,7 +124,8 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { parameters.getBooleanOrDefault( Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, CONNECTOR_LEADER_CACHE_ENABLE_KEY), CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE), - loadBalanceStrategy); + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index 9ad83ee84e2..0c297343b48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -86,10 +86,17 @@ public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { final String trustStorePath, final String trustStorePwd, final boolean useLeaderCache, - final String loadBalanceStrategy) { + final String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { clientManager = new IoTDBDataNodeSyncClientManager( - nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); + nodeUrls, + useSSL, + trustStorePath, + trustStorePwd, + useLeaderCache, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); return clientManager; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index 61e773da3e4..65c76dd7067 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -81,6 +81,13 @@ public class PipeConnectorConstant { public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password"; public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root"; + public static final String CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY = + "connector.exception.data.convert-on-type-mismatch"; + public static final String SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY = + "sink.exception.data.convert-on-type-mismatch"; + public static final boolean CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE = + true; + public static final String CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY = "connector.exception.conflict.resolve-strategy"; public static final String SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java index 667313e1976..57b9a4fede4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java @@ -60,13 +60,16 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen private final LoadBalancer loadBalancer; + private final boolean shouldReceiverConvertOnTypeMismatch; + protected IoTDBSyncClientManager( List<TEndPoint> endPoints, boolean useSSL, String trustStorePath, String trustStorePwd, boolean useLeaderCache, - String loadBalanceStrategy) { + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch) { super(endPoints, useLeaderCache); this.useSSL = useSSL; @@ -93,6 +96,8 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen loadBalanceStrategy); loadBalancer = new RoundRobinLoadBalancer(); } + + this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch; } public void checkClientStatusAndTryReconstructIfNecessary() { @@ -169,6 +174,9 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, getClusterId()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, + Boolean.toString(shouldReceiverConvertOnTypeMismatch)); // Try to handshake by PipeTransferHandshakeV2Req. TPipeTransferResp resp = client.pipeTransfer(buildHandshakeV2Req(params)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java index 07c7bb46390..b6864706aaa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java @@ -23,6 +23,7 @@ public class PipeTransferHandshakeConstant { public static final String HANDSHAKE_KEY_TIME_PRECISION = "timestampPrecision"; public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID"; + public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH = "convertOnTypeMismatch"; private PipeTransferHandshakeConstant() { // Utility class diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index 6ac6f9432df..9dc08f93402 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -62,6 +62,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE; @@ -88,6 +90,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY; @@ -124,6 +127,8 @@ public abstract class IoTDBConnector implements PipeConnector { protected boolean isTabletBatchModeEnabled = true; protected PipeReceiverStatusHandler receiverStatusHandler; + protected boolean shouldReceiverConvertOnTypeMismatch = + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; @Override public void validate(final PipeParameterValidator validator) throws Exception { @@ -312,6 +317,16 @@ public abstract class IoTDBConnector implements PipeConnector { CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY, SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY), CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE)); + shouldReceiverConvertOnTypeMismatch = + parameters.getBooleanOrDefault( + Arrays.asList( + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY, + SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY), + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE); + LOGGER.info( + "IoTDBConnector {} = {}", + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY, + shouldReceiverConvertOnTypeMismatch); } protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters parameters) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index be93018bf9c..271596cf549 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -116,7 +116,13 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { clientManager = constructClient( - nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); + nodeUrls, + useSSL, + trustStorePath, + trustStorePwd, + useLeaderCache, + loadBalanceStrategy, + shouldReceiverConvertOnTypeMismatch); } protected abstract IoTDBSyncClientManager constructClient( @@ -125,7 +131,8 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { String trustStorePath, String trustStorePwd, boolean useLeaderCache, - String loadBalanceStrategy); + String loadBalanceStrategy, + boolean shouldReceiverConvertOnTypeMismatch); @Override public void handshake() throws Exception { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index b60dd1d5e1c..6665ee21d36 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; + /** * {@link IoTDBFileReceiver} is the parent class of receiver on both configNode and DataNode, * handling all the logic of parallel file receiving. @@ -66,6 +68,9 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { private File writingFile; private RandomAccessFile writingFileWriter; + private boolean shouldConvertDataTypeOnTypeMismatch = + CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; + @Override public IoTDBConnectorRequestVersion getVersion() { return IoTDBConnectorRequestVersion.VERSION_1; @@ -216,6 +221,13 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { return new TPipeTransferResp(status); } + final String shouldConvertDataTypeOnTypeMismatchString = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH); + if (shouldConvertDataTypeOnTypeMismatchString != null) { + shouldConvertDataTypeOnTypeMismatch = + Boolean.parseBoolean(shouldConvertDataTypeOnTypeMismatchString); + } + // Handle the handshake request as a v1 request. // Here we construct a fake "dataNode" request to valid from v1 validation logic, though // it may not require the actual type of the v1 request.
