This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94cebb2572a9de5456af7a14e300cdbdd71d9910
Author: Minghui Liu <[email protected]>
AuthorDate: Wed Oct 19 19:12:23 2022 +0800

    add session info in FragmentInstance
---
 .../execution/fragment/FragmentInstanceContext.java | 12 ++++++++++++
 .../distribution/WriteFragmentParallelPlanner.java  |  4 +++-
 .../plan/scheduler/load/LoadTsFileScheduler.java    |  8 ++++++--
 .../db/mpp/plan/plan/FragmentInstanceSerdeTest.java |  9 +++++++--
 .../mpp/plan/scheduler/StandaloneSchedulerTest.java | 21 ++++++++++++++++-----
 5 files changed, 44 insertions(+), 10 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 5976897529..07e0c9bdb5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.fragment;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -86,6 +88,16 @@ public class FragmentInstanceContext extends QueryContext {
     return new FragmentInstanceContext(queryId);
   }
 
+  @TestOnly
+  public static FragmentInstanceContext createFragmentInstanceContext(
+      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+    FragmentInstanceContext instanceContext =
+        new FragmentInstanceContext(id, stateMachine, "test", 
ZoneId.systemDefault().getId());
+    instanceContext.initialize();
+    instanceContext.start();
+    return instanceContext;
+  }
+
   private FragmentInstanceContext(
       FragmentInstanceId id,
       FragmentInstanceStateMachine stateMachine,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 0bbf402818..7c526ef488 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -62,7 +62,9 @@ public class WriteFragmentParallelPlanner implements 
IFragmentParallelPlaner {
               fragment.getId().genFragmentInstanceId(),
               timeFilter,
               queryContext.getQueryType(),
-              queryContext.getTimeOut());
+              queryContext.getTimeOut(),
+              queryContext.getSession().getUserName(),
+              queryContext.getSession().getZoneId());
       instance.setDataRegionAndHost(split.getRegionReplicaSet());
       ret.add(instance);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index dd403d46ea..c89902b7a8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -143,7 +143,9 @@ public class LoadTsFileScheduler implements IScheduler {
                 fragmentId.genFragmentInstanceId(),
                 null,
                 queryContext.getQueryType(),
-                queryContext.getTimeOut());
+                queryContext.getTimeOut(),
+                queryContext.getSession().getUserName(),
+                queryContext.getSession().getZoneId());
         instance.setDataRegionAndHost(entry.getKey());
         Future<FragInstanceDispatchResult> dispatchResultFuture =
             dispatcher.dispatch(Collections.singletonList(instance));
@@ -231,7 +233,9 @@ public class LoadTsFileScheduler implements IScheduler {
               fragmentId.genFragmentInstanceId(),
               null,
               queryContext.getQueryType(),
-              queryContext.getTimeOut());
+              queryContext.getTimeOut(),
+              queryContext.getSession().getUserName(),
+              queryContext.getSession().getZoneId());
       instance.setDataRegionAndHost(node.getLocalRegionReplicaSet());
       dispatcher.dispatchLocally(instance);
     } catch (FragmentInstanceDispatchException e) {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index a1898ea023..04fa430e5e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -44,6 +44,7 @@ import com.google.common.collect.ImmutableList;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.time.ZoneId;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -68,7 +69,9 @@ public class FragmentInstanceSerdeTest {
             planFragmentId.genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.READ,
-            config.getQueryTimeoutThreshold());
+            config.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     TRegionReplicaSet regionReplicaSet =
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
@@ -101,7 +104,9 @@ public class FragmentInstanceSerdeTest {
             planFragmentId.genFragmentInstanceId(),
             null,
             QueryType.READ,
-            config.getQueryTimeoutThreshold());
+            config.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     TRegionReplicaSet regionReplicaSet =
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 94fac23509..b80adbc755 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -65,6 +65,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -137,7 +138,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new 
PartialPath("root.ln.wf01.wt01.status"));
@@ -240,7 +243,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new 
PartialPath("root.ln.wf01.GPS"));
@@ -353,7 +358,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new 
PartialPath("root.ln.d3"));
@@ -404,7 +411,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new 
PartialPath(deviceId));
@@ -485,7 +494,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);

Reply via email to