This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bb548fd9fa488a3b0ed0bb2b5a1d0ac3e36fbfb5 Author: Minghui Liu <[email protected]> AuthorDate: Wed Oct 19 17:30:32 2022 +0800 fix sessionId --- .../iotdb/db/client/DataNodeInternalClient.java | 6 +++-- .../fragment/FragmentInstanceContext.java | 27 ++++++++++++++++++--- .../fragment/FragmentInstanceManager.java | 12 ++++++++-- .../db/mpp/execution/operator/OperatorContext.java | 8 +++++++ .../operator/process/AbstractIntoOperator.java | 6 +++-- .../SimpleFragmentParallelPlanner.java | 2 ++ .../db/mpp/plan/planner/plan/FragmentInstance.java | 28 +++++++++++++++++++--- .../iotdb/db/query/control/SessionManager.java | 5 ---- 8 files changed, 77 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java index 43f7271bd2..e1e5bde993 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java +++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.client; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -53,7 +54,7 @@ public class DataNodeInternalClient { private final long sessionId; - public DataNodeInternalClient() { + public DataNodeInternalClient(String userName, String zoneId) { if (config.isClusterMode()) { PARTITION_FETCHER = ClusterPartitionFetcher.getInstance(); SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance(); @@ -61,7 +62,8 @@ public class DataNodeInternalClient { PARTITION_FETCHER = StandalonePartitionFetcher.getInstance(); SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance(); } - sessionId = SESSION_MANAGER.requestInternalSessionId(); + this.sessionId = + SESSION_MANAGER.requestSessionId(userName, zoneId, IoTDBConstant.ClientVersion.V_0_13); } public TSStatus insertTablets(InsertMultiTabletsStatement statement) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java index 41a23f8d00..5976897529 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java @@ -58,6 +58,10 @@ public class FragmentInstanceContext extends QueryContext { private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>(); private final AtomicReference<Long> executionEndTime = new AtomicReference<>(); + // session info + private String userName; + private String zoneId; + // private final GcMonitor gcMonitor; // private final AtomicLong startNanos = new AtomicLong(); // private final AtomicLong startFullGcCount = new AtomicLong(-1); @@ -67,8 +71,12 @@ public class FragmentInstanceContext extends QueryContext { // private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1); public static FragmentInstanceContext createFragmentInstanceContext( - FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { - FragmentInstanceContext instanceContext = new FragmentInstanceContext(id, stateMachine); + FragmentInstanceId id, + FragmentInstanceStateMachine stateMachine, + String userName, + String zoneId) { + FragmentInstanceContext instanceContext = + new FragmentInstanceContext(id, stateMachine, userName, zoneId); instanceContext.initialize(); instanceContext.start(); return instanceContext; @@ -79,10 +87,15 @@ public class FragmentInstanceContext extends QueryContext { } private FragmentInstanceContext( - FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { + FragmentInstanceId id, + FragmentInstanceStateMachine stateMachine, + String userName, + String zoneId) { this.id = id; this.stateMachine = stateMachine; this.executionEndTime.set(END_TIME_INITIAL_VALUE); + this.userName = userName; + this.zoneId = zoneId; } // used for compaction @@ -200,4 +213,12 @@ public class FragmentInstanceContext extends QueryContext { public FragmentInstanceStateMachine getStateMachine() { return stateMachine; } + + public String getUserName() { + return userName; + } + + public String getZoneId() { + return zoneId; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index 16bf91ba36..921856babc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -108,7 +108,11 @@ public class FragmentInstanceManager { instanceContext.computeIfAbsent( instanceId, fragmentInstanceId -> - createFragmentInstanceContext(fragmentInstanceId, stateMachine)); + createFragmentInstanceContext( + fragmentInstanceId, + stateMachine, + instance.getUserName(), + instance.getZoneId())); try { DataDriver driver = @@ -151,7 +155,11 @@ public class FragmentInstanceManager { instanceContext.computeIfAbsent( instanceId, fragmentInstanceId -> - createFragmentInstanceContext(fragmentInstanceId, stateMachine)); + createFragmentInstanceContext( + fragmentInstanceId, + stateMachine, + instance.getUserName(), + instance.getZoneId())); try { SchemaDriver driver = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java index 356e0c4eed..02c5311901 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java @@ -70,6 +70,14 @@ public class OperatorContext { this.maxRunTime = maxRunTime; } + public String getUserName() { + return instanceContext.getUserName(); + } + + public String getZoneId() { + return instanceContext.getZoneId(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index 21cf6951fa..4c13359e75 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -59,7 +59,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { protected final Map<String, InputLocation> sourceColumnToInputLocationMap; - private final DataNodeInternalClient client = new DataNodeInternalClient(); + private final DataNodeInternalClient client; public AbstractIntoOperator( OperatorContext operatorContext, @@ -70,6 +70,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator { this.child = child; this.insertTabletStatementGenerators = insertTabletStatementGenerators; this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; + this.client = + new DataNodeInternalClient(operatorContext.getUserName(), operatorContext.getZoneId()); } protected static List<IntoOperator.InsertTabletStatementGenerator> @@ -109,7 +111,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { String message = String.format( - "Error occurred while inserting tablets in SELECT INTO. %s", + "Error occurred while inserting tablets in SELECT INTO: %s", executionStatus.getMessage()); LOGGER.error(message); throw new IntoProcessException(message); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java index f83730b187..b46ac5b36d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -100,6 +100,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { timeFilter, queryContext.getQueryType(), queryContext.getTimeOut(), + queryContext.getSession().getUserName(), + queryContext.getSession().getZoneId(), fragment.isRoot()); // Get the target region for origin PlanFragment, then its instance will be distributed one diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java index 15a25b8dcf..97790ed63e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java @@ -65,6 +65,10 @@ public class FragmentInstance implements IConsensusRequest { private boolean isRoot; + // session info + private final String userName; + private final String zoneId; + // We can add some more params for a specific FragmentInstance // So that we can make different FragmentInstance owns different data range. @@ -73,13 +77,17 @@ public class FragmentInstance implements IConsensusRequest { FragmentInstanceId id, Filter timeFilter, QueryType type, - long timeOut) { + long timeOut, + String userName, + String zoneId) { this.fragment = fragment; this.timeFilter = timeFilter; this.id = id; this.type = type; this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold(); this.isRoot = false; + this.userName = userName; + this.zoneId = zoneId; } public FragmentInstance( @@ -88,8 +96,10 @@ public class FragmentInstance implements IConsensusRequest { Filter timeFilter, QueryType type, long timeOut, + String userName, + String zoneId, boolean isRoot) { - this(fragment, id, timeFilter, type, timeOut); + this(fragment, id, timeFilter, type, timeOut, userName, zoneId); this.isRoot = isRoot; } @@ -188,11 +198,13 @@ public class FragmentInstance implements IConsensusRequest { FragmentInstanceId id = FragmentInstanceId.deserialize(buffer); PlanFragment planFragment = PlanFragment.deserialize(buffer); long timeOut = ReadWriteIOUtils.readLong(buffer); + String userName = ReadWriteIOUtils.readString(buffer); + String zoneId = ReadWriteIOUtils.readString(buffer); boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer); Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null; QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)]; FragmentInstance fragmentInstance = - new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut); + new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut, userName, zoneId); boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer); fragmentInstance.hostDataNode = hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null; @@ -205,6 +217,8 @@ public class FragmentInstance implements IConsensusRequest { id.serialize(outputStream); fragment.serialize(outputStream); ReadWriteIOUtils.write(timeOut, outputStream); + ReadWriteIOUtils.write(userName, outputStream); + ReadWriteIOUtils.write(zoneId, outputStream); ReadWriteIOUtils.write(timeFilter != null, outputStream); if (timeFilter != null) { timeFilter.serialize(outputStream); @@ -246,4 +260,12 @@ public class FragmentInstance implements IConsensusRequest { public long getTimeOut() { return timeOut; } + + public String getUserName() { + return userName; + } + + public String getZoneId() { + return zoneId; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java index 1ee80d5df2..53f140d317 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java @@ -237,11 +237,6 @@ public class SessionManager { return sessionId; } - public long requestInternalSessionId() { - return requestSessionId( - "__internal", ZoneId.systemDefault().getId(), IoTDBConstant.ClientVersion.V_0_13); - } - public boolean releaseSessionResource(long sessionId) { return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions); }
