This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch stable-mpp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 86690f209e67a5f7270c7e3018aa32042b4a113a Merge: fb52130d2b 67dafed0e6 Author: JackieTien97 <[email protected]> AuthorDate: Wed Apr 20 17:54:57 2022 +0800 Add FragmentInstanceStateMachine .../iotdb/cluster/coordinator/Coordinator.java | 6 +- .../log/manage/PartitionedSnapshotLogManager.java | 2 +- .../iotdb/cluster/metadata/CSchemaProcessor.java | 37 - .../apache/iotdb/cluster/metadata/MetaPuller.java | 3 +- .../iotdb/cluster/query/LocalQueryExecutor.java | 8 +- .../cluster/server/member/DataGroupMember.java | 11 +- .../cluster/server/member/MetaGroupMember.java | 6 +- .../cluster/server/service/BaseAsyncService.java | 4 +- .../apache/iotdb/cluster/utils/StatusUtils.java | 6 +- .../FilePartitionedSnapshotLogManagerTest.java | 2 + .../cluster/server/member/DataGroupMemberTest.java | 2 + .../cluster/server/member/MetaGroupMemberTest.java | 5 +- .../consensus/response/DataNodesInfoDataSet.java | 4 +- .../statemachine/PartitionRegionStateMachine.java | 17 + .../confignode/consensus/RatisConsensusDemo.java | 6 +- .../manager/ConfigManagerManualTest.java | 6 +- .../server/ConfigNodeRPCServerProcessorTest.java | 26 +- .../iotdb/consensus/common/SnapshotMeta.java | 40 +- .../iotdb/consensus/ratis/RatisConsensus.java | 4 +- .../consensus/standalone/StandAloneServerImpl.java | 18 + .../consensus/statemachine/EmptyStateMachine.java | 18 + .../consensus/statemachine/IStateMachine.java | 49 + .../iotdb/consensus/ratis/RatisConsensusTest.java | 15 + .../standalone/StandAloneConsensusTest.java | 15 + distribution/src/assembly/all.xml | 4 + distribution/src/assembly/server.xml | 4 + docs/UserGuide/Maintenance-Tools/Metric-Tool.md | 12 +- .../Maintenance-Tools/SchemaFileSketch-Tool.md | 38 + .../Ecosystem Integration/Grafana Plugin.md | 143 +- docs/zh/UserGuide/Maintenance-Tools/Metric-Tool.md | 12 +- .../Maintenance-Tools/SchemaFileSketch-Tool.md | 35 + .../Apache IoTDB Dashboard v0.13.1.json | 1527 ++++++++++++++++++++ .../Apache IoTDB Dashboard v0.14.0.json | 1527 ++++++++++++++++++++ .../iotdb/influxdb/protocol/dto/SessionPoint.java | 8 +- .../iotdb/influxdb/session/InfluxDBSession.java | 6 +- .../influxdb/integration/IoTDBInfluxDBIT.java | 4 +- .../iotdb/commons/partition/DataPartition.java | 17 +- .../{PartitionInfo.java => Partition.java} | 27 +- .../iotdb/commons/partition/RegionReplicaSet.java | 6 +- .../iotdb/commons/partition/SchemaPartition.java | 17 +- .../SchemaFileSketcher.bat} | 4 +- .../mLogParser.sh => schema/SchemaFileSketcher.sh} | 4 +- .../tools/{mlog => schema}/mLogParser.bat | 2 +- .../resources/tools/{mlog => schema}/mLogParser.sh | 0 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 8 + .../statemachine/DataRegionStateMachine.java | 17 + .../statemachine/SchemaRegionStateMachine.java | 22 +- .../engine/compaction/CompactionTaskManager.java | 6 +- .../db/engine/storagegroup/TsFileProcessor.java | 214 ++- .../apache/iotdb/db/metadata/LocalConfigNode.java | 2 +- .../db/metadata/LocalSchemaPartitionTable.java | 6 + .../iotdb/db/metadata/LocalSchemaProcessor.java | 39 - .../iotdb/db/metadata/mtree/IMTreeBelowSG.java | 307 ++++ ...reeBelowSG.java => MTreeBelowSGCachedImpl.java} | 134 +- ...reeBelowSG.java => MTreeBelowSGMemoryImpl.java} | 909 +++++------- .../mtree/store/disk/MTreeFlushTaskManager.java | 2 +- .../mtree/store/disk/MTreeReleaseTaskManager.java | 2 +- .../mtree/store/disk/schemafile/ISegment.java | 2 + .../mtree/store/disk/schemafile/RecordUtils.java | 24 +- .../mtree/store/disk/schemafile/SchemaFile.java | 37 +- .../mtree/store/disk/schemafile/SchemaPage.java | 9 +- .../mtree/store/disk/schemafile/Segment.java | 51 + .../db/metadata/schemaregion/ISchemaRegion.java | 247 +++- .../db/metadata/schemaregion/SchemaEngine.java | 5 +- ...hemaRegion.java => SchemaRegionMemoryImpl.java} | 600 +++----- ...Region.java => SchemaRegionSchemaFileImpl.java} | 93 +- .../schemaregion/rocksdb/RSchemaRegion.java | 81 +- .../SchemaExecutionVisitor.java} | 59 +- .../iotdb/db/mpp/buffer/DataBlockManager.java | 26 +- .../apache/iotdb/db/mpp/buffer/ISinkHandle.java | 14 +- .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 37 +- .../apache/iotdb/db/mpp/buffer/SourceHandle.java | 27 +- .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 13 +- .../apache/iotdb/db/mpp/execution/Coordinator.java | 16 +- .../apache/iotdb/db/mpp/execution/DataDriver.java | 19 +- .../iotdb/db/mpp/execution/DataDriverContext.java | 1 + .../org/apache/iotdb/db/mpp/execution/Driver.java | 112 +- .../iotdb/db/mpp/execution/DriverContext.java | 2 - .../db/mpp/execution/FragmentInstanceContext.java | 12 +- .../mpp/execution/FragmentInstanceExecution.java | 79 +- .../db/mpp/execution/FragmentInstanceManager.java | 34 +- .../execution/FragmentInstanceStateMachine.java | 49 +- .../iotdb/db/mpp/execution/IQueryExecution.java | 2 + .../iotdb/db/mpp/execution/QueryExecution.java | 43 +- .../iotdb/db/mpp/execution/QueryStateMachine.java | 14 +- .../iotdb/db/mpp/execution/SchemaDriver.java | 5 +- .../db/mpp/execution/SchemaDriverContext.java | 1 + .../db/mpp/execution/config/ConfigExecution.java | 59 +- .../config/ConfigTaskVisitor.java} | 24 +- .../iotdb/db/mpp/execution/config/IConfigTask.java | 2 +- .../mpp/execution/scheduler/ClusterScheduler.java | 32 +- .../mpp/execution/scheduler/IQueryTerminator.java | 4 +- .../scheduler/SimpleFragInstanceDispatcher.java | 51 +- .../execution/scheduler/SimpleQueryTerminator.java | 50 +- .../org/apache/iotdb/db/mpp/operator/Operator.java | 2 - .../mpp/operator/schema/SchemaFetchOperator.java | 1 - .../source/SeriesAggregateScanOperator.java | 420 +++++- .../db/mpp/operator/source/SeriesScanOperator.java | 2 +- .../db/mpp/operator/source/SeriesScanUtil.java | 10 +- .../FragmentInstanceAbortedException.java} | 28 +- .../db/mpp/schedule/FragmentInstanceScheduler.java | 15 +- .../mpp/schedule/FragmentInstanceTaskExecutor.java | 1 + .../schedule/FragmentInstanceTimeoutSentinel.java | 1 + .../db/mpp/schedule/queue/L1PriorityQueue.java | 37 +- .../db/mpp/schedule/queue/L2PriorityQueue.java | 66 +- .../db/mpp/schedule/task/FragmentInstanceTask.java | 18 +- .../mpp/schedule/task/FragmentInstanceTaskID.java | 10 +- .../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 4 +- .../db/mpp/sql/planner/DistributionPlanner.java | 20 + .../db/mpp/sql/planner/LocalExecutionPlanner.java | 28 +- .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 5 + .../db/mpp/sql/planner/plan/FragmentInstance.java | 1 + .../db/mpp/sql/planner/plan/node/PlanVisitor.java | 13 +- .../node/metedata/write/AlterTimeSeriesNode.java | 41 +- .../node/metedata/write/CreateTimeSeriesNode.java | 1 + .../plan/node/source/SeriesAggregateScanNode.java | 45 +- .../statement/ConfigStatement.java} | 10 +- .../db/mpp/sql/statement/StatementVisitor.java | 5 + .../statement/component/GroupByTimeComponent.java | 73 + .../metadata/SetStorageGroupStatement.java | 10 +- .../db/protocol/influxdb/handler/QueryHandler.java | 4 +- .../influxdb/util/JacksonUtils.java} | 36 +- .../db/protocol/influxdb/util/StringUtils.java | 3 +- .../iotdb/db/query/control/SessionManager.java | 7 +- .../db/query/dataset/AlignByDeviceDataSet.java | 4 +- .../query/dataset/groupby/GroupByFillDataSet.java | 8 +- .../query/dataset/groupby/GroupByTimeDataSet.java | 30 +- .../dataset/groupby/GroupByTimeEngineDataSet.java | 24 +- .../groupby/GroupByWithValueFilterDataSet.java | 6 +- .../groupby/GroupByWithoutValueFilterDataSet.java | 9 +- .../iotdb/db/query/reader/series/SeriesReader.java | 3 + .../java/org/apache/iotdb/db/service/DataNode.java | 4 +- .../java/org/apache/iotdb/db/service/IoTDB.java | 2 +- .../thrift/impl/DataNodeTSIServiceImpl.java | 11 +- .../service/thrift/impl/InternalServiceImpl.java | 6 +- .../db/service/thrift/impl/TSServiceImpl.java | 47 +- .../db/sync/sender/manager/SchemaSyncManager.java | 6 +- .../db/tools/{mlog => schema}/MLogParser.java | 2 +- .../db/tools/schema/SchemaFileSketchTool.java | 165 +++ .../apache/iotdb/db/utils/QueryDataSetUtils.java | 2 +- .../timerangeiterator/AggrWindowIterator.java | 38 +- .../timerangeiterator/ITimeRangeIterator.java | 8 +- .../timerangeiterator/PreAggrWindowIterator.java | 38 +- .../PreAggrWindowWithNaturalMonthIterator.java | 50 +- .../SingleTimeWindowIterator.java | 65 + .../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 143 +- .../java/org/apache/iotdb/db/wal/node/WALNode.java | 6 +- .../db/engine/storagegroup/DataRegionTest.java | 910 ++++++++++++ .../engine/storagegroup/TsFileProcessorTest.java | 17 + ...ocessorTest.java => TsFileProcessorV2Test.java} | 104 +- .../iotdb/db/metadata/mtree/MTreeBelowSGTest.java | 18 +- .../iotdb/db/mpp/buffer/SourceHandleTest.java | 6 +- .../db/mpp/execution/ConfigExecutionTest.java | 130 ++ .../iotdb/db/mpp/execution/DataDriverTest.java | 10 +- .../iotdb/db/mpp/operator/LimitOperatorTest.java | 11 +- .../operator/SeriesAggregateScanOperatorTest.java | 373 +++++ .../db/mpp/operator/SeriesScanOperatorTest.java | 17 +- .../db/mpp/operator/TimeJoinOperatorTest.java | 11 +- .../operator/schema/SchemaScanOperatorTest.java | 20 +- .../db/mpp/schedule/DefaultTaskSchedulerTest.java | 18 + .../schedule/FragmentInstanceSchedulerTest.java | 20 + .../FragmentInstanceTimeoutSentinelTest.java | 55 +- .../db/mpp/schedule/queue/L1PriorityQueueTest.java | 22 + .../db/mpp/schedule/queue/L2PriorityQueueTest.java | 27 + .../db/mpp/sql/plan/DistributionPlannerTest.java | 39 + .../db/mpp/sql/plan/QueryLogicalPlanUtil.java | 8 + .../source/SeriesAggregateScanNodeSerdeTest.java | 7 +- .../dataset/groupby/GroupByTimeDataSetTest.java | 74 +- .../query/reader/series/SeriesReaderTestUtil.java | 8 + .../iotdb/db/service/InternalServiceImplTest.java | 117 +- .../org/apache/iotdb/db/tools/MLogParserTest.java | 2 +- .../iotdb/db/tools/SchemaFileSketchTest.java | 158 ++ .../iotdb/db/utils/TimeRangeIteratorTest.java | 230 ++- .../db/wal/recover/WALRecoverManagerTest.java | 6 +- .../org/apache/iotdb/rpc/RedirectException.java | 14 +- .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 4 +- .../java/org/apache/iotdb/session/Session.java | 52 +- .../apache/iotdb/session/SessionConnection.java | 16 +- .../apache/iotdb/session/util/SessionUtils.java | 12 +- .../apache/iotdb/session/SessionCacheLeaderUT.java | 28 +- spark-iotdb-connector/pom.xml | 2 +- .../src/main/thrift/confignode.thrift | 4 +- thrift/src/main/thrift/common.thrift | 6 +- .../iotdb/tsfile/read/common/block/TsBlock.java | 22 +- .../tsfile/read/common/block/TsBlockBuilder.java | 5 + .../common/block/column/BinaryColumnBuilder.java | 11 + .../common/block/column/BooleanColumnBuilder.java | 11 + .../read/common/block/column/ColumnBuilder.java | 5 + .../common/block/column/DoubleColumnBuilder.java | 11 + .../common/block/column/FloatColumnBuilder.java | 11 + .../read/common/block/column/IntColumnBuilder.java | 11 + .../common/block/column/LongColumnBuilder.java | 11 + .../read/common/block/column/TimeColumn.java | 4 + .../common/block/column/TimeColumnBuilder.java | 11 + zeppelin-interpreter/pom.xml | 1 - 195 files changed, 9032 insertions(+), 2543 deletions(-) diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java index ab7daa5289,573771fe81..7c3f24509d --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java @@@ -61,6 -61,6 +61,8 @@@ public class DataBlockManager implement void onClosed(SinkHandle sinkHandle); void onAborted(SinkHandle sinkHandle); ++ ++ void onFailed(Throwable t); } /** Handle thrift communications. */ @@@ -166,6 -166,6 +168,7 @@@ /** Listen to the state changes of a source handle. */ class SourceHandleListenerImpl implements SourceHandleListener { ++ @Override public void onFinished(SourceHandle sourceHandle) { logger.info("Release resources of finished source handle {}", sourceHandle); @@@ -206,12 -208,12 +211,12 @@@ logger.info("Resources of finished sink handle {} has already been released", sinkHandle); } sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()); -- context.finish(); ++ context.finished(); } @Override public void onClosed(SinkHandle sinkHandle) { -- context.flushing(); ++ context.transitionToFlushing(); } @Override @@@ -222,6 -224,6 +227,11 @@@ } sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()); } ++ ++ @Override ++ public void onFailed(Throwable t) { ++ context.failed(t); ++ } } private final LocalMemoryManager localMemoryManager; diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java index 1d037678ed,6300c5beef..05d12e4dfa --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java @@@ -24,9 -24,8 +24,8 @@@ import com.google.common.util.concurren import java.io.IOException; import java.util.List; - import java.util.Optional; -public interface ISinkHandle extends AutoCloseable { +public interface ISinkHandle { /** Get the total amount of memory used by buffered tsblocks. */ long getBufferRetainedSizeInBytes(); @@@ -71,7 -70,8 +70,7 @@@ * downstream instances. A {@link RuntimeException} will be thrown if any exception happened * during the data transmission. */ - @Override -- void close() throws IOException; ++ void close(); /** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */ void abort(); diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java index b2fd70459d,a72dcb9cb6..c6e3e64739 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java @@@ -74,7 -73,7 +73,6 @@@ public class SinkHandle implements ISin private long bufferRetainedSizeInBytes; private boolean closed; private boolean noMoreTsBlocks; -- private Throwable throwable; public SinkHandle( String remoteHostname, @@@ -191,11 -193,11 +189,8 @@@ } @Override -- public void close() throws IOException { ++ public void close() { logger.info("Sink handle {} is being closed.", this); -- if (throwable != null) { -- throw new IOException(throwable); -- } if (closed) { return; } @@@ -207,7 -209,7 +202,7 @@@ try { sendEndOfDataBlockEvent(); } catch (TException e) { -- throw new IOException(e); ++ throw new RuntimeException("Send EndOfDataBlockEvent failed", e); } logger.info("Sink handle {} is closed.", this); } @@@ -249,7 -243,7 +236,7 @@@ @Override public boolean isFinished() { -- return throwable == null && noMoreTsBlocks && sequenceIdToTsBlock.isEmpty(); ++ return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty(); } @Override @@@ -364,7 -358,7 +351,7 @@@ try { client.onNewDataBlockEvent(newDataBlockEvent); break; -- } catch (TException e) { ++ } catch (Throwable e) { logger.error( "Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}", remotePlanNodeId, @@@ -373,9 -367,9 +360,7 @@@ attempt, e); if (attempt == MAX_ATTEMPT_TIMES) { -- synchronized (this) { -- throwable = e; -- } ++ sinkHandleListener.onFailed(e); } } } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java index c4bc0e377a,9f3b9240c2..49a13eddd7 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java @@@ -88,7 -87,7 +87,7 @@@ public class StubSinkHandle implements return; } closed = true; -- instanceContext.flushing(); ++ instanceContext.transitionToFlushing(); } @Override diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java index 2921c9d889,5ae66b14d7..e6e8e8083a --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java @@@ -18,7 -18,7 +18,6 @@@ */ package org.apache.iotdb.db.mpp.execution; - import com.google.common.util.concurrent.SettableFuture; -import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@@ -29,30 -29,81 +28,35 @@@ import org.apache.iotdb.db.mpp.buffer.I import org.apache.iotdb.db.mpp.operator.Operator; import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator; import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import com.google.common.util.concurrent.ListenableFuture; + import com.google.common.util.concurrent.SettableFuture; -import io.airlift.units.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import javax.annotation.concurrent.NotThreadSafe; + -import java.io.IOException; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED; -- + /** + * One dataDriver is responsible for one FragmentInstance which is for data query, which may + * contains several series. + */ @NotThreadSafe -public class DataDriver implements Driver { - - private static final Logger logger = LoggerFactory.getLogger(DataDriver.class); - - private final Operator root; - private final ISinkHandle sinkHandle; - private final DataDriverContext driverContext; +public class DataDriver extends Driver { private boolean init; - private boolean closed; /** closed tsfile used in this fragment instance */ private Set<TsFileResource> closedFilePaths; /** unClosed tsfile used in this fragment instance */ private Set<TsFileResource> unClosedFilePaths; - private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>(); -- public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) { - this.root = root; - this.sinkHandle = sinkHandle; - this.driverContext = driverContext; + super(root, sinkHandle, driverContext); this.closedFilePaths = new HashSet<>(); this.unClosedFilePaths = new HashSet<>(); - // initially the driverBlockedFuture is not blocked (it is completed) - SettableFuture<Void> future = SettableFuture.create(); - future.set(null); - driverBlockedFuture.set(future); - } - - @Override - public boolean isFinished() { - try { - boolean isFinished = - closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished()); - if (isFinished) { - close(); - } - return isFinished; - } catch (Throwable t) { - logger.error( - "Failed to query whether the data driver {} is finished", driverContext.getId(), t); - driverContext.failed(t); - close(); - return true; - } } @Override @@@ -61,39 -117,77 +65,39 @@@ try { initialize(); } catch (Throwable t) { - logger.error( + LOGGER.error( "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t); driverContext.failed(t); - close(); blockedFuture.setException(t); - return blockedFuture; + return false; } } - - // if the driver is blocked we don't need to continue - if (!blockedFuture.isDone()) { - return blockedFuture; - } - - long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS); - - long start = System.nanoTime(); - try { - do { - ListenableFuture<Void> future = processInternal(); - if (!future.isDone()) { - return updateDriverBlockedFuture(future); - } - } while (System.nanoTime() - start < maxRuntime && !root.isFinished()); - } catch (Throwable t) { - logger.error("Failed to execute fragment instance {}", driverContext.getId(), t); - driverContext.failed(t); - close(); - blockedFuture.setException(t); - return blockedFuture; - } - return NOT_BLOCKED; - } - - @Override - public FragmentInstanceId getInfo() { - return driverContext.getId(); + return true; } + /** + * All file paths used by this fragment instance must be cleared and thus the usage reference must + * be decreased. + */ @Override - public void close() { - if (closed) { - return; + protected void releaseResource() { + for (TsFileResource tsFile : closedFilePaths) { + FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true); } - closed = true; - try { - if (root != null) { - root.close(); - } - if (sinkHandle != null) { - sinkHandle.close(); - } - } catch (Throwable t) { - logger.error("Failed to closed driver {}", driverContext.getId(), t); - driverContext.failed(t); - } finally { - removeUsedFilesForQuery(); + closedFilePaths = null; + for (TsFileResource tsFile : unClosedFilePaths) { + FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true); } - } - - @Override - public void failed(Throwable t) { - driverContext.failed(t); + unClosedFilePaths = null; } - /** * init seq file list and unseq file list in QueryDataSource and set it into each SourceNode TODO * we should change all the blocked lock operation into tryLock */ private void initialize() throws QueryProcessException { - List<DataSourceOperator> sourceOperators = ((DataDriverContext)driverContext).getSourceOperators(); - List<DataSourceOperator> sourceOperators = driverContext.getSourceOperators(); ++ List<DataSourceOperator> sourceOperators = ++ ((DataDriverContext) driverContext).getSourceOperators(); if (sourceOperators != null && !sourceOperators.isEmpty()) { QueryDataSource dataSource = initQueryDataSourceCache(); sourceOperators.forEach( @@@ -121,9 -214,9 +125,7 @@@ dataRegion.readLock(); try { List<PartialPath> pathList = - context.getPaths().stream() - driverContext.getPaths().stream() -- .map(IDTable::translateQueryPath) -- .collect(Collectors.toList()); ++ context.getPaths().stream().map(IDTable::translateQueryPath).collect(Collectors.toList()); // when all the selected series are under the same device, the QueryDataSource will be // filtered according to timeIndex Set<String> selectedDeviceIdSet = @@@ -188,5 -296,39 +190,4 @@@ FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed); } } - - private ListenableFuture<Void> processInternal() throws IOException, IoTDBException { - ListenableFuture<Void> blocked = root.isBlocked(); - if (!blocked.isDone()) { - return blocked; - } - blocked = sinkHandle.isFull(); - if (!blocked.isDone()) { - return blocked; - } - if (root.hasNext()) { - TsBlock tsBlock = root.next(); - if (tsBlock != null && !tsBlock.isEmpty()) { - sinkHandle.send(Collections.singletonList(tsBlock)); - } - } - return NOT_BLOCKED; - } - - private ListenableFuture<Void> updateDriverBlockedFuture( - ListenableFuture<Void> sourceBlockedFuture) { - // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed - // or any of the operators gets a memory revocation request - SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create(); - driverBlockedFuture.set(newDriverBlockedFuture); - sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor()); - - // TODO Although we don't have memory management for operator now, we should consider it for - // future - // it's possible that memory revoking is requested for some operator - // before we update driverBlockedFuture above and we don't want to miss that - // notification, so we check to see whether that's the case before returning. -- - return newDriverBlockedFuture; - } } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java index fd2fef8b3a,f211ce593c..8687627055 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java @@@ -18,88 -18,25 +18,90 @@@ */ package org.apache.iotdb.db.mpp.execution; - import com.google.common.collect.ImmutableList; - import com.google.common.util.concurrent.ListenableFuture; - import com.google.common.util.concurrent.SettableFuture; - import io.airlift.units.Duration; +import org.apache.iotdb.db.mpp.buffer.ISinkHandle; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; +import org.apache.iotdb.db.mpp.operator.Operator; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; + ++import com.google.common.collect.ImmutableList; + import com.google.common.util.concurrent.ListenableFuture; ++import com.google.common.util.concurrent.SettableFuture; + import io.airlift.units.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.Closeable; +import javax.annotation.concurrent.GuardedBy; ++ +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.throwIfUnchecked; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.lang.Boolean.TRUE; +import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED; /** * Driver encapsulates some methods which are necessary for execution scheduler to run a fragment * instance */ -public interface Driver extends Closeable { +public abstract class Driver { + + protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class); + - + protected final Operator root; + protected final ISinkHandle sinkHandle; + protected final DriverContext driverContext; - protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>(); ++ protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture = ++ new AtomicReference<>(); + protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE); + + protected final DriverLock exclusiveLock = new DriverLock(); + + protected enum State { - ALIVE, NEED_DESTRUCTION, DESTROYED ++ ALIVE, ++ NEED_DESTRUCTION, ++ DESTROYED + } + + public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) { + this.root = root; + this.sinkHandle = sinkHandle; + this.driverContext = driverContext; + + // initially the driverBlockedFuture is not blocked (it is completed) + SettableFuture<Void> future = SettableFuture.create(); + future.set(null); + driverBlockedFuture.set(future); + } /** * Used to judge whether this fragment instance should be scheduled for execution anymore * * @return true if the FragmentInstance is done or terminated due to failure, otherwise false. */ - boolean isFinished(); + public boolean isFinished() { + checkLockNotHeld("Cannot check finished status while holding the driver lock"); + + // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown + Optional<Boolean> result = tryWithLockUnInterruptibly(this::isFinishedInternal); + return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone()); + } + + /** + * do initialization + * + * @return true if init succeed, false otherwise + */ + protected abstract boolean init(SettableFuture<Void> blockedFuture); + - /** - * release resource this driver used - */ ++ /** release resource this driver used */ + protected abstract void releaseResource(); /** * run the fragment instance for {@param duration} time slice, the time of this run is likely not @@@ -107,351 -44,27 +109,355 @@@ * * @param duration how long should this fragment instance run * @return the returned ListenableFuture<Void> is used to represent status of this processing if - * isDone() return true, meaning that this fragment instance is not blocked and is ready for - * next processing otherwise, meaning that this fragment instance is blocked and not ready for - * next processing. + * isDone() return true, meaning that this fragment instance is not blocked and is ready for + * next processing otherwise, meaning that this fragment instance is blocked and not ready for + * next processing. */ - ListenableFuture<Void> processFor(Duration duration); + public ListenableFuture<Void> processFor(Duration duration) { + + SettableFuture<Void> blockedFuture = driverBlockedFuture.get(); + // initialization may be time-consuming, so we keep it in the processFor method + // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a + // critical bug + if (!init(blockedFuture)) { + return blockedFuture; + } + + // if the driver is blocked we don't need to continue + if (!blockedFuture.isDone()) { + return blockedFuture; + } + + long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS); + - Optional<ListenableFuture<Void>> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> { - long start = System.nanoTime(); - do { - ListenableFuture<Void> future = processInternal(); - if (!future.isDone()) { - return updateDriverBlockedFuture(future); - } - } - while (System.nanoTime() - start < maxRuntime && !isFinishedInternal()); - return NOT_BLOCKED; - }); ++ Optional<ListenableFuture<Void>> result = ++ tryWithLock( ++ 100, ++ TimeUnit.MILLISECONDS, ++ true, ++ () -> { ++ long start = System.nanoTime(); ++ do { ++ ListenableFuture<Void> future = processInternal(); ++ if (!future.isDone()) { ++ return updateDriverBlockedFuture(future); ++ } ++ } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal()); ++ return NOT_BLOCKED; ++ }); + + return result.orElse(NOT_BLOCKED); + } /** * the id information about this Fragment Instance. * * @return a {@link FragmentInstanceId} instance. */ - FragmentInstanceId getInfo(); + public FragmentInstanceId getInfo() { + return driverContext.getId(); + } - /** - * clear resource used by this fragment instance - */ + /** clear resource used by this fragment instance */ - @Override - void close(); + public void close() { + // mark the service for destruction + if (!state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION)) { + return; + } + + exclusiveLock.interruptCurrentOwner(); + + // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown + tryWithLockUnInterruptibly(() -> TRUE); + } /** * fail current driver * * @param t reason cause this failure */ - void failed(Throwable t); + public void failed(Throwable t) { + driverContext.failed(t); + } + + public ISinkHandle getSinkHandle() { + return sinkHandle; + } + + @GuardedBy("exclusiveLock") + private boolean isFinishedInternal() { + checkLockHeld("Lock must be held to call isFinishedInternal"); + - boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root == null || root.isFinished(); ++ boolean finished = ++ state.get() != State.ALIVE || driverContext.isDone() || root == null || root.isFinished(); + if (finished) { + state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION); + } + return finished; + } + - + private ListenableFuture<Void> processInternal() { + try { + ListenableFuture<Void> blocked = root.isBlocked(); + if (!blocked.isDone()) { + return blocked; + } + blocked = sinkHandle.isFull(); + if (!blocked.isDone()) { + return blocked; + } + if (root.hasNext()) { + TsBlock tsBlock = root.next(); + if (tsBlock != null && !tsBlock.isEmpty()) { + sinkHandle.send(Collections.singletonList(tsBlock)); + } + } + return NOT_BLOCKED; + } catch (Throwable t) { + LOGGER.error("Failed to execute fragment instance {}", driverContext.getId(), t); + List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack(); + if (interrupterStack == null) { + driverContext.failed(t); + throw t; + } + + // Driver thread was interrupted which should only happen if the task is already finished. - // If this becomes the actual cause of a failed query there is a bug in the task state machine. ++ // If this becomes the actual cause of a failed query there is a bug in the task state ++ // machine. + Exception exception = new Exception("Interrupted By"); + exception.setStackTrace(interrupterStack.toArray(new StackTraceElement[0])); + RuntimeException newException = new RuntimeException("Driver was interrupted", exception); + newException.addSuppressed(t); + driverContext.failed(newException); + throw newException; + } + } + + private ListenableFuture<Void> updateDriverBlockedFuture( + ListenableFuture<Void> sourceBlockedFuture) { + // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed + // or any of the operators gets a memory revocation request + SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create(); + driverBlockedFuture.set(newDriverBlockedFuture); + sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor()); + + // TODO Although we don't have memory management for operator now, we should consider it for + // future + // it's possible that memory revoking is requested for some operator + // before we update driverBlockedFuture above and we don't want to miss that + // notification, so we check to see whether that's the case before returning. + + return newDriverBlockedFuture; + } + - + private synchronized void checkLockNotHeld(String message) { + checkState(!exclusiveLock.isHeldByCurrentThread(), message); + } + + @GuardedBy("exclusiveLock") + private synchronized void checkLockHeld(String message) { + checkState(exclusiveLock.isHeldByCurrentThread(), message); + } + + /** - * Try to acquire the {@code exclusiveLock} immediately and run a {@code task} - * The task will not be interrupted if the {@code Driver} is closed. - * <p> - * Note: task cannot return null ++ * Try to acquire the {@code exclusiveLock} immediately and run a {@code task} The task will not ++ * be interrupted if the {@code Driver} is closed. ++ * ++ * <p>Note: task cannot return null + */ + private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) { + return tryWithLock(0, TimeUnit.MILLISECONDS, false, task); + } + + /** - * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}. - * If the {@code interruptOnClose} flag is set to {@code true} the {@code task} will be - * interrupted if the {@code Driver} is closed. - * <p> - * Note: task cannot return null ++ * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}. If the ++ * {@code interruptOnClose} flag is set to {@code true} the {@code task} will be interrupted if ++ * the {@code Driver} is closed. ++ * ++ * <p>Note: task cannot return null + */ - private <T> Optional<T> tryWithLock(long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) { ++ private <T> Optional<T> tryWithLock( ++ long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) { + checkLockNotHeld("Lock cannot be reacquired"); + + boolean acquired = false; + try { + acquired = exclusiveLock.tryLock(timeout, unit, interruptOnClose); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + if (!acquired) { + return Optional.empty(); + } + + Optional<T> result; + try { + result = Optional.of(task.get()); + } finally { + try { + destroyIfNecessary(); + } finally { + exclusiveLock.unlock(); + } + } + + return result; + } + + @GuardedBy("exclusiveLock") + private void destroyIfNecessary() { + checkLockHeld("Lock must be held to call destroyIfNecessary"); + + if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) { + return; + } + + // if we get an error while closing a driver, record it and we will throw it at the end + Throwable inFlightException = null; + try { + inFlightException = closeAndDestroyOperators(); + driverContext.finished(); + } catch (Throwable t) { + // this shouldn't happen but be safe - inFlightException = addSuppressedException( - inFlightException, - t, - "Error destroying driver for task %s", - driverContext.getId()); ++ inFlightException = ++ addSuppressedException( ++ inFlightException, t, "Error destroying driver for task %s", driverContext.getId()); + } finally { + releaseResource(); + } + + if (inFlightException != null) { + // this will always be an Error or Runtime + throwIfUnchecked(inFlightException); + throw new RuntimeException(inFlightException); + } + } + + private Throwable closeAndDestroyOperators() { + // record the current interrupted status (and clear the flag); we'll reset it later + boolean wasInterrupted = Thread.interrupted(); + + Throwable inFlightException = null; + + try { + if (root != null) { + root.close(); + } + if (sinkHandle != null) { + sinkHandle.close(); + } + } catch (InterruptedException t) { + // don't record the stack + wasInterrupted = true; + } catch (Throwable t) { + // TODO currently, we won't know exact operator which is failed in closing - inFlightException = addSuppressedException( - inFlightException, - t, - "Error closing operator {} for fragment instance {}", - root.getOperatorContext().getOperatorId(), - driverContext.getId()); ++ inFlightException = ++ addSuppressedException( ++ inFlightException, ++ t, ++ "Error closing operator {} for fragment instance {}", ++ root.getOperatorContext().getOperatorId(), ++ driverContext.getId()); + } finally { + // reset the interrupted flag + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } + } + return inFlightException; + } + - private static Throwable addSuppressedException(Throwable inFlightException, Throwable newException, String message, Object... args) { ++ private static Throwable addSuppressedException( ++ Throwable inFlightException, Throwable newException, String message, Object... args) { + if (newException instanceof Error) { + if (inFlightException == null) { + inFlightException = newException; + } else { + // Self-suppression not permitted + if (inFlightException != newException) { + inFlightException.addSuppressed(newException); + } + } + } else { + // log normal exceptions instead of rethrowing them + LOGGER.error(message, args, newException); + } + return inFlightException; + } + + private static class DriverLock { + private final ReentrantLock lock = new ReentrantLock(); + + @GuardedBy("this") + private Thread currentOwner; ++ + @GuardedBy("this") + private boolean currentOwnerInterruptionAllowed; + + @GuardedBy("this") + private List<StackTraceElement> interrupterStack; + + public boolean isHeldByCurrentThread() { + return lock.isHeldByCurrentThread(); + } + + public boolean tryLock(boolean currentThreadInterruptionAllowed) { + checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant"); + boolean acquired = lock.tryLock(); + if (acquired) { + setOwner(currentThreadInterruptionAllowed); + } + return acquired; + } + + public boolean tryLock(long timeout, TimeUnit unit, boolean currentThreadInterruptionAllowed) + throws InterruptedException { + checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant"); + boolean acquired = lock.tryLock(timeout, unit); + if (acquired) { + setOwner(currentThreadInterruptionAllowed); + } + return acquired; + } + + private synchronized void setOwner(boolean interruptionAllowed) { + checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock"); + currentOwner = Thread.currentThread(); + currentOwnerInterruptionAllowed = interruptionAllowed; + // NOTE: We do not use interrupted stack information to know that another + // thread has attempted to interrupt the driver, and interrupt this new lock + // owner. The interrupted stack information is for debugging purposes only. + // In the case of interruption, the caller should (and does) have a separate + // state to prevent further processing in the Driver. + } + + public synchronized void unlock() { + checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock"); + currentOwner = null; + currentOwnerInterruptionAllowed = false; + lock.unlock(); + } + + public synchronized List<StackTraceElement> getInterrupterStack() { + return interrupterStack; + } + + public synchronized void interruptCurrentOwner() { + if (!currentOwnerInterruptionAllowed) { + return; + } + // there is a benign race condition here were the lock holder + // can be change between attempting to get lock and grabbing + // the synchronized lock here, but in either case we want to + // interrupt the lock holder thread + if (interrupterStack == null) { + interrupterStack = ImmutableList.copyOf(Thread.currentThread().getStackTrace()); + } + + if (currentOwner != null) { + currentOwner.interrupt(); + } + } + } } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java index d0bc3bb232,8c20a2c334..8985508b0c --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java @@@ -26,9 -24,6 +26,8 @@@ public class DriverContext private final FragmentInstanceContext fragmentInstanceContext; + private final AtomicBoolean finished = new AtomicBoolean(); + - public DriverContext(FragmentInstanceContext fragmentInstanceContext) { this.fragmentInstanceContext = fragmentInstanceContext; } @@@ -43,15 -38,13 +42,14 @@@ public void failed(Throwable cause) { fragmentInstanceContext.failed(cause); + finished.set(true); } - public void finish() { - fragmentInstanceContext.finish(); + public void finished() { + finished.compareAndSet(false, true); } - - public void flushing() { - fragmentInstanceContext.flushing(); + public boolean isDone() { + return finished.get(); } } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java index c3d33dbd74,b2e51be2bc..d1b81d7be9 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java @@@ -65,44 -58,10 +64,43 @@@ public class FragmentInstanceContext ex // private final AtomicLong endFullGcCount = new AtomicLong(-1); // private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1); -- public FragmentInstanceContext( - FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { - FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) { ++ public FragmentInstanceContext(FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { this.id = id; - this.state = state; + this.stateMachine = stateMachine; + } + + public void start() { + long now = System.currentTimeMillis(); + executionStartTime.compareAndSet(null, now); + startNanos.compareAndSet(0, System.nanoTime()); + + // always update last execution start time + lastExecutionStartTime.set(now); + } + + // the state change listener is added here in a separate initialize() method + // instead of the constructor to prevent leaking the "this" reference to + // another thread, which will cause unsafe publication of this instance. + private void initialize() { + stateMachine.addStateChangeListener(this::updateStatsIfDone); + } + + private void updateStatsIfDone(FragmentInstanceState newState) { + if (newState.isDone()) { + long now = System.currentTimeMillis(); + + // before setting the end times, make sure a start has been recorded + executionStartTime.compareAndSet(null, now); + startNanos.compareAndSet(0, System.nanoTime()); + + // Only update last start time, if the nothing was started + lastExecutionStartTime.compareAndSet(null, now); + + // use compare and set from initial value to avoid overwriting if there + // were a duplicate notification, which shouldn't happen + executionEndTime.compareAndSet(null, now); + endNanos.compareAndSet(0, System.nanoTime()); + } } public OperatorContext addOperatorContext( @@@ -139,10 -98,40 +137,18 @@@ } public void failed(Throwable cause) { - LOGGER.warn("Fragment Instance {} failed.", id, cause); - state.set(FragmentInstanceState.FAILED); + stateMachine.failed(cause); } - public void cancel() { - state.set(FragmentInstanceState.CANCELED); - this.endTime = System.currentTimeMillis(); ++ public void finished() { ++ stateMachine.finished(); + } + - public void abort() { - state.set(FragmentInstanceState.ABORTED); - this.endTime = System.currentTimeMillis(); - } - - public void finish() { - if (state.get().isDone()) { - return; - } - state.set(FragmentInstanceState.FINISHED); - this.endTime = System.currentTimeMillis(); - } - - public void flushing() { - if (state.get().isDone()) { - return; - } - state.set(FragmentInstanceState.FLUSHING); ++ public void transitionToFlushing() { ++ stateMachine.transitionToFlushing(); + } + public long getEndTime() { - return endTime; - } - - public void setEndTime(long endTime) { - this.endTime = endTime; + return executionEndTime.get(); } } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java index b00ddcadb3,21e0cc3d50..149bce1964 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java @@@ -18,17 -18,14 +18,15 @@@ */ package org.apache.iotdb.db.mpp.execution; - import io.airlift.stats.CounterStat; +import org.apache.iotdb.db.mpp.buffer.ISinkHandle; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler; import com.google.common.collect.ImmutableList; - -import java.util.concurrent.atomic.AtomicReference; ++import io.airlift.stats.CounterStat; import static java.util.Objects.requireNonNull; - import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED; - import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED; +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED; public class FragmentInstanceExecution { @@@ -45,18 -41,7 +43,20 @@@ private long lastHeartbeat; - public static FragmentInstanceExecution createFragmentInstanceExecution(IFragmentInstanceScheduler scheduler, - FragmentInstanceId instanceId, - FragmentInstanceContext context, - Driver driver, - FragmentInstanceStateMachine stateMachine, - CounterStat failedInstances) { - FragmentInstanceExecution execution = new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine); - public FragmentInstanceExecution( ++ public static FragmentInstanceExecution createFragmentInstanceExecution( ++ IFragmentInstanceScheduler scheduler, ++ FragmentInstanceId instanceId, ++ FragmentInstanceContext context, ++ Driver driver, ++ FragmentInstanceStateMachine stateMachine, ++ CounterStat failedInstances) { ++ FragmentInstanceExecution execution = ++ new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine); + execution.initialize(failedInstances, scheduler, instanceId, driver); + return execution; + } + + private FragmentInstanceExecution( IFragmentInstanceScheduler scheduler, FragmentInstanceId instanceId, FragmentInstanceContext context, @@@ -97,49 -82,7 +97,30 @@@ } public void abort() { - scheduler.abortFragmentInstance(instanceId); - context.abort(); + stateMachine.abort(); + } + + // this is a separate method to ensure that the `this` reference is not leaked during construction - private void initialize(CounterStat failedInstances, IFragmentInstanceScheduler scheduler, FragmentInstanceId instanceId, Driver driver) { ++ private void initialize( ++ CounterStat failedInstances, ++ IFragmentInstanceScheduler scheduler, ++ FragmentInstanceId instanceId, ++ Driver driver) { + requireNonNull(failedInstances, "failedInstances is null"); - stateMachine.addStateChangeListener(newState -> { - - if (!newState.isDone()) { - return; - } - - // Update failed tasks counter - if (newState == FAILED) { - failedInstances.update(1); - } - - driver.close(); - sinkHandle.abort(); - scheduler.abortFragmentInstance(instanceId); - }); - } - - private synchronized void instanceCompletion() { - if (stateMachine.getState().isDone()) { - return; - } - - if (!sinkHandle.isFinished()) { - stateMachine.transitionToFlushing(); - return; - } - - if (sinkHandle.isFinished()) { - // Cool! All done! - stateMachine.finished(); - return; - } - - if (sinkHandle.isFailed()) { - Throwable failureCause = sinkHandle.getFailureCause() - .orElseGet(() -> new RuntimeException("Fragment Instance " + instanceId + " 's SinkHandle is failed but the failure cause is missing")); - stateMachine.failed(failureCause); - } ++ stateMachine.addStateChangeListener( ++ newState -> { ++ if (!newState.isDone()) { ++ return; ++ } ++ ++ // Update failed tasks counter ++ if (newState == FAILED) { ++ failedInstances.update(1); ++ } ++ ++ driver.close(); ++ sinkHandle.abort(); ++ scheduler.abortFragmentInstance(instanceId); ++ }); } } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java index 5f49fa47ef,537f330f04..2e23cf03b4 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java @@@ -28,6 -27,6 +27,7 @@@ import org.apache.iotdb.db.mpp.schedule import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner; import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance; ++import io.airlift.stats.CounterStat; import io.airlift.units.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -51,14 -49,9 +51,13 @@@ public class FragmentInstanceManager private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance(); private final ScheduledExecutorService instanceManagementExecutor; + private final ExecutorService instanceNotificationExecutor; - private final Duration infoCacheTime; + // record failed instances count + private final CounterStat failedInstances = new CounterStat(); + public static FragmentInstanceManager getInstance() { return FragmentInstanceManager.InstanceHolder.INSTANCE; } @@@ -68,7 -61,6 +67,8 @@@ this.instanceExecution = new ConcurrentHashMap<>(); this.instanceManagementExecutor = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management"); - this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification"); ++ this.instanceNotificationExecutor = ++ IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification"); this.infoCacheTime = new Duration(15, TimeUnit.MINUTES); @@@ -93,13 -85,13 +93,14 @@@ instanceExecution.computeIfAbsent( instanceId, id -> { - - FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - AtomicReference<FragmentInstanceState> state = new AtomicReference<>(); - state.set(FragmentInstanceState.PLANNED); ++ FragmentInstanceStateMachine stateMachine = ++ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); FragmentInstanceContext context = instanceContext.computeIfAbsent( instanceId, - fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine)); - fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state)); ++ fragmentInstanceId -> ++ new FragmentInstanceContext(fragmentInstanceId, stateMachine)); try { DataDriver driver = @@@ -108,9 -100,9 +109,10 @@@ context, instance.getTimeFilter(), dataRegion); - return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances); - return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state); ++ return createFragmentInstanceExecution( ++ scheduler, instanceId, context, driver, stateMachine, failedInstances); } catch (Throwable t) { - context.failed(t); + stateMachine.failed(t); return null; } }); @@@ -126,29 -118,26 +128,29 @@@ instanceExecution.computeIfAbsent( instanceId, id -> { - FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - - AtomicReference<FragmentInstanceState> state = new AtomicReference<>(); - state.set(FragmentInstanceState.PLANNED); ++ FragmentInstanceStateMachine stateMachine = ++ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); FragmentInstanceContext context = instanceContext.computeIfAbsent( instanceId, - fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine)); - fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state)); ++ fragmentInstanceId -> ++ new FragmentInstanceContext(fragmentInstanceId, stateMachine)); try { SchemaDriver driver = planner.plan(instance.getFragment().getRoot(), context, schemaRegion); - return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances); - return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state); ++ return createFragmentInstanceExecution( ++ scheduler, instanceId, context, driver, stateMachine, failedInstances); } catch (Throwable t) { - context.failed(t); + stateMachine.failed(t); return null; } }); return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId); } - /** - * Aborts a FragmentInstance. - */ ++ /** Aborts a FragmentInstance. */ public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) { FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId); if (execution != null) { @@@ -159,21 -148,6 +161,19 @@@ return null; } - /** - * Cancels a FragmentInstance. - */ ++ /** Cancels a FragmentInstance. */ + public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) { + requireNonNull(instanceId, "taskId is null"); + + FragmentInstanceExecution execution = instanceExecution.remove(instanceId); + if (execution != null) { + instanceContext.remove(instanceId); + execution.cancel(); + return execution.getInstanceInfo(); + } + return null; + } + /** * Gets the info for the specified fragment instance. * diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java index 7f66d1a321,0000000000..febc9e9e09 mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java @@@ -1,171 -1,0 +1,182 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.execution; + ++import org.apache.iotdb.db.mpp.common.FragmentInstanceId; ++import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener; ++ +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; - import org.apache.iotdb.db.mpp.common.FragmentInstanceId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; ++ +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED; +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED; +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED; +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FINISHED; +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FLUSHING; +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.RUNNING; +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.TERMINAL_INSTANCE_STATES; + - +@ThreadSafe +public class FragmentInstanceStateMachine { + private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceStateMachine.class); + + private final long createdTime = System.currentTimeMillis(); + + private final FragmentInstanceId instanceId; + private final Executor executor; + private final StateMachine<FragmentInstanceState> instanceState; + private final LinkedBlockingQueue<Throwable> failureCauses = new LinkedBlockingQueue<>(); + + @GuardedBy("this") + private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = new HashMap<>(); ++ + @GuardedBy("this") - private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners = new ArrayList<>(); ++ private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners = ++ new ArrayList<>(); + + public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Executor executor) { + this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId is null"); + this.executor = requireNonNull(executor, "executor is null"); - instanceState = new StateMachine<>("FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES); - instanceState.addStateChangeListener(newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState)); ++ instanceState = ++ new StateMachine<>( ++ "FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES); ++ instanceState.addStateChangeListener( ++ newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState)); + } + + public long getCreatedTime() { + return createdTime; + } + + public FragmentInstanceId getFragmentInstanceId() { + return instanceId; + } + + public FragmentInstanceState getState() { + return instanceState.get(); + } + - public ListenableFuture<FragmentInstanceState> getStateChange(FragmentInstanceState currentState) { ++ public ListenableFuture<FragmentInstanceState> getStateChange( ++ FragmentInstanceState currentState) { + requireNonNull(currentState, "currentState is null"); + checkArgument(!currentState.isDone(), "Current state is already done"); + + ListenableFuture<FragmentInstanceState> future = instanceState.getStateChange(currentState); + FragmentInstanceState state = instanceState.get(); + if (state.isDone()) { + return immediateFuture(state); + } + return future; + } + + public LinkedBlockingQueue<Throwable> getFailureCauses() { + return failureCauses; + } + + public void transitionToFlushing() { + instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING); + } + + public void finished() { + transitionToDoneState(FINISHED); + } + + public void cancel() { + transitionToDoneState(CANCELLED); + } + + public void abort() { + transitionToDoneState(ABORTED); + } + + public void failed(Throwable cause) { + failureCauses.add(cause); + transitionToDoneState(FAILED); + } + + private void transitionToDoneState(FragmentInstanceState doneState) { + requireNonNull(doneState, "doneState is null"); + checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState); + + instanceState.setIf(doneState, currentState -> !currentState.isDone()); + } + + /** - * Listener is always notified asynchronously using a dedicated notification thread pool so, care should - * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is - * possible notifications are observed out of order due to the asynchronous execution. ++ * Listener is always notified asynchronously using a dedicated notification thread pool so, care ++ * should be taken to avoid leaking {@code this} when adding a listener in a constructor. ++ * Additionally, it is possible notifications are observed out of order due to the asynchronous ++ * execution. + */ - public void addStateChangeListener(StateChangeListener<FragmentInstanceState> stateChangeListener) { ++ public void addStateChangeListener( ++ StateChangeListener<FragmentInstanceState> stateChangeListener) { + instanceState.addStateChangeListener(stateChangeListener); + } + + public void addSourceTaskFailureListener(FragmentInstanceFailureListener listener) { + Map<FragmentInstanceId, Throwable> failures; + synchronized (this) { + sourceInstanceFailureListeners.add(listener); + failures = ImmutableMap.copyOf(sourceInstanceFailures); + } - executor.execute(() -> { - failures.forEach(listener::onTaskFailed); - }); ++ executor.execute( ++ () -> { ++ failures.forEach(listener::onTaskFailed); ++ }); + } + + public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable failure) { + List<FragmentInstanceFailureListener> listeners; + synchronized (this) { + sourceInstanceFailures.putIfAbsent(instanceId, failure); + listeners = ImmutableList.copyOf(sourceInstanceFailureListeners); + } - executor.execute(() -> { - for (FragmentInstanceFailureListener listener : listeners) { - listener.onTaskFailed(instanceId, failure); - } - }); ++ executor.execute( ++ () -> { ++ for (FragmentInstanceFailureListener listener : listeners) { ++ listener.onTaskFailed(instanceId, failure); ++ } ++ }); + } + + @Override + public String toString() { + return toStringHelper(this) + .add("FragmentInstanceId", instanceId) + .add("FragmentInstanceState", instanceState) + .add("failureCauses", failureCauses) + .toString(); + } +} diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java index 6c45eb529a,844fe417ba..258065cd80 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java @@@ -18,23 -18,152 +18,24 @@@ */ package org.apache.iotdb.db.mpp.execution; - import com.google.common.util.concurrent.SettableFuture; import org.apache.iotdb.db.mpp.buffer.ISinkHandle; -import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.operator.Operator; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import com.google.common.util.concurrent.ListenableFuture; + import com.google.common.util.concurrent.SettableFuture; -import io.airlift.units.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import javax.annotation.concurrent.NotThreadSafe; -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED; - + /** One SchemaDriver is used to execute one FragmentInstance which is for metadata query. */ @NotThreadSafe -public class SchemaDriver implements Driver { - - private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class); - - private final Operator root; - private final ISinkHandle sinkHandle; - private final SchemaDriverContext driverContext; - - private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>(); - - private boolean closed = false; +public class SchemaDriver extends Driver { public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) { - this.root = root; - this.sinkHandle = sinkHandle; - this.driverContext = driverContext; - // initially the driverBlockedFuture is not blocked (it is completed) - SettableFuture<Void> future = SettableFuture.create(); - future.set(null); - driverBlockedFuture.set(future); - } - - @Override - public boolean isFinished() { - try { - boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished(); - if (isFinished) { - close(); - } - return isFinished; - } catch (Throwable t) { - logger.error( - "Failed to query whether the schema driver {} is finished", driverContext.getId(), t); - driverContext.failed(t); - return true; - } - } - - @Override - public ListenableFuture<Void> processFor(Duration duration) { - // if the driver is blocked we don't need to continue - SettableFuture<Void> blockedFuture = driverBlockedFuture.get(); - if (!blockedFuture.isDone()) { - return blockedFuture; - } - - long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS); - - long start = System.nanoTime(); - try { - do { - ListenableFuture<Void> future = processInternal(); - if (!future.isDone()) { - return updateDriverBlockedFuture(future); - } - } while (System.nanoTime() - start < maxRuntime && !root.isFinished()); - } catch (Throwable t) { - logger.error("Failed to execute fragment instance {}", driverContext.getId(), t); - driverContext.failed(t); - close(); - blockedFuture.setException(t); - return blockedFuture; - } - return NOT_BLOCKED; - } - - private ListenableFuture<Void> processInternal() throws IOException { - ListenableFuture<Void> blocked = root.isBlocked(); - if (!blocked.isDone()) { - return blocked; - } - blocked = sinkHandle.isFull(); - if (!blocked.isDone()) { - return blocked; - } - if (root.hasNext()) { - TsBlock tsBlock = root.next(); - if (tsBlock != null && !tsBlock.isEmpty()) { - sinkHandle.send(Collections.singletonList(tsBlock)); - } - } - return NOT_BLOCKED; - } - - private ListenableFuture<Void> updateDriverBlockedFuture( - ListenableFuture<Void> sourceBlockedFuture) { - // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed - // or any of the operators gets a memory revocation request - SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create(); - driverBlockedFuture.set(newDriverBlockedFuture); - sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor()); - - // TODO Although we don't have memory management for operator now, we should consider it for - // future - // it's possible that memory revoking is requested for some operator - // before we update driverBlockedFuture above and we don't want to miss that - // notification, so we check to see whether that's the case before returning. - - return newDriverBlockedFuture; - } - - @Override - public FragmentInstanceId getInfo() { - return driverContext.getId(); + super(root, sinkHandle, driverContext); } - @Override - public void close() { - if (closed) { - return; - } - closed = true; - try { - if (root != null) { - root.close(); - } - if (sinkHandle != null) { - sinkHandle.close(); - } - } catch (Throwable t) { - logger.error("Failed to closed driver {}", driverContext.getId(), t); - driverContext.failed(t); - } + protected boolean init(SettableFuture<Void> blockedFuture) { + return true; } @Override diff --cc server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java index 7305feee83,c8cccc0520..398b2f03c4 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java @@@ -22,8 -22,8 +22,6 @@@ import org.apache.iotdb.tsfile.read.com import com.google.common.util.concurrent.ListenableFuture; --import java.io.IOException; -- import static com.google.common.util.concurrent.Futures.immediateVoidFuture; public interface Operator extends AutoCloseable { diff --cc server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java index 97da0a4935,003da76f2d..34fb44373e --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java @@@ -35,7 -35,7 +35,6 @@@ import org.apache.iotdb.tsfile.utils.Bi import org.slf4j.Logger; import org.slf4j.LoggerFactory; --import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.List; diff --cc server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java index 888f4ce5e7,abebdaf30d..b66d4f2ff1 --- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java @@@ -18,8 -18,6 +18,7 @@@ */ package org.apache.iotdb.db.mpp.schedule.task; - import com.google.common.util.concurrent.SettableFuture; +import org.apache.iotdb.db.mpp.buffer.ISinkHandle; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; @@@ -32,6 -28,6 +29,7 @@@ import org.apache.iotdb.db.mpp.schedule import org.apache.iotdb.db.mpp.schedule.queue.IDIndexedAccessible; import com.google.common.util.concurrent.ListenableFuture; ++import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.Duration; import java.util.Comparator; @@@ -194,16 -196,6 +202,14 @@@ public class FragmentInstanceTask imple return false; } + @Override + protected boolean init(SettableFuture<Void> blockedFuture) { + return true; + } + + @Override - protected void releaseResource() { - - } ++ protected void releaseResource() {} + @Override public ListenableFuture<Void> processFor(Duration duration) { return null; diff --cc server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java index db060ac405,2703d0aa37..d41c7f5391 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java @@@ -56,8 -55,7 +56,7 @@@ import java.util.Arrays import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutorService; - import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE; import static org.junit.Assert.assertEquals; @@@ -87,7 -85,6 +86,8 @@@ public class DataDriverTest @Test public void batchTest() { - ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); ++ ExecutorService instanceNotificationExecutor = ++ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { MeasurementPath measurementPath1 = new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32); @@@ -95,10 -92,11 +95,12 @@@ allSensors.add("sensor0"); allSensors.add("sensor1"); QueryId queryId = new QueryId("stub_query"); - FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); - FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - AtomicReference<FragmentInstanceState> state = - new AtomicReference<>(FragmentInstanceState.RUNNING); ++ FragmentInstanceId instanceId = ++ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); ++ FragmentInstanceStateMachine stateMachine = ++ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); FragmentInstanceContext fragmentInstanceContext = - new FragmentInstanceContext( - new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state); + new FragmentInstanceContext(instanceId, stateMachine); PlanNodeId planNodeId1 = new PlanNodeId("1"); fragmentInstanceContext.addOperatorContext( 1, planNodeId1, SeriesScanOperator.class.getSimpleName()); diff --cc server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java index ab0849ddf2,6ad74bd110..17a45db8da --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java @@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; --import org.apache.iotdb.db.mpp.execution.FragmentInstanceState; +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator; import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; @@@ -52,8 -50,7 +51,7 @@@ import java.util.Arrays import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutorService; - import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@@ -81,7 -78,6 +79,8 @@@ public class LimitOperatorTest @Test public void batchTest() { - ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); ++ ExecutorService instanceNotificationExecutor = ++ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { MeasurementPath measurementPath1 = new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); @@@ -89,10 -85,11 +88,12 @@@ allSensors.add("sensor0"); allSensors.add("sensor1"); QueryId queryId = new QueryId("stub_query"); - FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); - FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - AtomicReference<FragmentInstanceState> state = - new AtomicReference<>(FragmentInstanceState.RUNNING); ++ FragmentInstanceId instanceId = ++ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); ++ FragmentInstanceStateMachine stateMachine = ++ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); FragmentInstanceContext fragmentInstanceContext = - new FragmentInstanceContext( - new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state); + new FragmentInstanceContext(instanceId, stateMachine); PlanNodeId planNodeId1 = new PlanNodeId("1"); fragmentInstanceContext.addOperatorContext( 1, planNodeId1, SeriesScanOperator.class.getSimpleName()); diff --cc server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java index dd28a90263,78e6821bcd..52ebe68508 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java @@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; --import org.apache.iotdb.db.mpp.execution.FragmentInstanceState; +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; @@@ -45,11 -44,9 +45,9 @@@ import org.junit.Test import java.io.IOException; import java.util.ArrayList; - import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutorService; - import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@@ -76,17 -73,16 +74,19 @@@ public class SeriesScanOperatorTest @Test public void batchTest() { - ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); ++ ExecutorService instanceNotificationExecutor = ++ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { MeasurementPath measurementPath = new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); - Set<String> allSensors = new HashSet<>(); - allSensors.add("sensor0"); + Set<String> allSensors = Sets.newHashSet("sensor0"); QueryId queryId = new QueryId("stub_query"); - FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); - FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - AtomicReference<FragmentInstanceState> state = - new AtomicReference<>(FragmentInstanceState.RUNNING); ++ FragmentInstanceId instanceId = ++ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); ++ FragmentInstanceStateMachine stateMachine = ++ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); FragmentInstanceContext fragmentInstanceContext = - new FragmentInstanceContext( - new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state); + new FragmentInstanceContext(instanceId, stateMachine); PlanNodeId planNodeId = new PlanNodeId("1"); fragmentInstanceContext.addOperatorContext( 1, planNodeId, SeriesScanOperator.class.getSimpleName()); diff --cc server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java index 70fe4def02,5534418b84..5fbfda4199 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java @@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; --import org.apache.iotdb.db.mpp.execution.FragmentInstanceState; +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator; import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; @@@ -51,8 -49,7 +50,7 @@@ import java.util.Arrays import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutorService; - import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; @@@ -77,7 -74,6 +75,8 @@@ public class TimeJoinOperatorTest @Test public void batchTest() { - ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); ++ ExecutorService instanceNotificationExecutor = ++ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { MeasurementPath measurementPath1 = new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); @@@ -85,10 -81,11 +84,12 @@@ allSensors.add("sensor0"); allSensors.add("sensor1"); QueryId queryId = new QueryId("stub_query"); - FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); - FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - AtomicReference<FragmentInstanceState> state = - new AtomicReference<>(FragmentInstanceState.RUNNING); ++ FragmentInstanceId instanceId = ++ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); ++ FragmentInstanceStateMachine stateMachine = ++ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); FragmentInstanceContext fragmentInstanceContext = - new FragmentInstanceContext( - new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state); + new FragmentInstanceContext(instanceId, stateMachine); PlanNodeId planNodeId1 = new PlanNodeId("1"); fragmentInstanceContext.addOperatorContext( 1, planNodeId1, SeriesScanOperator.class.getSimpleName()); diff --cc server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java index 320be3be21,609dc9befa..958b0a5b72 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java @@@ -29,8 -28,7 +29,7 @@@ import org.apache.iotdb.db.mpp.common.F import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; --import org.apache.iotdb.db.mpp.execution.FragmentInstanceState; +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.SchemaDriverContext; import org.apache.iotdb.db.mpp.operator.OperatorContext; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; @@@ -54,8 -52,7 +53,7 @@@ import java.io.IOException import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutorService; - import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES; import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES; @@@ -93,13 -90,13 +91,16 @@@ public class SchemaScanOperatorTest @Test public void testDeviceMetaScanOperator() { - ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); ++ ExecutorService instanceNotificationExecutor = ++ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { QueryId queryId = new QueryId("stub_query"); - FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); - FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - AtomicReference<FragmentInstanceState> state = - new AtomicReference<>(FragmentInstanceState.RUNNING); ++ FragmentInstanceId instanceId = ++ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); ++ FragmentInstanceStateMachine stateMachine = ++ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); FragmentInstanceContext fragmentInstanceContext = - new FragmentInstanceContext( - new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state); + new FragmentInstanceContext(instanceId, stateMachine); PlanNodeId planNodeId = queryId.genPlanNodeId(); OperatorContext operatorContext = fragmentInstanceContext.addOperatorContext( @@@ -159,13 -154,13 +160,16 @@@ @Test public void testTimeSeriesMetaScanOperator() { - ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); ++ ExecutorService instanceNotificationExecutor = ++ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { QueryId queryId = new QueryId("stub_query"); - FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); - FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - AtomicReference<FragmentInstanceState> state = - new AtomicReference<>(FragmentInstanceState.RUNNING); ++ FragmentInstanceId instanceId = ++ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); ++ FragmentInstanceStateMachine stateMachine = ++ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); FragmentInstanceContext fragmentInstanceContext = - new FragmentInstanceContext( - new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state); + new FragmentInstanceContext(instanceId, stateMachine); PlanNodeId planNodeId = queryId.genPlanNodeId(); OperatorContext operatorContext = fragmentInstanceContext.addOperatorContext(
