This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 86690f209e67a5f7270c7e3018aa32042b4a113a
Merge: fb52130d2b 67dafed0e6
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Apr 20 17:54:57 2022 +0800

    Add FragmentInstanceStateMachine

 .../iotdb/cluster/coordinator/Coordinator.java     |    6 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |    2 +-
 .../iotdb/cluster/metadata/CSchemaProcessor.java   |   37 -
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |    3 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |    8 +-
 .../cluster/server/member/DataGroupMember.java     |   11 +-
 .../cluster/server/member/MetaGroupMember.java     |    6 +-
 .../cluster/server/service/BaseAsyncService.java   |    4 +-
 .../apache/iotdb/cluster/utils/StatusUtils.java    |    6 +-
 .../FilePartitionedSnapshotLogManagerTest.java     |    2 +
 .../cluster/server/member/DataGroupMemberTest.java |    2 +
 .../cluster/server/member/MetaGroupMemberTest.java |    5 +-
 .../consensus/response/DataNodesInfoDataSet.java   |    4 +-
 .../statemachine/PartitionRegionStateMachine.java  |   17 +
 .../confignode/consensus/RatisConsensusDemo.java   |    6 +-
 .../manager/ConfigManagerManualTest.java           |    6 +-
 .../server/ConfigNodeRPCServerProcessorTest.java   |   26 +-
 .../iotdb/consensus/common/SnapshotMeta.java       |   40 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |    4 +-
 .../consensus/standalone/StandAloneServerImpl.java |   18 +
 .../consensus/statemachine/EmptyStateMachine.java  |   18 +
 .../consensus/statemachine/IStateMachine.java      |   49 +
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   15 +
 .../standalone/StandAloneConsensusTest.java        |   15 +
 distribution/src/assembly/all.xml                  |    4 +
 distribution/src/assembly/server.xml               |    4 +
 docs/UserGuide/Maintenance-Tools/Metric-Tool.md    |   12 +-
 .../Maintenance-Tools/SchemaFileSketch-Tool.md     |   38 +
 .../Ecosystem Integration/Grafana Plugin.md        |  143 +-
 docs/zh/UserGuide/Maintenance-Tools/Metric-Tool.md |   12 +-
 .../Maintenance-Tools/SchemaFileSketch-Tool.md     |   35 +
 .../Apache IoTDB Dashboard v0.13.1.json            | 1527 ++++++++++++++++++++
 .../Apache IoTDB Dashboard v0.14.0.json            | 1527 ++++++++++++++++++++
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |    8 +-
 .../iotdb/influxdb/session/InfluxDBSession.java    |    6 +-
 .../influxdb/integration/IoTDBInfluxDBIT.java      |    4 +-
 .../iotdb/commons/partition/DataPartition.java     |   17 +-
 .../{PartitionInfo.java => Partition.java}         |   27 +-
 .../iotdb/commons/partition/RegionReplicaSet.java  |    6 +-
 .../iotdb/commons/partition/SchemaPartition.java   |   17 +-
 .../SchemaFileSketcher.bat}                        |    4 +-
 .../mLogParser.sh => schema/SchemaFileSketcher.sh} |    4 +-
 .../tools/{mlog => schema}/mLogParser.bat          |    2 +-
 .../resources/tools/{mlog => schema}/mLogParser.sh |    0
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |    8 +
 .../statemachine/DataRegionStateMachine.java       |   17 +
 .../statemachine/SchemaRegionStateMachine.java     |   22 +-
 .../engine/compaction/CompactionTaskManager.java   |    6 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  214 ++-
 .../apache/iotdb/db/metadata/LocalConfigNode.java  |    2 +-
 .../db/metadata/LocalSchemaPartitionTable.java     |    6 +
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   39 -
 .../iotdb/db/metadata/mtree/IMTreeBelowSG.java     |  307 ++++
 ...reeBelowSG.java => MTreeBelowSGCachedImpl.java} |  134 +-
 ...reeBelowSG.java => MTreeBelowSGMemoryImpl.java} |  909 +++++-------
 .../mtree/store/disk/MTreeFlushTaskManager.java    |    2 +-
 .../mtree/store/disk/MTreeReleaseTaskManager.java  |    2 +-
 .../mtree/store/disk/schemafile/ISegment.java      |    2 +
 .../mtree/store/disk/schemafile/RecordUtils.java   |   24 +-
 .../mtree/store/disk/schemafile/SchemaFile.java    |   37 +-
 .../mtree/store/disk/schemafile/SchemaPage.java    |    9 +-
 .../mtree/store/disk/schemafile/Segment.java       |   51 +
 .../db/metadata/schemaregion/ISchemaRegion.java    |  247 +++-
 .../db/metadata/schemaregion/SchemaEngine.java     |    5 +-
 ...hemaRegion.java => SchemaRegionMemoryImpl.java} |  600 +++-----
 ...Region.java => SchemaRegionSchemaFileImpl.java} |   93 +-
 .../schemaregion/rocksdb/RSchemaRegion.java        |   81 +-
 .../SchemaExecutionVisitor.java}                   |   59 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   26 +-
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |   14 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |   37 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |   27 +-
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |   13 +-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |   16 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   19 +-
 .../iotdb/db/mpp/execution/DataDriverContext.java  |    1 +
 .../org/apache/iotdb/db/mpp/execution/Driver.java  |  112 +-
 .../iotdb/db/mpp/execution/DriverContext.java      |    2 -
 .../db/mpp/execution/FragmentInstanceContext.java  |   12 +-
 .../mpp/execution/FragmentInstanceExecution.java   |   79 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |   34 +-
 .../execution/FragmentInstanceStateMachine.java    |   49 +-
 .../iotdb/db/mpp/execution/IQueryExecution.java    |    2 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |   43 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |   14 +-
 .../iotdb/db/mpp/execution/SchemaDriver.java       |    5 +-
 .../db/mpp/execution/SchemaDriverContext.java      |    1 +
 .../db/mpp/execution/config/ConfigExecution.java   |   59 +-
 .../config/ConfigTaskVisitor.java}                 |   24 +-
 .../iotdb/db/mpp/execution/config/IConfigTask.java |    2 +-
 .../mpp/execution/scheduler/ClusterScheduler.java  |   32 +-
 .../mpp/execution/scheduler/IQueryTerminator.java  |    4 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |   51 +-
 .../execution/scheduler/SimpleQueryTerminator.java |   50 +-
 .../org/apache/iotdb/db/mpp/operator/Operator.java |    2 -
 .../mpp/operator/schema/SchemaFetchOperator.java   |    1 -
 .../source/SeriesAggregateScanOperator.java        |  420 +++++-
 .../db/mpp/operator/source/SeriesScanOperator.java |    2 +-
 .../db/mpp/operator/source/SeriesScanUtil.java     |   10 +-
 .../FragmentInstanceAbortedException.java}         |   28 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |   15 +-
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |    1 +
 .../schedule/FragmentInstanceTimeoutSentinel.java  |    1 +
 .../db/mpp/schedule/queue/L1PriorityQueue.java     |   37 +-
 .../db/mpp/schedule/queue/L2PriorityQueue.java     |   66 +-
 .../db/mpp/schedule/task/FragmentInstanceTask.java |   18 +-
 .../mpp/schedule/task/FragmentInstanceTaskID.java  |   10 +-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |    4 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |   20 +
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   28 +-
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java |    5 +
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |    1 +
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |   13 +-
 .../node/metedata/write/AlterTimeSeriesNode.java   |   41 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |    1 +
 .../plan/node/source/SeriesAggregateScanNode.java  |   45 +-
 .../statement/ConfigStatement.java}                |   10 +-
 .../db/mpp/sql/statement/StatementVisitor.java     |    5 +
 .../statement/component/GroupByTimeComponent.java  |   73 +
 .../metadata/SetStorageGroupStatement.java         |   10 +-
 .../db/protocol/influxdb/handler/QueryHandler.java |    4 +-
 .../influxdb/util/JacksonUtils.java}               |   36 +-
 .../db/protocol/influxdb/util/StringUtils.java     |    3 +-
 .../iotdb/db/query/control/SessionManager.java     |    7 +-
 .../db/query/dataset/AlignByDeviceDataSet.java     |    4 +-
 .../query/dataset/groupby/GroupByFillDataSet.java  |    8 +-
 .../query/dataset/groupby/GroupByTimeDataSet.java  |   30 +-
 .../dataset/groupby/GroupByTimeEngineDataSet.java  |   24 +-
 .../groupby/GroupByWithValueFilterDataSet.java     |    6 +-
 .../groupby/GroupByWithoutValueFilterDataSet.java  |    9 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |    3 +
 .../java/org/apache/iotdb/db/service/DataNode.java |    4 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |    2 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |   11 +-
 .../service/thrift/impl/InternalServiceImpl.java   |    6 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |   47 +-
 .../db/sync/sender/manager/SchemaSyncManager.java  |    6 +-
 .../db/tools/{mlog => schema}/MLogParser.java      |    2 +-
 .../db/tools/schema/SchemaFileSketchTool.java      |  165 +++
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |    2 +-
 .../timerangeiterator/AggrWindowIterator.java      |   38 +-
 .../timerangeiterator/ITimeRangeIterator.java      |    8 +-
 .../timerangeiterator/PreAggrWindowIterator.java   |   38 +-
 .../PreAggrWindowWithNaturalMonthIterator.java     |   50 +-
 .../SingleTimeWindowIterator.java                  |   65 +
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  143 +-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |    6 +-
 .../db/engine/storagegroup/DataRegionTest.java     |  910 ++++++++++++
 .../engine/storagegroup/TsFileProcessorTest.java   |   17 +
 ...ocessorTest.java => TsFileProcessorV2Test.java} |  104 +-
 .../iotdb/db/metadata/mtree/MTreeBelowSGTest.java  |   18 +-
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |    6 +-
 .../db/mpp/execution/ConfigExecutionTest.java      |  130 ++
 .../iotdb/db/mpp/execution/DataDriverTest.java     |   10 +-
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |   11 +-
 .../operator/SeriesAggregateScanOperatorTest.java  |  373 +++++
 .../db/mpp/operator/SeriesScanOperatorTest.java    |   17 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |   11 +-
 .../operator/schema/SchemaScanOperatorTest.java    |   20 +-
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  |   18 +
 .../schedule/FragmentInstanceSchedulerTest.java    |   20 +
 .../FragmentInstanceTimeoutSentinelTest.java       |   55 +-
 .../db/mpp/schedule/queue/L1PriorityQueueTest.java |   22 +
 .../db/mpp/schedule/queue/L2PriorityQueueTest.java |   27 +
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |   39 +
 .../db/mpp/sql/plan/QueryLogicalPlanUtil.java      |    8 +
 .../source/SeriesAggregateScanNodeSerdeTest.java   |    7 +-
 .../dataset/groupby/GroupByTimeDataSetTest.java    |   74 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |    8 +
 .../iotdb/db/service/InternalServiceImplTest.java  |  117 +-
 .../org/apache/iotdb/db/tools/MLogParserTest.java  |    2 +-
 .../iotdb/db/tools/SchemaFileSketchTest.java       |  158 ++
 .../iotdb/db/utils/TimeRangeIteratorTest.java      |  230 ++-
 .../db/wal/recover/WALRecoverManagerTest.java      |    6 +-
 .../org/apache/iotdb/rpc/RedirectException.java    |   14 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |    4 +-
 .../java/org/apache/iotdb/session/Session.java     |   52 +-
 .../apache/iotdb/session/SessionConnection.java    |   16 +-
 .../apache/iotdb/session/util/SessionUtils.java    |   12 +-
 .../apache/iotdb/session/SessionCacheLeaderUT.java |   28 +-
 spark-iotdb-connector/pom.xml                      |    2 +-
 .../src/main/thrift/confignode.thrift              |    4 +-
 thrift/src/main/thrift/common.thrift               |    6 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    |   22 +-
 .../tsfile/read/common/block/TsBlockBuilder.java   |    5 +
 .../common/block/column/BinaryColumnBuilder.java   |   11 +
 .../common/block/column/BooleanColumnBuilder.java  |   11 +
 .../read/common/block/column/ColumnBuilder.java    |    5 +
 .../common/block/column/DoubleColumnBuilder.java   |   11 +
 .../common/block/column/FloatColumnBuilder.java    |   11 +
 .../read/common/block/column/IntColumnBuilder.java |   11 +
 .../common/block/column/LongColumnBuilder.java     |   11 +
 .../read/common/block/column/TimeColumn.java       |    4 +
 .../common/block/column/TimeColumnBuilder.java     |   11 +
 zeppelin-interpreter/pom.xml                       |    1 -
 195 files changed, 9032 insertions(+), 2543 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index ab7daa5289,573771fe81..7c3f24509d
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@@ -61,6 -61,6 +61,8 @@@ public class DataBlockManager implement
      void onClosed(SinkHandle sinkHandle);
  
      void onAborted(SinkHandle sinkHandle);
++
++    void onFailed(Throwable t);
    }
  
    /** Handle thrift communications. */
@@@ -166,6 -166,6 +168,7 @@@
  
    /** Listen to the state changes of a source handle. */
    class SourceHandleListenerImpl implements SourceHandleListener {
++
      @Override
      public void onFinished(SourceHandle sourceHandle) {
        logger.info("Release resources of finished source handle {}", sourceHandle);
@@@ -206,12 -208,12 +211,12 @@@
          logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
        }
        sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
--      context.finish();
++      context.finished();
      }
  
      @Override
      public void onClosed(SinkHandle sinkHandle) {
--      context.flushing();
++      context.transitionToFlushing();
      }
  
      @Override
@@@ -222,6 -224,6 +227,11 @@@
        }
        sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
      }
++
++    @Override
++    public void onFailed(Throwable t) {
++      context.failed(t);
++    }
    }
  
    private final LocalMemoryManager localMemoryManager;
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 1d037678ed,6300c5beef..05d12e4dfa
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@@ -24,9 -24,8 +24,8 @@@ import com.google.common.util.concurren
  
  import java.io.IOException;
  import java.util.List;
- import java.util.Optional;
  
 -public interface ISinkHandle extends AutoCloseable {
 +public interface ISinkHandle {
  
    /** Get the total amount of memory used by buffered tsblocks. */
    long getBufferRetainedSizeInBytes();
@@@ -71,7 -70,8 +70,7 @@@
     * downstream instances. A {@link RuntimeException} will be thrown if any 
exception happened
     * during the data transmission.
     */
 -  @Override
--  void close() throws IOException;
++  void close();
  
    /** Abort the sink handle, discarding all tsblocks which may still be in 
memory buffer. */
    void abort();
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index b2fd70459d,a72dcb9cb6..c6e3e64739
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@@ -74,7 -73,7 +73,6 @@@ public class SinkHandle implements ISin
    private long bufferRetainedSizeInBytes;
    private boolean closed;
    private boolean noMoreTsBlocks;
--  private Throwable throwable;
  
    public SinkHandle(
        String remoteHostname,
@@@ -191,11 -193,11 +189,8 @@@
    }
  
    @Override
--  public void close() throws IOException {
++  public void close() {
      logger.info("Sink handle {} is being closed.", this);
--    if (throwable != null) {
--      throw new IOException(throwable);
--    }
      if (closed) {
        return;
      }
@@@ -207,7 -209,7 +202,7 @@@
      try {
        sendEndOfDataBlockEvent();
      } catch (TException e) {
--      throw new IOException(e);
++      throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
      }
      logger.info("Sink handle {} is closed.", this);
    }
@@@ -249,7 -243,7 +236,7 @@@
  
    @Override
    public boolean isFinished() {
--    return throwable == null && noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
++    return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
    }
  
    @Override
@@@ -364,7 -358,7 +351,7 @@@
          try {
            client.onNewDataBlockEvent(newDataBlockEvent);
            break;
--        } catch (TException e) {
++        } catch (Throwable e) {
            logger.error(
                "Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}",
                remotePlanNodeId,
@@@ -373,9 -367,9 +360,7 @@@
                attempt,
                e);
            if (attempt == MAX_ATTEMPT_TIMES) {
--            synchronized (this) {
--              throwable = e;
--            }
++            sinkHandleListener.onFailed(e);
            }
          }
        }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index c4bc0e377a,9f3b9240c2..49a13eddd7
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@@ -88,7 -87,7 +87,7 @@@ public class StubSinkHandle implements 
        return;
      }
      closed = true;
--    instanceContext.flushing();
++    instanceContext.transitionToFlushing();
    }
  
    @Override
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 2921c9d889,5ae66b14d7..e6e8e8083a
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@@ -18,7 -18,7 +18,6 @@@
   */
  package org.apache.iotdb.db.mpp.execution;
  
- import com.google.common.util.concurrent.SettableFuture;
 -import org.apache.iotdb.commons.exception.IoTDBException;
  import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
  import org.apache.iotdb.db.engine.storagegroup.DataRegion;
  import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@@ -29,30 -29,81 +28,35 @@@ import org.apache.iotdb.db.mpp.buffer.I
  import org.apache.iotdb.db.mpp.operator.Operator;
  import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
  import org.apache.iotdb.db.query.control.FileReaderManager;
 -import org.apache.iotdb.tsfile.read.common.block.TsBlock;
  
 -import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
 -import io.airlift.units.Duration;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
+ 
  import javax.annotation.concurrent.NotThreadSafe;
+ 
 -import java.io.IOException;
 -import java.util.Collections;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicReference;
  import java.util.stream.Collectors;
  
 -import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 -import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
--
+ /**
+  * One dataDriver is responsible for one FragmentInstance which is for data 
query, which may
+  * contains several series.
+  */
  @NotThreadSafe
 -public class DataDriver implements Driver {
 -
 -  private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
 -
 -  private final Operator root;
 -  private final ISinkHandle sinkHandle;
 -  private final DataDriverContext driverContext;
 +public class DataDriver extends Driver {
  
    private boolean init;
 -  private boolean closed;
  
    /** closed tsfile used in this fragment instance */
    private Set<TsFileResource> closedFilePaths;
    /** unClosed tsfile used in this fragment instance */
    private Set<TsFileResource> unClosedFilePaths;
  
 -  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = 
new AtomicReference<>();
--
    public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext 
driverContext) {
 -    this.root = root;
 -    this.sinkHandle = sinkHandle;
 -    this.driverContext = driverContext;
 +    super(root, sinkHandle, driverContext);
      this.closedFilePaths = new HashSet<>();
      this.unClosedFilePaths = new HashSet<>();
 -    // initially the driverBlockedFuture is not blocked (it is completed)
 -    SettableFuture<Void> future = SettableFuture.create();
 -    future.set(null);
 -    driverBlockedFuture.set(future);
 -  }
 -
 -  @Override
 -  public boolean isFinished() {
 -    try {
 -      boolean isFinished =
 -          closed || (driverBlockedFuture.get().isDone() && root != null && 
root.isFinished());
 -      if (isFinished) {
 -        close();
 -      }
 -      return isFinished;
 -    } catch (Throwable t) {
 -      logger.error(
 -          "Failed to query whether the data driver {} is finished", 
driverContext.getId(), t);
 -      driverContext.failed(t);
 -      close();
 -      return true;
 -    }
    }
  
    @Override
@@@ -61,39 -117,77 +65,39 @@@
        try {
          initialize();
        } catch (Throwable t) {
 -        logger.error(
 +        LOGGER.error(
              "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
          driverContext.failed(t);
 -        close();
          blockedFuture.setException(t);
 -        return blockedFuture;
 +        return false;
        }
      }
 -
 -    // if the driver is blocked we don't need to continue
 -    if (!blockedFuture.isDone()) {
 -      return blockedFuture;
 -    }
 -
 -    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
 -
 -    long start = System.nanoTime();
 -    try {
 -      do {
 -        ListenableFuture<Void> future = processInternal();
 -        if (!future.isDone()) {
 -          return updateDriverBlockedFuture(future);
 -        }
 -      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
 -    } catch (Throwable t) {
 -      logger.error("Failed to execute fragment instance {}", 
driverContext.getId(), t);
 -      driverContext.failed(t);
 -      close();
 -      blockedFuture.setException(t);
 -      return blockedFuture;
 -    }
 -    return NOT_BLOCKED;
 -  }
 -
 -  @Override
 -  public FragmentInstanceId getInfo() {
 -    return driverContext.getId();
 +    return true;
    }
  
 +  /**
 +   * All file paths used by this fragment instance must be cleared and thus 
the usage reference must
 +   * be decreased.
 +   */
    @Override
 -  public void close() {
 -    if (closed) {
 -      return;
 +  protected void releaseResource() {
 +    for (TsFileResource tsFile : closedFilePaths) {
 +      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, 
true);
      }
 -    closed = true;
 -    try {
 -      if (root != null) {
 -        root.close();
 -      }
 -      if (sinkHandle != null) {
 -        sinkHandle.close();
 -      }
 -    } catch (Throwable t) {
 -      logger.error("Failed to closed driver {}", driverContext.getId(), t);
 -      driverContext.failed(t);
 -    } finally {
 -      removeUsedFilesForQuery();
 +    closedFilePaths = null;
 +    for (TsFileResource tsFile : unClosedFilePaths) {
 +      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, 
true);
      }
 -  }
 -
 -  @Override
 -  public void failed(Throwable t) {
 -    driverContext.failed(t);
 +    unClosedFilePaths = null;
    }
  
- 
    /**
     * init seq file list and unseq file list in QueryDataSource and set it 
into each SourceNode TODO
     * we should change all the blocked lock operation into tryLock
     */
    private void initialize() throws QueryProcessException {
-     List<DataSourceOperator> sourceOperators = ((DataDriverContext)driverContext).getSourceOperators();
 -    List<DataSourceOperator> sourceOperators = driverContext.getSourceOperators();
++    List<DataSourceOperator> sourceOperators =
++        ((DataDriverContext) driverContext).getSourceOperators();
      if (sourceOperators != null && !sourceOperators.isEmpty()) {
        QueryDataSource dataSource = initQueryDataSourceCache();
        sourceOperators.forEach(
@@@ -121,9 -214,9 +125,7 @@@
      dataRegion.readLock();
      try {
        List<PartialPath> pathList =
-           context.getPaths().stream()
 -          driverContext.getPaths().stream()
--              .map(IDTable::translateQueryPath)
--              .collect(Collectors.toList());
++          context.getPaths().stream().map(IDTable::translateQueryPath).collect(Collectors.toList());
        // when all the selected series are under the same device, the 
QueryDataSource will be
        // filtered according to timeIndex
        Set<String> selectedDeviceIdSet =
@@@ -188,5 -296,39 +190,4 @@@
        FileReaderManager.getInstance().increaseFileReaderReference(tsFile, 
isClosed);
      }
    }
 -
 -  private ListenableFuture<Void> processInternal() throws IOException, 
IoTDBException {
 -    ListenableFuture<Void> blocked = root.isBlocked();
 -    if (!blocked.isDone()) {
 -      return blocked;
 -    }
 -    blocked = sinkHandle.isFull();
 -    if (!blocked.isDone()) {
 -      return blocked;
 -    }
 -    if (root.hasNext()) {
 -      TsBlock tsBlock = root.next();
 -      if (tsBlock != null && !tsBlock.isEmpty()) {
 -        sinkHandle.send(Collections.singletonList(tsBlock));
 -      }
 -    }
 -    return NOT_BLOCKED;
 -  }
 -
 -  private ListenableFuture<Void> updateDriverBlockedFuture(
 -      ListenableFuture<Void> sourceBlockedFuture) {
 -    // driverBlockedFuture will be completed as soon as the 
sourceBlockedFuture is completed
 -    // or any of the operators gets a memory revocation request
 -    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
 -    driverBlockedFuture.set(newDriverBlockedFuture);
 -    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), 
directExecutor());
 -
 -    // TODO Although we don't have memory management for operator now, we 
should consider it for
 -    // future
 -    // it's possible that memory revoking is requested for some operator
 -    // before we update driverBlockedFuture above and we don't want to miss 
that
 -    // notification, so we check to see whether that's the case before 
returning.
--
 -    return newDriverBlockedFuture;
 -  }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index fd2fef8b3a,f211ce593c..8687627055
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@@ -18,88 -18,25 +18,90 @@@
   */
  package org.apache.iotdb.db.mpp.execution;
  
- import com.google.common.collect.ImmutableList;
- import com.google.common.util.concurrent.ListenableFuture;
- import com.google.common.util.concurrent.SettableFuture;
- import io.airlift.units.Duration;
 +import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
  import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 +import org.apache.iotdb.db.mpp.operator.Operator;
 +import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+ 
++import com.google.common.collect.ImmutableList;
+ import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.SettableFuture;
+ import io.airlift.units.Duration;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
 -import java.io.Closeable;
 +import javax.annotation.concurrent.GuardedBy;
++
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Optional;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.locks.ReentrantLock;
 +import java.util.function.Supplier;
 +
 +import static com.google.common.base.Preconditions.checkState;
 +import static com.google.common.base.Throwables.throwIfUnchecked;
 +import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 +import static java.lang.Boolean.TRUE;
 +import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
  
  /**
   * Driver encapsulates some methods which are necessary for execution 
scheduler to run a fragment
   * instance
   */
 -public interface Driver extends Closeable {
 +public abstract class Driver {
 +
 +  protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
 +
- 
 +  protected final Operator root;
 +  protected final ISinkHandle sinkHandle;
 +  protected final DriverContext driverContext;
-   protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
++  protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture =
++      new AtomicReference<>();
 +  protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
 +
 +  protected final DriverLock exclusiveLock = new DriverLock();
 +
 +  protected enum State {
-     ALIVE, NEED_DESTRUCTION, DESTROYED
++    ALIVE,
++    NEED_DESTRUCTION,
++    DESTROYED
 +  }
 +
 +  public Driver(Operator root, ISinkHandle sinkHandle, DriverContext 
driverContext) {
 +    this.root = root;
 +    this.sinkHandle = sinkHandle;
 +    this.driverContext = driverContext;
 +
 +    // initially the driverBlockedFuture is not blocked (it is completed)
 +    SettableFuture<Void> future = SettableFuture.create();
 +    future.set(null);
 +    driverBlockedFuture.set(future);
 +  }
  
    /**
     * Used to judge whether this fragment instance should be scheduled for 
execution anymore
     *
     * @return true if the FragmentInstance is done or terminated due to 
failure, otherwise false.
     */
 -  boolean isFinished();
 +  public boolean isFinished() {
 +    checkLockNotHeld("Cannot check finished status while holding the driver 
lock");
 +
 +    // if we can get the lock, attempt a clean shutdown; otherwise someone 
else will shutdown
 +    Optional<Boolean> result = 
tryWithLockUnInterruptibly(this::isFinishedInternal);
 +    return result.orElseGet(() -> state.get() != State.ALIVE || 
driverContext.isDone());
 +  }
 +
 +  /**
 +   * do initialization
 +   *
 +   * @return true if init succeed, false otherwise
 +   */
 +  protected abstract boolean init(SettableFuture<Void> blockedFuture);
 +
-   /**
-    * release resource this driver used
-    */
++  /** release resource this driver used */
 +  protected abstract void releaseResource();
  
    /**
     * run the fragment instance for {@param duration} time slice, the time of 
this run is likely not
@@@ -107,351 -44,27 +109,355 @@@
     *
     * @param duration how long should this fragment instance run
     * @return the returned ListenableFuture<Void> is used to represent status 
of this processing if
-    * isDone() return true, meaning that this fragment instance is not blocked 
and is ready for
-    * next processing otherwise, meaning that this fragment instance is 
blocked and not ready for
-    * next processing.
+    *     isDone() return true, meaning that this fragment instance is not 
blocked and is ready for
+    *     next processing otherwise, meaning that this fragment instance is 
blocked and not ready for
+    *     next processing.
     */
 -  ListenableFuture<Void> processFor(Duration duration);
 +  public ListenableFuture<Void> processFor(Duration duration) {
 +
 +    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
 +    // initialization may be time-consuming, so we keep it in the processFor 
method
 +    // in normal case, it won't cause deadlock and should finish soon, 
otherwise it will be a
 +    // critical bug
 +    if (!init(blockedFuture)) {
 +      return blockedFuture;
 +    }
 +
 +    // if the driver is blocked we don't need to continue
 +    if (!blockedFuture.isDone()) {
 +      return blockedFuture;
 +    }
 +
 +    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
 +
-     Optional<ListenableFuture<Void>> result = tryWithLock(100, 
TimeUnit.MILLISECONDS, true, () -> {
-       long start = System.nanoTime();
-       do {
-         ListenableFuture<Void> future = processInternal();
-         if (!future.isDone()) {
-           return updateDriverBlockedFuture(future);
-         }
-       }
-       while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
-       return NOT_BLOCKED;
-     });
++    Optional<ListenableFuture<Void>> result =
++        tryWithLock(
++            100,
++            TimeUnit.MILLISECONDS,
++            true,
++            () -> {
++              long start = System.nanoTime();
++              do {
++                ListenableFuture<Void> future = processInternal();
++                if (!future.isDone()) {
++                  return updateDriverBlockedFuture(future);
++                }
++              } while (System.nanoTime() - start < maxRuntime && 
!isFinishedInternal());
++              return NOT_BLOCKED;
++            });
 +
 +    return result.orElse(NOT_BLOCKED);
 +  }
  
    /**
     * the id information about this Fragment Instance.
     *
     * @return a {@link FragmentInstanceId} instance.
     */
 -  FragmentInstanceId getInfo();
 +  public FragmentInstanceId getInfo() {
 +    return driverContext.getId();
 +  }
  
-   /**
-    * clear resource used by this fragment instance
-    */
+   /** clear resource used by this fragment instance */
 -  @Override
 -  void close();
 +  public void close() {
 +    // mark the service for destruction
 +    if (!state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION)) {
 +      return;
 +    }
 +
 +    exclusiveLock.interruptCurrentOwner();
 +
 +    // if we can get the lock, attempt a clean shutdown; otherwise someone 
else will shutdown
 +    tryWithLockUnInterruptibly(() -> TRUE);
 +  }
  
    /**
     * fail current driver
     *
     * @param t reason cause this failure
     */
 -  void failed(Throwable t);
 +  public void failed(Throwable t) {
 +    driverContext.failed(t);
 +  }
 +
 +  public ISinkHandle getSinkHandle() {
 +    return sinkHandle;
 +  }
 +
 +  @GuardedBy("exclusiveLock")
 +  private boolean isFinishedInternal() {
 +    checkLockHeld("Lock must be held to call isFinishedInternal");
 +
-     boolean finished = state.get() != State.ALIVE || driverContext.isDone() 
|| root == null || root.isFinished();
++    boolean finished =
++        state.get() != State.ALIVE || driverContext.isDone() || root == null 
|| root.isFinished();
 +    if (finished) {
 +      state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
 +    }
 +    return finished;
 +  }
 +
- 
 +  private ListenableFuture<Void> processInternal() {
 +    try {
 +      ListenableFuture<Void> blocked = root.isBlocked();
 +      if (!blocked.isDone()) {
 +        return blocked;
 +      }
 +      blocked = sinkHandle.isFull();
 +      if (!blocked.isDone()) {
 +        return blocked;
 +      }
 +      if (root.hasNext()) {
 +        TsBlock tsBlock = root.next();
 +        if (tsBlock != null && !tsBlock.isEmpty()) {
 +          sinkHandle.send(Collections.singletonList(tsBlock));
 +        }
 +      }
 +      return NOT_BLOCKED;
 +    } catch (Throwable t) {
 +      LOGGER.error("Failed to execute fragment instance {}", 
driverContext.getId(), t);
 +      List<StackTraceElement> interrupterStack = 
exclusiveLock.getInterrupterStack();
 +      if (interrupterStack == null) {
 +        driverContext.failed(t);
 +        throw t;
 +      }
 +
 +      // Driver thread was interrupted which should only happen if the task 
is already finished.
-       // If this becomes the actual cause of a failed query there is a bug in 
the task state machine.
++      // If this becomes the actual cause of a failed query there is a bug in 
the task state
++      // machine.
 +      Exception exception = new Exception("Interrupted By");
 +      exception.setStackTrace(interrupterStack.toArray(new 
StackTraceElement[0]));
 +      RuntimeException newException = new RuntimeException("Driver was 
interrupted", exception);
 +      newException.addSuppressed(t);
 +      driverContext.failed(newException);
 +      throw newException;
 +    }
 +  }
 +
 +  private ListenableFuture<Void> updateDriverBlockedFuture(
 +      ListenableFuture<Void> sourceBlockedFuture) {
 +    // driverBlockedFuture will be completed as soon as the 
sourceBlockedFuture is completed
 +    // or any of the operators gets a memory revocation request
 +    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
 +    driverBlockedFuture.set(newDriverBlockedFuture);
 +    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), 
directExecutor());
 +
 +    // TODO Although we don't have memory management for operator now, we 
should consider it for
 +    // future
 +    // it's possible that memory revoking is requested for some operator
 +    // before we update driverBlockedFuture above and we don't want to miss 
that
 +    // notification, so we check to see whether that's the case before 
returning.
 +
 +    return newDriverBlockedFuture;
 +  }
 +
- 
 +  private synchronized void checkLockNotHeld(String message) {
 +    checkState(!exclusiveLock.isHeldByCurrentThread(), message);
 +  }
 +
 +  @GuardedBy("exclusiveLock")
 +  private synchronized void checkLockHeld(String message) {
 +    checkState(exclusiveLock.isHeldByCurrentThread(), message);
 +  }
 +
 +  /**
-    * Try to acquire the {@code exclusiveLock} immediately and run a {@code 
task}
-    * The task will not be interrupted if the {@code Driver} is closed.
-    * <p>
-    * Note: task cannot return null
++   * Try to acquire the {@code exclusiveLock} immediately and run a {@code 
task} The task will not
++   * be interrupted if the {@code Driver} is closed.
++   *
++   * <p>Note: task cannot return null
 +   */
 +  private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) {
 +    return tryWithLock(0, TimeUnit.MILLISECONDS, false, task);
 +  }
 +
 +  /**
-    * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a 
{@code task}.
-    * If the {@code interruptOnClose} flag is set to {@code true} the {@code 
task} will be
-    * interrupted if the {@code Driver} is closed.
-    * <p>
-    * Note: task cannot return null
++   * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a 
{@code task}. If the
++   * {@code interruptOnClose} flag is set to {@code true} the {@code task} 
will be interrupted if
++   * the {@code Driver} is closed.
++   *
++   * <p>Note: task cannot return null
 +   */
-   private <T> Optional<T> tryWithLock(long timeout, TimeUnit unit, boolean 
interruptOnClose, Supplier<T> task) {
++  private <T> Optional<T> tryWithLock(
++      long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> 
task) {
 +    checkLockNotHeld("Lock cannot be reacquired");
 +
 +    boolean acquired = false;
 +    try {
 +      acquired = exclusiveLock.tryLock(timeout, unit, interruptOnClose);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +    }
 +
 +    if (!acquired) {
 +      return Optional.empty();
 +    }
 +
 +    Optional<T> result;
 +    try {
 +      result = Optional.of(task.get());
 +    } finally {
 +      try {
 +        destroyIfNecessary();
 +      } finally {
 +        exclusiveLock.unlock();
 +      }
 +    }
 +
 +    return result;
 +  }
 +
 +  @GuardedBy("exclusiveLock")
 +  private void destroyIfNecessary() {
 +    checkLockHeld("Lock must be held to call destroyIfNecessary");
 +
 +    if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
 +      return;
 +    }
 +
 +    // if we get an error while closing a driver, record it and we will throw 
it at the end
 +    Throwable inFlightException = null;
 +    try {
 +      inFlightException = closeAndDestroyOperators();
 +      driverContext.finished();
 +    } catch (Throwable t) {
 +      // this shouldn't happen but be safe
-       inFlightException = addSuppressedException(
-           inFlightException,
-           t,
-           "Error destroying driver for task %s",
-           driverContext.getId());
++      inFlightException =
++          addSuppressedException(
++              inFlightException, t, "Error destroying driver for task %s", 
driverContext.getId());
 +    } finally {
 +      releaseResource();
 +    }
 +
 +    if (inFlightException != null) {
 +      // this will always be an Error or Runtime
 +      throwIfUnchecked(inFlightException);
 +      throw new RuntimeException(inFlightException);
 +    }
 +  }
 +
 +  private Throwable closeAndDestroyOperators() {
 +    // record the current interrupted status (and clear the flag); we'll 
reset it later
 +    boolean wasInterrupted = Thread.interrupted();
 +
 +    Throwable inFlightException = null;
 +
 +    try {
 +      if (root != null) {
 +        root.close();
 +      }
 +      if (sinkHandle != null) {
 +        sinkHandle.close();
 +      }
 +    } catch (InterruptedException t) {
 +      // don't record the stack
 +      wasInterrupted = true;
 +    } catch (Throwable t) {
 +      // TODO currently, we won't know exact operator which is failed in 
closing
-       inFlightException = addSuppressedException(
-           inFlightException,
-           t,
-           "Error closing operator {} for fragment instance {}",
-           root.getOperatorContext().getOperatorId(),
-           driverContext.getId());
++      inFlightException =
++          addSuppressedException(
++              inFlightException,
++              t,
++              "Error closing operator {} for fragment instance {}",
++              root.getOperatorContext().getOperatorId(),
++              driverContext.getId());
 +    } finally {
 +      // reset the interrupted flag
 +      if (wasInterrupted) {
 +        Thread.currentThread().interrupt();
 +      }
 +    }
 +    return inFlightException;
 +  }
 +
-   private static Throwable addSuppressedException(Throwable 
inFlightException, Throwable newException, String message, Object... args) {
++  private static Throwable addSuppressedException(
++      Throwable inFlightException, Throwable newException, String message, 
Object... args) {
 +    if (newException instanceof Error) {
 +      if (inFlightException == null) {
 +        inFlightException = newException;
 +      } else {
 +        // Self-suppression not permitted
 +        if (inFlightException != newException) {
 +          inFlightException.addSuppressed(newException);
 +        }
 +      }
 +    } else {
 +      // log normal exceptions instead of rethrowing them
 +      LOGGER.error(message, args, newException);
 +    }
 +    return inFlightException;
 +  }
 +
 +  private static class DriverLock {
 +    private final ReentrantLock lock = new ReentrantLock();
 +
 +    @GuardedBy("this")
 +    private Thread currentOwner;
++
 +    @GuardedBy("this")
 +    private boolean currentOwnerInterruptionAllowed;
 +
 +    @GuardedBy("this")
 +    private List<StackTraceElement> interrupterStack;
 +
 +    public boolean isHeldByCurrentThread() {
 +      return lock.isHeldByCurrentThread();
 +    }
 +
 +    public boolean tryLock(boolean currentThreadInterruptionAllowed) {
 +      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
 +      boolean acquired = lock.tryLock();
 +      if (acquired) {
 +        setOwner(currentThreadInterruptionAllowed);
 +      }
 +      return acquired;
 +    }
 +
 +    public boolean tryLock(long timeout, TimeUnit unit, boolean 
currentThreadInterruptionAllowed)
 +        throws InterruptedException {
 +      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
 +      boolean acquired = lock.tryLock(timeout, unit);
 +      if (acquired) {
 +        setOwner(currentThreadInterruptionAllowed);
 +      }
 +      return acquired;
 +    }
 +
 +    private synchronized void setOwner(boolean interruptionAllowed) {
 +      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold 
lock");
 +      currentOwner = Thread.currentThread();
 +      currentOwnerInterruptionAllowed = interruptionAllowed;
 +      // NOTE: We do not use interrupted stack information to know that 
another
 +      // thread has attempted to interrupt the driver, and interrupt this new 
lock
 +      // owner.  The interrupted stack information is for debugging purposes 
only.
 +      // In the case of interruption, the caller should (and does) have a 
separate
 +      // state to prevent further processing in the Driver.
 +    }
 +
 +    public synchronized void unlock() {
 +      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold 
lock");
 +      currentOwner = null;
 +      currentOwnerInterruptionAllowed = false;
 +      lock.unlock();
 +    }
 +
 +    public synchronized List<StackTraceElement> getInterrupterStack() {
 +      return interrupterStack;
 +    }
 +
 +    public synchronized void interruptCurrentOwner() {
 +      if (!currentOwnerInterruptionAllowed) {
 +        return;
 +      }
 +      // there is a benign race condition here where the lock holder
 +      // can change between attempting to get the lock and grabbing
 +      // the synchronized lock here, but in either case we want to
 +      // interrupt the lock holder thread
 +      if (interrupterStack == null) {
 +        interrupterStack = 
ImmutableList.copyOf(Thread.currentThread().getStackTrace());
 +      }
 +
 +      if (currentOwner != null) {
 +        currentOwner.interrupt();
 +      }
 +    }
 +  }
  }
diff --cc 
server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index d0bc3bb232,8c20a2c334..8985508b0c
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@@ -26,9 -24,6 +26,8 @@@ public class DriverContext 
  
    private final FragmentInstanceContext fragmentInstanceContext;
  
 +  private final AtomicBoolean finished = new AtomicBoolean();
 +
- 
    public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
      this.fragmentInstanceContext = fragmentInstanceContext;
    }
@@@ -43,15 -38,13 +42,14 @@@
  
    public void failed(Throwable cause) {
      fragmentInstanceContext.failed(cause);
 +    finished.set(true);
    }
  
 -  public void finish() {
 -    fragmentInstanceContext.finish();
 +  public void finished() {
 +    finished.compareAndSet(false, true);
    }
  
- 
 -  public void flushing() {
 -    fragmentInstanceContext.flushing();
 +  public boolean isDone() {
 +    return finished.get();
    }
  }
diff --cc 
server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index c3d33dbd74,b2e51be2bc..d1b81d7be9
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@@ -65,44 -58,10 +64,43 @@@ public class FragmentInstanceContext ex
    //    private final AtomicLong endFullGcCount = new AtomicLong(-1);
    //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
  
--  public FragmentInstanceContext(
-       FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
 -      FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
++  public FragmentInstanceContext(FragmentInstanceId id, 
FragmentInstanceStateMachine stateMachine) {
      this.id = id;
 -    this.state = state;
 +    this.stateMachine = stateMachine;
 +  }
 +
 +  public void start() {
 +    long now = System.currentTimeMillis();
 +    executionStartTime.compareAndSet(null, now);
 +    startNanos.compareAndSet(0, System.nanoTime());
 +
 +    // always update last execution start time
 +    lastExecutionStartTime.set(now);
 +  }
 +
 +  // the state change listener is added here in a separate initialize() method
 +  // instead of the constructor to prevent leaking the "this" reference to
 +  // another thread, which will cause unsafe publication of this instance.
 +  private void initialize() {
 +    stateMachine.addStateChangeListener(this::updateStatsIfDone);
 +  }
 +
 +  private void updateStatsIfDone(FragmentInstanceState newState) {
 +    if (newState.isDone()) {
 +      long now = System.currentTimeMillis();
 +
 +      // before setting the end times, make sure a start has been recorded
 +      executionStartTime.compareAndSet(null, now);
 +      startNanos.compareAndSet(0, System.nanoTime());
 +
 +      // Only update last start time if nothing was started
 +      lastExecutionStartTime.compareAndSet(null, now);
 +
 +      // use compare and set from initial value to avoid overwriting if there
 +      // were a duplicate notification, which shouldn't happen
 +      executionEndTime.compareAndSet(null, now);
 +      endNanos.compareAndSet(0, System.nanoTime());
 +    }
    }
  
    public OperatorContext addOperatorContext(
@@@ -139,10 -98,40 +137,18 @@@
    }
  
    public void failed(Throwable cause) {
 -    LOGGER.warn("Fragment Instance {} failed.", id, cause);
 -    state.set(FragmentInstanceState.FAILED);
 +    stateMachine.failed(cause);
    }
  
 -  public void cancel() {
 -    state.set(FragmentInstanceState.CANCELED);
 -    this.endTime = System.currentTimeMillis();
++  public void finished() {
++    stateMachine.finished();
+   }
+ 
 -  public void abort() {
 -    state.set(FragmentInstanceState.ABORTED);
 -    this.endTime = System.currentTimeMillis();
 -  }
 -
 -  public void finish() {
 -    if (state.get().isDone()) {
 -      return;
 -    }
 -    state.set(FragmentInstanceState.FINISHED);
 -    this.endTime = System.currentTimeMillis();
 -  }
 -
 -  public void flushing() {
 -    if (state.get().isDone()) {
 -      return;
 -    }
 -    state.set(FragmentInstanceState.FLUSHING);
++  public void transitionToFlushing() {
++    stateMachine.transitionToFlushing();
+   }
+ 
    public long getEndTime() {
 -    return endTime;
 -  }
 -
 -  public void setEndTime(long endTime) {
 -    this.endTime = endTime;
 +    return executionEndTime.get();
    }
  }
diff --cc 
server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index b00ddcadb3,21e0cc3d50..149bce1964
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@@ -18,17 -18,14 +18,15 @@@
   */
  package org.apache.iotdb.db.mpp.execution;
  
- import io.airlift.stats.CounterStat;
 +import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
  import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
  import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
  
  import com.google.common.collect.ImmutableList;
 -
 -import java.util.concurrent.atomic.AtomicReference;
++import io.airlift.stats.CounterStat;
  
  import static java.util.Objects.requireNonNull;
- import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
- import static 
org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
  
  public class FragmentInstanceExecution {
  
@@@ -45,18 -41,7 +43,20 @@@
  
    private long lastHeartbeat;
  
-   public static FragmentInstanceExecution 
createFragmentInstanceExecution(IFragmentInstanceScheduler scheduler,
-                                                                           
FragmentInstanceId instanceId,
-                                                                           
FragmentInstanceContext context,
-                                                                           
Driver driver,
-                                                                           
FragmentInstanceStateMachine stateMachine,
-                                                                           
CounterStat failedInstances) {
-     FragmentInstanceExecution execution = new 
FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
 -  public FragmentInstanceExecution(
++  public static FragmentInstanceExecution createFragmentInstanceExecution(
++      IFragmentInstanceScheduler scheduler,
++      FragmentInstanceId instanceId,
++      FragmentInstanceContext context,
++      Driver driver,
++      FragmentInstanceStateMachine stateMachine,
++      CounterStat failedInstances) {
++    FragmentInstanceExecution execution =
++        new FragmentInstanceExecution(scheduler, instanceId, context, driver, 
stateMachine);
 +    execution.initialize(failedInstances, scheduler, instanceId, driver);
 +    return execution;
 +  }
 +
 +  private FragmentInstanceExecution(
        IFragmentInstanceScheduler scheduler,
        FragmentInstanceId instanceId,
        FragmentInstanceContext context,
@@@ -97,49 -82,7 +97,30 @@@
    }
  
    public void abort() {
 -    scheduler.abortFragmentInstance(instanceId);
 -    context.abort();
 +    stateMachine.abort();
 +  }
 +
 +  // this is a separate method to ensure that the `this` reference is not 
leaked during construction
-   private void initialize(CounterStat failedInstances, 
IFragmentInstanceScheduler scheduler, FragmentInstanceId instanceId, Driver 
driver) {
++  private void initialize(
++      CounterStat failedInstances,
++      IFragmentInstanceScheduler scheduler,
++      FragmentInstanceId instanceId,
++      Driver driver) {
 +    requireNonNull(failedInstances, "failedInstances is null");
-     stateMachine.addStateChangeListener(newState -> {
- 
-       if (!newState.isDone()) {
-         return;
-       }
- 
-       // Update failed tasks counter
-       if (newState == FAILED) {
-         failedInstances.update(1);
-       }
- 
-       driver.close();
-       sinkHandle.abort();
-       scheduler.abortFragmentInstance(instanceId);
-     });
-   }
- 
-   private synchronized void instanceCompletion() {
-     if (stateMachine.getState().isDone()) {
-       return;
-     }
- 
-     if (!sinkHandle.isFinished()) {
-       stateMachine.transitionToFlushing();
-       return;
-     }
- 
-     if (sinkHandle.isFinished()) {
-       // Cool! All done!
-       stateMachine.finished();
-       return;
-     }
- 
-     if (sinkHandle.isFailed()) {
-       Throwable failureCause = sinkHandle.getFailureCause()
-           .orElseGet(() -> new RuntimeException("Fragment Instance " + 
instanceId + " 's SinkHandle is failed but the failure cause is missing"));
-       stateMachine.failed(failureCause);
-     }
++    stateMachine.addStateChangeListener(
++        newState -> {
++          if (!newState.isDone()) {
++            return;
++          }
++
++          // Update failed tasks counter
++          if (newState == FAILED) {
++            failedInstances.update(1);
++          }
++
++          driver.close();
++          sinkHandle.abort();
++          scheduler.abortFragmentInstance(instanceId);
++        });
    }
  }
diff --cc 
server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 5f49fa47ef,537f330f04..2e23cf03b4
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@@ -28,6 -27,6 +27,7 @@@ import org.apache.iotdb.db.mpp.schedule
  import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner;
  import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
  
++import io.airlift.stats.CounterStat;
  import io.airlift.units.Duration;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -51,14 -49,9 +51,13 @@@ public class FragmentInstanceManager 
    private final IFragmentInstanceScheduler scheduler = 
FragmentInstanceScheduler.getInstance();
  
    private final ScheduledExecutorService instanceManagementExecutor;
 +  private final ExecutorService instanceNotificationExecutor;
  
- 
    private final Duration infoCacheTime;
  
 +  // record failed instances count
 +  private final CounterStat failedInstances = new CounterStat();
 +
    public static FragmentInstanceManager getInstance() {
      return FragmentInstanceManager.InstanceHolder.INSTANCE;
    }
@@@ -68,7 -61,6 +67,8 @@@
      this.instanceExecution = new ConcurrentHashMap<>();
      this.instanceManagementExecutor =
          IoTDBThreadPoolFactory.newScheduledThreadPool(1, 
"instance-management");
-     this.instanceNotificationExecutor = 
IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
++    this.instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
  
      this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
  
@@@ -93,13 -85,13 +93,14 @@@
          instanceExecution.computeIfAbsent(
              instanceId,
              id -> {
- 
-               FragmentInstanceStateMachine stateMachine = new 
FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -              AtomicReference<FragmentInstanceState> state = new 
AtomicReference<>();
 -              state.set(FragmentInstanceState.PLANNED);
++              FragmentInstanceStateMachine stateMachine =
++                  new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
  
                FragmentInstanceContext context =
                    instanceContext.computeIfAbsent(
                        instanceId,
-                       fragmentInstanceId -> new 
FragmentInstanceContext(fragmentInstanceId, stateMachine));
 -                      fragmentInstanceId -> new 
FragmentInstanceContext(fragmentInstanceId, state));
++                      fragmentInstanceId ->
++                          new FragmentInstanceContext(fragmentInstanceId, 
stateMachine));
  
                try {
                  DataDriver driver =
@@@ -108,9 -100,9 +109,10 @@@
                          context,
                          instance.getTimeFilter(),
                          dataRegion);
-                 return createFragmentInstanceExecution(scheduler, instanceId, 
context, driver, stateMachine, failedInstances);
 -                return new FragmentInstanceExecution(scheduler, instanceId, 
context, driver, state);
++                return createFragmentInstanceExecution(
++                    scheduler, instanceId, context, driver, stateMachine, 
failedInstances);
                } catch (Throwable t) {
 -                context.failed(t);
 +                stateMachine.failed(t);
                  return null;
                }
              });
@@@ -126,29 -118,26 +128,29 @@@
          instanceExecution.computeIfAbsent(
              instanceId,
              id -> {
-               FragmentInstanceStateMachine stateMachine = new 
FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
- 
 -              AtomicReference<FragmentInstanceState> state = new 
AtomicReference<>();
 -              state.set(FragmentInstanceState.PLANNED);
++              FragmentInstanceStateMachine stateMachine =
++                  new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
  
                FragmentInstanceContext context =
                    instanceContext.computeIfAbsent(
                        instanceId,
-                       fragmentInstanceId -> new 
FragmentInstanceContext(fragmentInstanceId, stateMachine));
 -                      fragmentInstanceId -> new 
FragmentInstanceContext(fragmentInstanceId, state));
++                      fragmentInstanceId ->
++                          new FragmentInstanceContext(fragmentInstanceId, 
stateMachine));
  
                try {
                  SchemaDriver driver =
                      planner.plan(instance.getFragment().getRoot(), context, 
schemaRegion);
-                 return createFragmentInstanceExecution(scheduler, instanceId, 
context, driver, stateMachine, failedInstances);
 -                return new FragmentInstanceExecution(scheduler, instanceId, 
context, driver, state);
++                return createFragmentInstanceExecution(
++                    scheduler, instanceId, context, driver, stateMachine, 
failedInstances);
                } catch (Throwable t) {
 -                context.failed(t);
 +                stateMachine.failed(t);
                  return null;
                }
              });
      return execution != null ? execution.getInstanceInfo() : 
createFailedInstanceInfo(instanceId);
    }
  
-   /**
-    * Aborts a FragmentInstance.
-    */
++  /** Aborts a FragmentInstance. */
    public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId 
fragmentInstanceId) {
      FragmentInstanceExecution execution = 
instanceExecution.remove(fragmentInstanceId);
      if (execution != null) {
@@@ -159,21 -148,6 +161,19 @@@
      return null;
    }
  
-   /**
-    * Cancels a FragmentInstance.
-    */
++  /** Cancels a FragmentInstance. */
 +  public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
 +    requireNonNull(instanceId, "taskId is null");
 +
 +    FragmentInstanceExecution execution = 
instanceExecution.remove(instanceId);
 +    if (execution != null) {
 +      instanceContext.remove(instanceId);
 +      execution.cancel();
 +      return execution.getInstanceInfo();
 +    }
 +    return null;
 +  }
 +
    /**
     * Gets the info for the specified fragment instance.
     *
diff --cc 
server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
index 7f66d1a321,0000000000..febc9e9e09
mode 100644,000000..100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
@@@ -1,171 -1,0 +1,182 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.db.mpp.execution;
 +
++import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
++import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
++
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.util.concurrent.ListenableFuture;
- import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
- import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
 +
 +import javax.annotation.concurrent.GuardedBy;
 +import javax.annotation.concurrent.ThreadSafe;
++
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.Executor;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +import static com.google.common.base.MoreObjects.toStringHelper;
 +import static com.google.common.base.Preconditions.checkArgument;
 +import static com.google.common.util.concurrent.Futures.immediateFuture;
 +import static java.util.Objects.requireNonNull;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
 +import static 
org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
 +import static 
org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FINISHED;
 +import static 
org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FLUSHING;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.RUNNING;
 +import static 
org.apache.iotdb.db.mpp.execution.FragmentInstanceState.TERMINAL_INSTANCE_STATES;
 +
- 
 +@ThreadSafe
 +public class FragmentInstanceStateMachine {
 +  private static final Logger LOGGER = 
LoggerFactory.getLogger(FragmentInstanceStateMachine.class);
 +
 +  private final long createdTime = System.currentTimeMillis();
 +
 +  private final FragmentInstanceId instanceId;
 +  private final Executor executor;
 +  private final StateMachine<FragmentInstanceState> instanceState;
 +  private final LinkedBlockingQueue<Throwable> failureCauses = new 
LinkedBlockingQueue<>();
 +
 +  @GuardedBy("this")
 +  private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = 
new HashMap<>();
++
 +  @GuardedBy("this")
-   private final List<FragmentInstanceFailureListener> 
sourceInstanceFailureListeners = new ArrayList<>();
++  private final List<FragmentInstanceFailureListener> 
sourceInstanceFailureListeners =
++      new ArrayList<>();
 +
 +  public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, 
Executor executor) {
 +    this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId 
is null");
 +    this.executor = requireNonNull(executor, "executor is null");
-     instanceState = new StateMachine<>("FragmentInstance " + 
fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
-     instanceState.addStateChangeListener(newState -> LOGGER.debug("Fragment 
Instance {} is {}", fragmentInstanceId, newState));
++    instanceState =
++        new StateMachine<>(
++            "FragmentInstance " + fragmentInstanceId, executor, RUNNING, 
TERMINAL_INSTANCE_STATES);
++    instanceState.addStateChangeListener(
++        newState -> LOGGER.debug("Fragment Instance {} is {}", 
fragmentInstanceId, newState));
 +  }
 +
 +  public long getCreatedTime() {
 +    return createdTime;
 +  }
 +
 +  public FragmentInstanceId getFragmentInstanceId() {
 +    return instanceId;
 +  }
 +
 +  public FragmentInstanceState getState() {
 +    return instanceState.get();
 +  }
 +
-   public ListenableFuture<FragmentInstanceState> 
getStateChange(FragmentInstanceState currentState) {
++  public ListenableFuture<FragmentInstanceState> getStateChange(
++      FragmentInstanceState currentState) {
 +    requireNonNull(currentState, "currentState is null");
 +    checkArgument(!currentState.isDone(), "Current state is already done");
 +
 +    ListenableFuture<FragmentInstanceState> future = 
instanceState.getStateChange(currentState);
 +    FragmentInstanceState state = instanceState.get();
 +    if (state.isDone()) {
 +      return immediateFuture(state);
 +    }
 +    return future;
 +  }
 +
 +  public LinkedBlockingQueue<Throwable> getFailureCauses() {
 +    return failureCauses;
 +  }
 +
 +  public void transitionToFlushing() {
 +    instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING);
 +  }
 +
 +  public void finished() {
 +    transitionToDoneState(FINISHED);
 +  }
 +
 +  public void cancel() {
 +    transitionToDoneState(CANCELLED);
 +  }
 +
 +  public void abort() {
 +    transitionToDoneState(ABORTED);
 +  }
 +
 +  public void failed(Throwable cause) {
 +    failureCauses.add(cause);
 +    transitionToDoneState(FAILED);
 +  }
 +
 +  private void transitionToDoneState(FragmentInstanceState doneState) {
 +    requireNonNull(doneState, "doneState is null");
 +    checkArgument(doneState.isDone(), "doneState %s is not a done state", 
doneState);
 +
 +    instanceState.setIf(doneState, currentState -> !currentState.isDone());
 +  }
 +
 +  /**
-    * Listener is always notified asynchronously using a dedicated 
notification thread pool so, care should
-    * be taken to avoid leaking {@code this} when adding a listener in a 
constructor. Additionally, it is
-    * possible notifications are observed out of order due to the asynchronous 
execution.
++   * Listener is always notified asynchronously using a dedicated 
notification thread pool so, care
++   * should be taken to avoid leaking {@code this} when adding a listener in 
a constructor.
++   * Additionally, it is possible notifications are observed out of order due 
to the asynchronous
++   * execution.
 +   */
-   public void 
addStateChangeListener(StateChangeListener<FragmentInstanceState> 
stateChangeListener) {
++  public void addStateChangeListener(
++      StateChangeListener<FragmentInstanceState> stateChangeListener) {
 +    instanceState.addStateChangeListener(stateChangeListener);
 +  }
 +
 +  public void addSourceTaskFailureListener(FragmentInstanceFailureListener 
listener) {
 +    Map<FragmentInstanceId, Throwable> failures;
 +    synchronized (this) {
 +      sourceInstanceFailureListeners.add(listener);
 +      failures = ImmutableMap.copyOf(sourceInstanceFailures);
 +    }
-     executor.execute(() -> {
-       failures.forEach(listener::onTaskFailed);
-     });
++    executor.execute(
++        () -> {
++          failures.forEach(listener::onTaskFailed);
++        });
 +  }
 +
 +  public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable 
failure) {
 +    List<FragmentInstanceFailureListener> listeners;
 +    synchronized (this) {
 +      sourceInstanceFailures.putIfAbsent(instanceId, failure);
 +      listeners = ImmutableList.copyOf(sourceInstanceFailureListeners);
 +    }
-     executor.execute(() -> {
-       for (FragmentInstanceFailureListener listener : listeners) {
-         listener.onTaskFailed(instanceId, failure);
-       }
-     });
++    executor.execute(
++        () -> {
++          for (FragmentInstanceFailureListener listener : listeners) {
++            listener.onTaskFailed(instanceId, failure);
++          }
++        });
 +  }
 +
 +  @Override
 +  public String toString() {
 +    return toStringHelper(this)
 +        .add("FragmentInstanceId", instanceId)
 +        .add("FragmentInstanceState", instanceState)
 +        .add("failureCauses", failureCauses)
 +        .toString();
 +  }
 +}
diff --cc 
server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 6c45eb529a,844fe417ba..258065cd80
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@@ -18,23 -18,152 +18,24 @@@
   */
  package org.apache.iotdb.db.mpp.execution;
  
- import com.google.common.util.concurrent.SettableFuture;
  import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 -import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
  import org.apache.iotdb.db.mpp.operator.Operator;
 -import org.apache.iotdb.tsfile.read.common.block.TsBlock;
  
 -import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
 -import io.airlift.units.Duration;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
+ 
  import javax.annotation.concurrent.NotThreadSafe;
  
 -import java.io.IOException;
 -import java.util.Collections;
 -import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicReference;
 -
 -import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 -import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 -
+ /** One SchemaDriver is used to execute one FragmentInstance which is for 
metadata query. */
  @NotThreadSafe
 -public class SchemaDriver implements Driver {
 -
 -  private static final Logger logger = 
LoggerFactory.getLogger(SchemaDriver.class);
 -
 -  private final Operator root;
 -  private final ISinkHandle sinkHandle;
 -  private final SchemaDriverContext driverContext;
 -
 -  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = 
new AtomicReference<>();
 -
 -  private boolean closed = false;
 +public class SchemaDriver extends Driver {
  
    public SchemaDriver(Operator root, ISinkHandle sinkHandle, 
SchemaDriverContext driverContext) {
 -    this.root = root;
 -    this.sinkHandle = sinkHandle;
 -    this.driverContext = driverContext;
 -    // initially the driverBlockedFuture is not blocked (it is completed)
 -    SettableFuture<Void> future = SettableFuture.create();
 -    future.set(null);
 -    driverBlockedFuture.set(future);
 -  }
 -
 -  @Override
 -  public boolean isFinished() {
 -    try {
 -      boolean isFinished = driverBlockedFuture.get().isDone() && root != null 
&& root.isFinished();
 -      if (isFinished) {
 -        close();
 -      }
 -      return isFinished;
 -    } catch (Throwable t) {
 -      logger.error(
 -          "Failed to query whether the schema driver {} is finished", 
driverContext.getId(), t);
 -      driverContext.failed(t);
 -      return true;
 -    }
 -  }
 -
 -  @Override
 -  public ListenableFuture<Void> processFor(Duration duration) {
 -    // if the driver is blocked we don't need to continue
 -    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
 -    if (!blockedFuture.isDone()) {
 -      return blockedFuture;
 -    }
 -
 -    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
 -
 -    long start = System.nanoTime();
 -    try {
 -      do {
 -        ListenableFuture<Void> future = processInternal();
 -        if (!future.isDone()) {
 -          return updateDriverBlockedFuture(future);
 -        }
 -      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
 -    } catch (Throwable t) {
 -      logger.error("Failed to execute fragment instance {}", 
driverContext.getId(), t);
 -      driverContext.failed(t);
 -      close();
 -      blockedFuture.setException(t);
 -      return blockedFuture;
 -    }
 -    return NOT_BLOCKED;
 -  }
 -
 -  private ListenableFuture<Void> processInternal() throws IOException {
 -    ListenableFuture<Void> blocked = root.isBlocked();
 -    if (!blocked.isDone()) {
 -      return blocked;
 -    }
 -    blocked = sinkHandle.isFull();
 -    if (!blocked.isDone()) {
 -      return blocked;
 -    }
 -    if (root.hasNext()) {
 -      TsBlock tsBlock = root.next();
 -      if (tsBlock != null && !tsBlock.isEmpty()) {
 -        sinkHandle.send(Collections.singletonList(tsBlock));
 -      }
 -    }
 -    return NOT_BLOCKED;
 -  }
 -
 -  private ListenableFuture<Void> updateDriverBlockedFuture(
 -      ListenableFuture<Void> sourceBlockedFuture) {
 -    // driverBlockedFuture will be completed as soon as the 
sourceBlockedFuture is completed
 -    // or any of the operators gets a memory revocation request
 -    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
 -    driverBlockedFuture.set(newDriverBlockedFuture);
 -    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), 
directExecutor());
 -
 -    // TODO Although we don't have memory management for operator now, we 
should consider it for
 -    // future
 -    // it's possible that memory revoking is requested for some operator
 -    // before we update driverBlockedFuture above and we don't want to miss 
that
 -    // notification, so we check to see whether that's the case before 
returning.
 -
 -    return newDriverBlockedFuture;
 -  }
 -
 -  @Override
 -  public FragmentInstanceId getInfo() {
 -    return driverContext.getId();
 +    super(root, sinkHandle, driverContext);
    }
  
- 
    @Override
 -  public void close() {
 -    if (closed) {
 -      return;
 -    }
 -    closed = true;
 -    try {
 -      if (root != null) {
 -        root.close();
 -      }
 -      if (sinkHandle != null) {
 -        sinkHandle.close();
 -      }
 -    } catch (Throwable t) {
 -      logger.error("Failed to closed driver {}", driverContext.getId(), t);
 -      driverContext.failed(t);
 -    }
 +  protected boolean init(SettableFuture<Void> blockedFuture) {
 +    return true;
    }
  
    @Override
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index 7305feee83,c8cccc0520..398b2f03c4
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@@ -22,8 -22,8 +22,6 @@@ import org.apache.iotdb.tsfile.read.com
  
  import com.google.common.util.concurrent.ListenableFuture;
  
--import java.io.IOException;
--
  import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
  
  public interface Operator extends AutoCloseable {
diff --cc 
server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
index 97da0a4935,003da76f2d..34fb44373e
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
@@@ -35,7 -35,7 +35,6 @@@ import org.apache.iotdb.tsfile.utils.Bi
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
--import java.io.IOException;
  import java.nio.BufferOverflowException;
  import java.nio.ByteBuffer;
  import java.util.List;
diff --cc 
server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index 888f4ce5e7,abebdaf30d..b66d4f2ff1
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@@ -18,8 -18,6 +18,7 @@@
   */
  package org.apache.iotdb.db.mpp.schedule.task;
  
- import com.google.common.util.concurrent.SettableFuture;
 +import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
  import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
@@@ -32,6 -28,6 +29,7 @@@ import org.apache.iotdb.db.mpp.schedule
  import org.apache.iotdb.db.mpp.schedule.queue.IDIndexedAccessible;
  
  import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.SettableFuture;
  import io.airlift.units.Duration;
  
  import java.util.Comparator;
@@@ -194,16 -196,6 +202,14 @@@ public class FragmentInstanceTask imple
        return false;
      }
  
 +    @Override
 +    protected boolean init(SettableFuture<Void> blockedFuture) {
 +      return true;
 +    }
 +
 +    @Override
-     protected void releaseResource() {
- 
-     }
++    protected void releaseResource() {}
 +
      @Override
      public ListenableFuture<Void> processFor(Duration duration) {
        return null;
diff --cc 
server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index db060ac405,2703d0aa37..d41c7f5391
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@@ -56,8 -55,7 +56,7 @@@ import java.util.Arrays
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static 
org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
  import static org.junit.Assert.assertEquals;
@@@ -87,7 -85,6 +86,8 @@@ public class DataDriverTest 
  
    @Test
    public void batchTest() {
-     ExecutorService instanceNotificationExecutor = 
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
      try {
        MeasurementPath measurementPath1 =
            new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
@@@ -95,10 -92,11 +95,12 @@@
        allSensors.add("sensor0");
        allSensors.add("sensor1");
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new 
PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new 
FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId1 = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(
            1, planNodeId1, SeriesScanOperator.class.getSimpleName());
diff --cc 
server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index ab0849ddf2,6ad74bd110..17a45db8da
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
--import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
  import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
  import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
  import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@@ -52,8 -50,7 +51,7 @@@ import java.util.Arrays
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
@@@ -81,7 -78,6 +79,8 @@@ public class LimitOperatorTest 
  
    @Test
    public void batchTest() {
-     ExecutorService instanceNotificationExecutor = 
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
      try {
        MeasurementPath measurementPath1 =
            new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + 
".device0.sensor0", TSDataType.INT32);
@@@ -89,10 -85,11 +88,12 @@@
        allSensors.add("sensor0");
        allSensors.add("sensor1");
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new 
PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new 
FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId1 = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(
            1, planNodeId1, SeriesScanOperator.class.getSimpleName());
diff --cc 
server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index dd28a90263,78e6821bcd..52ebe68508
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
--import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
  import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@@ -45,11 -44,9 +45,9 @@@ import org.junit.Test
  
  import java.io.IOException;
  import java.util.ArrayList;
- import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
@@@ -76,17 -73,16 +74,19 @@@ public class SeriesScanOperatorTest 
  
    @Test
    public void batchTest() {
-     ExecutorService instanceNotificationExecutor = 
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
      try {
        MeasurementPath measurementPath =
            new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + 
".device0.sensor0", TSDataType.INT32);
-       Set<String> allSensors = new HashSet<>();
-       allSensors.add("sensor0");
+       Set<String> allSensors = Sets.newHashSet("sensor0");
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new 
PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new 
FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(
            1, planNodeId, SeriesScanOperator.class.getSimpleName());
diff --cc 
server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 70fe4def02,5534418b84..5fbfda4199
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
--import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
  import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
  import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@@ -51,8 -49,7 +50,7 @@@ import java.util.Arrays
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.junit.Assert.*;
  
@@@ -77,7 -74,6 +75,8 @@@ public class TimeJoinOperatorTest 
  
    @Test
    public void batchTest() {
-     ExecutorService instanceNotificationExecutor = 
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
      try {
        MeasurementPath measurementPath1 =
            new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + 
".device0.sensor0", TSDataType.INT32);
@@@ -85,10 -81,11 +84,12 @@@
        allSensors.add("sensor0");
        allSensors.add("sensor1");
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new 
PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new 
FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId1 = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(
            1, planNodeId1, SeriesScanOperator.class.getSimpleName());
diff --cc 
server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index 320be3be21,609dc9befa..958b0a5b72
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@@ -29,8 -28,7 +29,7 @@@ import org.apache.iotdb.db.mpp.common.F
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
--import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
  import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
  import org.apache.iotdb.db.mpp.operator.OperatorContext;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@@ -54,8 -52,7 +53,7 @@@ import java.io.IOException
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
  import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES;
@@@ -93,13 -90,13 +91,16 @@@ public class SchemaScanOperatorTest 
  
    @Test
    public void testDeviceMetaScanOperator() {
-     ExecutorService instanceNotificationExecutor = 
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
      try {
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new 
PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new 
FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId = queryId.genPlanNodeId();
        OperatorContext operatorContext =
            fragmentInstanceContext.addOperatorContext(
@@@ -159,13 -154,13 +160,16 @@@
  
    @Test
    public void testTimeSeriesMetaScanOperator() {
-     ExecutorService instanceNotificationExecutor = 
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
      try {
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new 
PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new 
FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId = queryId.genPlanNodeId();
        OperatorContext operatorContext =
            fragmentInstanceContext.addOperatorContext(

Reply via email to