This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty-mpp by this push:
new d05878b2 finish most FragmentInstanceManager
d05878b2 is described below
commit d05878b281087eed45a335b6ecf8ae77f404b54b
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Mar 31 20:57:55 2022 +0800
finish most FragmentInstanceManager
---
.../iotdb/db/mpp/execution/DriverContext.java | 23 +++-
.../db/mpp/execution/FragmentInstanceContext.java | 10 ++
.../mpp/execution/FragmentInstanceExecution.java | 85 +++++++++++++
.../db/mpp/execution/FragmentInstanceInfo.java | 32 +++++
.../db/mpp/execution/FragmentInstanceManager.java | 93 ++++++++++++++
.../db/mpp/operator/process/TimeJoinOperator.java | 3 +-
...Manager.java => FragmentInstanceScheduler.java} | 10 +-
...anager.java => IFragmentInstanceScheduler.java} | 2 +-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 134 ++++++++++++++++++---
.../db/mpp/sql/planner/plan/FragmentInstance.java | 7 ++
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 5 +
.../planner/plan/node/process/TimeJoinNode.java | 8 ++
.../iotdb/db/mpp/operator/LimitOperatorTest.java | 1 -
.../db/mpp/operator/TimeJoinOperatorTest.java | 1 -
.../db/mpp/schedule/DefaultTaskSchedulerTest.java | 2 +-
.../mpp/schedule/FragmentInstanceManagerTest.java | 2 +-
16 files changed, 387 insertions(+), 31 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index aa3a265..ea11b12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -27,11 +27,24 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.List;
public class DriverContext {
- private FragmentInstanceContext fragmentInstanceContext;
- private List<PartialPath> paths;
- private Filter timeFilter;
- private VirtualStorageGroupProcessor dataRegion;
- private List<SourceOperator> sourceOperators;
+ private final FragmentInstanceContext fragmentInstanceContext;
+ private final List<PartialPath> paths;
+ private final Filter timeFilter;
+ private final VirtualStorageGroupProcessor dataRegion;
+ private final List<SourceOperator> sourceOperators;
+
+ public DriverContext(
+ FragmentInstanceContext fragmentInstanceContext,
+ List<PartialPath> paths,
+ Filter timeFilter,
+ VirtualStorageGroupProcessor dataRegion,
+ List<SourceOperator> sourceOperators) {
+ this.fragmentInstanceContext = fragmentInstanceContext;
+ this.paths = paths;
+ this.timeFilter = timeFilter;
+ this.dataRegion = dataRegion;
+ this.sourceOperators = sourceOperators;
+ }
public FragmentInstanceId getId() {
return fragmentInstanceContext.getId();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index f9333da..fc3c82d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -37,6 +37,8 @@ public class FragmentInstanceContext extends QueryContext {
private final List<OperatorContext> operatorContexts = new ArrayList<>();
private final long createNanos = System.nanoTime();
+ private DriverContext driverContext;
+
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
// private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -73,4 +75,12 @@ public class FragmentInstanceContext extends QueryContext {
public FragmentInstanceId getId() {
return id;
}
+
+ public DriverContext getDriverContext() {
+ return driverContext;
+ }
+
+ public void setDriverContext(DriverContext driverContext) {
+ this.driverContext = driverContext;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
new file mode 100644
index 0000000..2a46ae8
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
+
+import com.google.common.collect.ImmutableList;
+
+import static java.util.Objects.requireNonNull;
+
+public class FragmentInstanceExecution {
+
+ private final IFragmentInstanceScheduler scheduler;
+
+ private final FragmentInstanceId instanceId;
+ private final FragmentInstanceContext context;
+
+ private final Driver driver;
+
+ private FragmentInstanceState state;
+
+ private long lastHeartbeat;
+
+ public FragmentInstanceExecution(
+ IFragmentInstanceScheduler scheduler,
+ FragmentInstanceId instanceId,
+ FragmentInstanceContext context,
+ Driver driver) {
+ this.scheduler = scheduler;
+ this.instanceId = instanceId;
+ this.context = context;
+ this.driver = driver;
+ scheduler.submitFragmentInstances(instanceId.getQueryId(),
ImmutableList.of(driver));
+ }
+
+ public void recordHeartbeat() {
+ lastHeartbeat = System.currentTimeMillis();
+ }
+
+ public void setLastHeartbeat(long lastHeartbeat) {
+ this.lastHeartbeat = lastHeartbeat;
+ }
+
+ public FragmentInstanceState getInstanceState() {
+ return state;
+ }
+
+ public void setState(FragmentInstanceState state) {
+ this.state = state;
+ }
+
+ public FragmentInstanceInfo getInstanceInfo() {
+ return new FragmentInstanceInfo(state);
+ }
+
+ public void failed(Throwable cause) {
+ requireNonNull(cause, "cause is null");
+ // TODO
+ }
+
+ public void cancel() {
+ // TODO
+ }
+
+ public void abort() {
+ // TODO
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java
new file mode 100644
index 0000000..e32087b
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+public class FragmentInstanceInfo {
+
+ private final FragmentInstanceState state;
+
+ public FragmentInstanceInfo(FragmentInstanceState state) {
+ this.state = state;
+ }
+
+ public FragmentInstanceState getState() {
+ return state;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
new file mode 100644
index 0000000..f8471ab
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class FragmentInstanceManager {
+
+ private final Map<FragmentInstanceId, FragmentInstanceContext>
instanceContext;
+ private final Map<FragmentInstanceId, FragmentInstanceExecution>
instanceExecution;
+ private final LocalExecutionPlanner planner =
LocalExecutionPlanner.getInstance();
+ private final IFragmentInstanceScheduler scheduler =
FragmentInstanceScheduler.getInstance();
+
+ public static FragmentInstanceManager getInstance() {
+ return FragmentInstanceManager.InstanceHolder.INSTANCE;
+ }
+
+ private FragmentInstanceManager() {
+ this.instanceContext = new ConcurrentHashMap<>();
+ this.instanceExecution = new ConcurrentHashMap<>();
+ }
+
+ public FragmentInstanceInfo execDataQueryFragmentInstance(
+ FragmentInstance instance, VirtualStorageGroupProcessor dataRegion) {
+ FragmentInstanceId instanceId = instance.getId();
+
+ FragmentInstanceExecution execution =
+ instanceExecution.computeIfAbsent(
+ instanceId,
+ id -> {
+ FragmentInstanceContext context =
+ instanceContext.computeIfAbsent(instanceId,
FragmentInstanceContext::new);
+
+ Driver driver =
+ planner.plan(
+ instance.getFragment().getRoot(),
+ context,
+ instance.getTimeFilter(),
+ dataRegion);
+ return new FragmentInstanceExecution(scheduler, instanceId,
context, driver);
+ });
+
+ return execution.getInstanceInfo();
+ }
+
+ /**
+ * Gets the info for the specified fragment instance.
+ *
+ * <p>NOTE: this design assumes that only fragment instances that will
eventually exist are
+ * queried.
+ */
+ public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId instanceId) {
+ requireNonNull(instanceId, "instanceId is null");
+ FragmentInstanceExecution execution = instanceExecution.get(instanceId);
+ if (execution == null) {
+ return null;
+ }
+ return execution.getInstanceInfo();
+ }
+
+ private static class InstanceHolder {
+
+ private InstanceHolder() {}
+
+ private static final FragmentInstanceManager INSTANCE = new
FragmentInstanceManager();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 0d0feb6..e38a150 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -65,7 +65,6 @@ public class TimeJoinOperator implements ProcessOperator {
OperatorContext operatorContext,
List<Operator> children,
OrderBy mergeOrder,
- int columnCount,
List<TSDataType> dataTypes) {
this.operatorContext = operatorContext;
this.children = children;
@@ -74,7 +73,7 @@ public class TimeJoinOperator implements ProcessOperator {
this.inputIndex = new int[this.inputCount];
this.noMoreTsBlocks = new boolean[this.inputCount];
this.timeSelector = new TimeSelector(this.inputCount << 1,
OrderBy.TIMESTAMP_ASC == mergeOrder);
- this.columnCount = columnCount;
+ this.columnCount = dataTypes.size();
this.dataTypes = dataTypes;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
similarity index 97%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
rename to
server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index c75bfc5..dcc80bd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -51,11 +51,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/** the manager of fragment instances scheduling */
-public class FragmentInstanceManager implements IFragmentInstanceManager,
IService {
+public class FragmentInstanceScheduler implements IFragmentInstanceScheduler,
IService {
- private static final Logger logger =
LoggerFactory.getLogger(FragmentInstanceManager.class);
+ private static final Logger logger =
LoggerFactory.getLogger(FragmentInstanceScheduler.class);
- public static FragmentInstanceManager getInstance() {
+ public static FragmentInstanceScheduler getInstance() {
return InstanceHolder.instance;
}
@@ -73,7 +73,7 @@ public class FragmentInstanceManager implements
IFragmentInstanceManager, IServi
private InternalService.Client mppServiceClient; // TODO: use from client
pool
private final List<AbstractExecutor> threads;
- public FragmentInstanceManager() {
+ private FragmentInstanceScheduler() {
this.readyQueue =
new L2PriorityQueue<>(
MAX_CAPACITY,
@@ -238,7 +238,7 @@ public class FragmentInstanceManager implements
IFragmentInstanceManager, IServi
private InstanceHolder() {}
- private static final FragmentInstanceManager instance = new
FragmentInstanceManager();
+ private static final FragmentInstanceScheduler instance = new
FragmentInstanceScheduler();
}
/** the default scheduler implementation */
private class Scheduler implements ITaskScheduler {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
similarity index 97%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
rename to
server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
index 4fb4326..46ee0ea 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.mpp.execution.ExecFragmentInstance;
import java.util.List;
/** the interface of fragment instance scheduling */
-public interface IFragmentInstanceManager {
+public interface IFragmentInstanceScheduler {
/**
* Submit one or more {@link ExecFragmentInstance} in one query for later
scheduling.
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index b3c60d7..da4f158 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,17 +18,24 @@
*/
package org.apache.iotdb.db.mpp.sql.planner;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
+import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.DriverContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
@@ -37,11 +44,18 @@ import
org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import
org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
/**
* used to plan a fragment instance. Currently, we simply change it from
PlanNode to executable
@@ -50,6 +64,30 @@ import java.util.List;
*/
public class LocalExecutionPlanner {
+ public static LocalExecutionPlanner getInstance() {
+ return InstanceHolder.INSTANCE;
+ }
+
+ public Driver plan(
+ PlanNode plan,
+ FragmentInstanceContext instanceContext,
+ Filter timeFilter,
+ VirtualStorageGroupProcessor dataRegion) {
+ LocalExecutionPlanContext context = new
LocalExecutionPlanContext(instanceContext);
+
+ Operator root = plan.accept(new Visitor(), context);
+
+ DriverContext driverContext =
+ new DriverContext(
+ instanceContext,
+ context.getPaths(),
+ timeFilter,
+ dataRegion,
+ context.getSourceOperators());
+ instanceContext.setDriverContext(driverContext);
+ return new Driver(root, context.getSinkHandle(), driverContext);
+ }
+
/** This Visitor is responsible for transferring PlanNode Tree to Operator
Tree */
private class Visitor extends PlanVisitor<Operator,
LocalExecutionPlanContext> {
@@ -63,16 +101,23 @@ public class LocalExecutionPlanner {
PartialPath seriesPath = node.getSeriesPath();
boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
OperatorContext operatorContext =
- context.taskContext.addOperatorContext(
+ context.instanceContext.addOperatorContext(
context.getNextOperatorId(), node.getId(),
SeriesScanOperator.class.getSimpleName());
- return new SeriesScanOperator(
- seriesPath,
- node.getAllSensors(),
- seriesPath.getSeriesType(),
- operatorContext,
- node.getTimeFilter(),
- node.getValueFilter(),
- ascending);
+
+ SeriesScanOperator seriesScanOperator =
+ new SeriesScanOperator(
+ seriesPath,
+ node.getAllSensors(),
+ seriesPath.getSeriesType(),
+ operatorContext,
+ node.getTimeFilter(),
+ node.getValueFilter(),
+ ascending);
+
+ context.addSourceOperator(seriesScanOperator);
+ context.addPath(seriesPath);
+
+ return seriesScanOperator;
}
@Override
@@ -114,7 +159,7 @@ public class LocalExecutionPlanner {
public Operator visitLimit(LimitNode node, LocalExecutionPlanContext
context) {
Operator child = node.getChild().accept(this, context);
return new LimitOperator(
- context.taskContext.addOperatorContext(
+ context.instanceContext.addOperatorContext(
context.getNextOperatorId(), node.getId(),
LimitOperator.class.getSimpleName()),
node.getLimit(),
child);
@@ -138,20 +183,81 @@ public class LocalExecutionPlanner {
@Override
public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext
context) {
- return super.visitTimeJoin(node, context);
+ List<Operator> children =
+ node.getChildren().stream()
+ .map(child -> child.accept(this, context))
+ .collect(Collectors.toList());
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(), node.getId(),
TimeJoinOperator.class.getSimpleName());
+ return new TimeJoinOperator(operatorContext, children,
node.getMergeOrder(), node.getTypes());
+ }
+
+ @Override
+ public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext
context) {
+ return super.visitExchange(node, context);
+ }
+
+ @Override
+ public Operator visitFragmentSink(FragmentSinkNode node,
LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ // TODO(jackie tien) create SinkHandle here
+ ISinkHandle sinkHandle = null;
+ context.setSinkHandle(sinkHandle);
+ return child;
}
}
+ private static class InstanceHolder {
+
+ private InstanceHolder() {}
+
+ private static final LocalExecutionPlanner INSTANCE = new
LocalExecutionPlanner();
+ }
+
private static class LocalExecutionPlanContext {
- private final FragmentInstanceContext taskContext;
+ private final FragmentInstanceContext instanceContext;
+ private final List<PartialPath> paths;
+ private final List<SourceOperator> sourceOperators;
+ private ISinkHandle sinkHandle;
+
private int nextOperatorId = 0;
- public LocalExecutionPlanContext(FragmentInstanceContext taskContext) {
- this.taskContext = taskContext;
+ public LocalExecutionPlanContext(FragmentInstanceContext instanceContext) {
+ this.instanceContext = instanceContext;
+ this.paths = new ArrayList<>();
+ this.sourceOperators = new ArrayList<>();
}
private int getNextOperatorId() {
return nextOperatorId++;
}
+
+ public List<PartialPath> getPaths() {
+ return paths;
+ }
+
+ public List<SourceOperator> getSourceOperators() {
+ return sourceOperators;
+ }
+
+ public void addPath(PartialPath path) {
+ paths.add(path);
+ }
+
+ public void addSourceOperator(SourceOperator sourceOperator) {
+ sourceOperators.add(sourceOperator);
+ }
+
+ public ISinkHandle getSinkHandle() {
+ return sinkHandle;
+ }
+
+ public void setSinkHandle(ISinkHandle sinkHandle) {
+ requireNonNull(sinkHandle, "sinkHandle is null");
+ checkArgument(this.sinkHandle == null, "There must be at most one
SinkNode");
+
+ this.sinkHandle = sinkHandle;
+ }
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 61f9292..d4c31e2 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.nio.ByteBuffer;
@@ -40,6 +41,8 @@ public class FragmentInstance implements IConsensusRequest {
private DataRegionReplicaSet dataRegion;
private EndPoint hostEndpoint;
+ private Filter timeFilter;
+
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
@@ -87,6 +90,10 @@ public class FragmentInstance implements IConsensusRequest {
return "<No downstream>";
}
+ public Filter getTimeFilter() {
+ return timeFilter;
+ }
+
public String toString() {
StringBuilder ret = new StringBuilder();
ret.append(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 916f516..c035a01 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import
org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -91,4 +92,8 @@ public abstract class PlanVisitor<R, C> {
public R visitExchange(ExchangeNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitFragmentSink(FragmentSinkNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 67e874e..f2c0d84 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import java.nio.ByteBuffer;
@@ -51,6 +52,9 @@ public class TimeJoinNode extends ProcessNode {
private List<PlanNode> children;
+ // output columns' data type
+ private List<TSDataType> types;
+
public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, FilterNullPolicy
filterNullPolicy) {
super(id);
this.mergeOrder = mergeOrder;
@@ -131,6 +135,10 @@ public class TimeJoinNode extends ProcessNode {
return "TimeJoinNode-" + this.getId();
}
+ public List<TSDataType> getTypes() {
+ return types;
+ }
+
@TestOnly
public Pair<String, List<String>> print() {
String title = String.format("[TimeJoinNode (%s)]", this.getId());
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index b9b439c..e901ab6 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -123,7 +123,6 @@ public class LimitOperatorTest {
fragmentInstanceContext.getOperatorContexts().get(2),
Arrays.asList(seriesScanOperator1, seriesScanOperator2),
OrderBy.TIMESTAMP_ASC,
- 2,
Arrays.asList(TSDataType.INT32, TSDataType.INT32));
LimitOperator limitOperator =
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 5548b79..a22554f 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -117,7 +117,6 @@ public class TimeJoinOperatorTest {
fragmentInstanceContext.getOperatorContexts().get(2),
Arrays.asList(seriesScanOperator1, seriesScanOperator2),
OrderBy.TIMESTAMP_ASC,
- 2,
Arrays.asList(TSDataType.INT32, TSDataType.INT32));
int count = 0;
while (timeJoinOperator.hasNext()) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index 5d385ae..bade6dd 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -41,7 +41,7 @@ import java.util.concurrent.TimeUnit;
public class DefaultTaskSchedulerTest {
- private final FragmentInstanceManager manager =
FragmentInstanceManager.getInstance();
+ private final FragmentInstanceScheduler manager =
FragmentInstanceScheduler.getInstance();
@After
public void tearDown() {
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
index 0233e4c..c79d376 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
@@ -38,7 +38,7 @@ import java.util.List;
public class FragmentInstanceManagerTest {
- private final FragmentInstanceManager manager =
FragmentInstanceManager.getInstance();
+ private final FragmentInstanceScheduler manager =
FragmentInstanceScheduler.getInstance();
@After
public void tearDown() {