This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch 0.13_data_insert_adapt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ee62a3d9120b23aacab6df2cbc638aced2bbe157 Author: HTHou <[email protected]> AuthorDate: Thu Apr 6 15:56:22 2023 +0800 [To rel/1.1] Adapt insert data by 0.13 client --- .../src/assembly/resources/conf/iotdb-common.properties | 5 +++++ .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++++++++ .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +++++ .../java/org/apache/iotdb/db/mpp/common/SessionInfo.java | 14 ++++++++++++++ .../apache/iotdb/db/mpp/plan/execution/QueryExecution.java | 7 ++++++- .../org/apache/iotdb/db/query/control/SessionManager.java | 6 +++++- 6 files changed, 45 insertions(+), 2 deletions(-) diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties index 0a61caa03c..c62602f654 100644 --- a/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -514,6 +514,11 @@ cluster_name=defaultCluster # Datatype: int # recovery_log_interval_in_ms=5000 +# If using v0.13 client to insert data, please set this configuration to true. +# Notice: if using v1.0 client or setting Client Version to V_0_13 manually, enable this config may reduce the insert performance. +# Datatype: boolean +# 0.13_data_insert_adapt=false + # When there exists old version(v2) TsFile, how many thread will be set up to perform upgrade tasks, 1 by default. # Set to 1 when less than or equal to 0. # Datatype: int diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 38bc963235..a2ea88984e 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -752,6 +752,8 @@ public class IoTDBConfig { // if enable partial insert, one measurement failure will not impact other measurements private boolean enablePartialInsert = true; + private boolean enable13DataInsertAdapt = false; + /** * Used to estimate the memory usage of text fields in a UDF query. It is recommended to set this * value to be slightly larger than the average length of all text records. @@ -1838,6 +1840,14 @@ public class IoTDBConfig { this.enablePartialInsert = enablePartialInsert; } + public boolean isEnable13DataInsertAdapt() { + return enable13DataInsertAdapt; + } + + public void setEnable13DataInsertAdapt(boolean enable13DataInsertAdapt) { + this.enable13DataInsertAdapt = enable13DataInsertAdapt; + } + public int getCompactionThreadCount() { return compactionThreadCount; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 77378e7f72..46909a69e2 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -671,6 +671,11 @@ public class IoTDBDescriptor { properties.getProperty( "enable_partial_insert", String.valueOf(conf.isEnablePartialInsert())))); + conf.setEnable13DataInsertAdapt( + Boolean.parseBoolean( + properties.getProperty( + "0.13_data_insert_adapt", String.valueOf(conf.isEnable13DataInsertAdapt())))); + int rpcSelectorThreadNum = Integer.parseInt( properties.getProperty( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java index 5d7645c78e..3183c50b11 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.common; +import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -29,12 +30,21 @@ public class SessionInfo { private final String userName; private final String zoneId; + private ClientVersion version = ClientVersion.V_1_0; + public SessionInfo(long sessionId, String userName, String zoneId) { this.sessionId = sessionId; this.userName = userName; this.zoneId = zoneId; } + public SessionInfo(long sessionId, String userName, String zoneId, ClientVersion version) { + this.sessionId = sessionId; + this.userName = userName; + this.zoneId = zoneId; + this.version = version; + } + public long getSessionId() { return sessionId; } @@ -47,6 +57,10 @@ public class SessionInfo { return zoneId; } + public ClientVersion getVersion() { + return version; + } + public static SessionInfo deserializeFrom(ByteBuffer buffer) { long sessionId = ReadWriteIOUtils.readLong(buffer); String userName = ReadWriteIOUtils.readString(buffer); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 4c2d0c23bf..3588cc4462 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -618,8 +619,12 @@ public class QueryExecution implements IQueryExecution { } // collect redirect info to client for writing + // if 0.13_data_insert_adapt is true and ClientVersion is NOT V_1_0, stop returning redirect + // info to client if (analysis.getStatement() instanceof InsertBaseStatement - && !analysis.isFinishQueryAfterAnalyze()) { + && !analysis.isFinishQueryAfterAnalyze() + && (!config.isEnable13DataInsertAdapt() + || IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion()))) { InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement(); List<TEndPoint> redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo()); diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java index e2bd2a0c5c..93198c34da 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java @@ -351,7 +351,11 @@ public class SessionManager implements SessionManagerMBean { } public SessionInfo getSessionInfo(IClientSession session) { - return new SessionInfo(session.getId(), session.getUsername(), session.getZoneId().getId()); + return new SessionInfo( + session.getId(), + session.getUsername(), + session.getZoneId().getId(), + session.getClientVersion()); } @Override
