This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 94cebb2572a9de5456af7a14e300cdbdd71d9910 Author: Minghui Liu <[email protected]> AuthorDate: Wed Oct 19 19:12:23 2022 +0800 add session info in FragmentInstance --- .../execution/fragment/FragmentInstanceContext.java | 12 ++++++++++++ .../distribution/WriteFragmentParallelPlanner.java | 4 +++- .../plan/scheduler/load/LoadTsFileScheduler.java | 8 ++++++-- .../db/mpp/plan/plan/FragmentInstanceSerdeTest.java | 9 +++++++-- .../mpp/plan/scheduler/StandaloneSchedulerTest.java | 21 ++++++++++++++++----- 5 files changed, 44 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java index 5976897529..07e0c9bdb5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.fragment; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.driver.DriverContext; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; @@ -27,6 +28,7 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -86,6 +88,16 @@ public class FragmentInstanceContext extends QueryContext { return new FragmentInstanceContext(queryId); } + @TestOnly + public static FragmentInstanceContext createFragmentInstanceContext( + FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { + FragmentInstanceContext instanceContext = + new FragmentInstanceContext(id, stateMachine, "test", ZoneId.systemDefault().getId()); + instanceContext.initialize(); + instanceContext.start(); + return instanceContext; + } + private FragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java index 0bbf402818..7c526ef488 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java @@ -62,7 +62,9 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner { fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType(), - queryContext.getTimeOut()); + queryContext.getTimeOut(), + queryContext.getSession().getUserName(), + queryContext.getSession().getZoneId()); instance.setDataRegionAndHost(split.getRegionReplicaSet()); ret.add(instance); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java index dd403d46ea..c89902b7a8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java @@ -143,7 +143,9 @@ public class LoadTsFileScheduler implements IScheduler { fragmentId.genFragmentInstanceId(), null, queryContext.getQueryType(), - queryContext.getTimeOut()); + queryContext.getTimeOut(), + queryContext.getSession().getUserName(), + queryContext.getSession().getZoneId()); instance.setDataRegionAndHost(entry.getKey()); Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(Collections.singletonList(instance)); @@ -231,7 +233,9 @@ public class LoadTsFileScheduler implements IScheduler { fragmentId.genFragmentInstanceId(), null, queryContext.getQueryType(), - queryContext.getTimeOut()); + queryContext.getTimeOut(), + queryContext.getSession().getUserName(), + queryContext.getSession().getZoneId()); instance.setDataRegionAndHost(node.getLocalRegionReplicaSet()); dispatcher.dispatchLocally(instance); } catch (FragmentInstanceDispatchException e) { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java index a1898ea023..04fa430e5e 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java @@ -44,6 +44,7 @@ import com.google.common.collect.ImmutableList; import org.junit.Test; import java.nio.ByteBuffer; +import java.time.ZoneId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -68,7 +69,9 @@ public class FragmentInstanceSerdeTest { planFragmentId.genFragmentInstanceId(), new GroupByFilter(1, 2, 3, 4), QueryType.READ, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + "test", + ZoneId.systemDefault().getId()); TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet( new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), @@ -101,7 +104,9 @@ public class FragmentInstanceSerdeTest { planFragmentId.genFragmentInstanceId(), null, QueryType.READ, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + "test", + ZoneId.systemDefault().getId()); TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet( new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java index 94fac23509..b80adbc755 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java @@ -65,6 +65,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -137,7 +138,9 @@ public class StandaloneSchedulerTest { planFragment.getId().genFragmentInstanceId(), new GroupByFilter(1, 2, 3, 4), QueryType.WRITE, - conf.getQueryTimeoutThreshold()); + conf.getQueryTimeoutThreshold(), + "test", + ZoneId.systemDefault().getId()); fragmentInstance.setDataRegionAndHost(regionReplicaSet); configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.wt01.status")); @@ -240,7 +243,9 @@ public class StandaloneSchedulerTest { planFragment.getId().genFragmentInstanceId(), new GroupByFilter(1, 2, 3, 4), QueryType.WRITE, - conf.getQueryTimeoutThreshold()); + conf.getQueryTimeoutThreshold(), + "test", + ZoneId.systemDefault().getId()); fragmentInstance.setDataRegionAndHost(regionReplicaSet); configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.GPS")); @@ -353,7 +358,9 @@ public class StandaloneSchedulerTest { planFragment.getId().genFragmentInstanceId(), new GroupByFilter(1, 2, 3, 4), QueryType.WRITE, - conf.getQueryTimeoutThreshold()); + conf.getQueryTimeoutThreshold(), + "test", + ZoneId.systemDefault().getId()); fragmentInstance.setDataRegionAndHost(regionReplicaSet); configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.d3")); @@ -404,7 +411,9 @@ public class StandaloneSchedulerTest { planFragment.getId().genFragmentInstanceId(), new GroupByFilter(1, 2, 3, 4), QueryType.WRITE, - conf.getQueryTimeoutThreshold()); + conf.getQueryTimeoutThreshold(), + "test", + ZoneId.systemDefault().getId()); fragmentInstance.setDataRegionAndHost(regionReplicaSet); configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath(deviceId)); @@ -485,7 +494,9 @@ public class StandaloneSchedulerTest { planFragment.getId().genFragmentInstanceId(), new GroupByFilter(1, 2, 3, 4), QueryType.WRITE, - conf.getQueryTimeoutThreshold()); + conf.getQueryTimeoutThreshold(), + "test", + ZoneId.systemDefault().getId()); fragmentInstance.setDataRegionAndHost(regionReplicaSet); configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);
