This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 89ab2805a5 [IOTDB-3155] Optimize the schedule of MPP framework (#5863)
89ab2805a5 is described below
commit 89ab2805a57af39acbce08d3c8208377bec3f0d8
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Wed May 11 19:38:00 2022 +0800
[IOTDB-3155] Optimize the schedule of MPP framework (#5863)
---
.../mpp/FragmentInstanceDispatchException.java} | 22 +--
.../apache/iotdb/db/mpp/common/PlanFragmentId.java | 2 +-
.../db/mpp/common/schematree/PathPatternTree.java | 1 +
.../execution/fragment/FragmentInstanceInfo.java | 11 +-
.../fragment/FragmentInstanceManager.java | 2 +-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 11 ++
.../db/mpp/plan/execution/QueryExecution.java | 6 +-
.../node/metedata/read/SchemaFetchScanNode.java | 1 +
.../planner/plan/node/process/ExchangeNode.java | 2 +-
.../db/mpp/plan/scheduler/ClusterScheduler.java | 6 +-
.../scheduler/FragmentInstanceDispatcherImpl.java | 191 +++++++++++++++++++++
.../iotdb/db/mpp/plan/plan/QueryPlannerTest.java | 1 +
12 files changed, 232 insertions(+), 24 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
similarity index 62%
copy from
server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
copy to
server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
index 42d915b42b..8f32997d6f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
@@ -16,25 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.fragment;
-import org.apache.iotdb.consensus.common.DataSet;
+package org.apache.iotdb.db.exception.mpp;
-public class FragmentInstanceInfo implements DataSet {
- private final FragmentInstanceState state;
-
- private final long endTime;
-
- public FragmentInstanceInfo(FragmentInstanceState state, long endTime) {
- this.state = state;
- this.endTime = endTime;
- }
-
- public FragmentInstanceState getState() {
- return state;
- }
-
- public long getEndTime() {
- return endTime;
+public class FragmentInstanceDispatchException extends Exception {
+ public FragmentInstanceDispatchException(Throwable t) {
+ super(t);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index 01cde7ef0b..9a7983e401 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -94,6 +94,6 @@ public class PlanFragmentId {
@Override
public int hashCode() {
- return Objects.hash(queryId, id, nextFragmentInstanceId);
+ return Objects.hash(queryId, id);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
index 4002204237..d7e70aea7e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
@@ -48,6 +48,7 @@ public class PathPatternTree {
public PathPatternTree(PathPatternNode root) {
this.root = root;
+ this.pathList = new ArrayList<>();
}
public PathPatternTree(PartialPath devicePath, String[] measurements) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
index 42d915b42b..585c2845c5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.consensus.common.DataSet;
public class FragmentInstanceInfo implements DataSet {
private final FragmentInstanceState state;
-
+ private String message;
private final long endTime;
public FragmentInstanceInfo(FragmentInstanceState state, long endTime) {
@@ -30,6 +30,11 @@ public class FragmentInstanceInfo implements DataSet {
this.endTime = endTime;
}
+ public FragmentInstanceInfo(FragmentInstanceState state, long endTime,
String message) {
+ this(state, endTime);
+ this.message = message;
+ }
+
public FragmentInstanceState getState() {
return state;
}
@@ -37,4 +42,8 @@ public class FragmentInstanceInfo implements DataSet {
public long getEndTime() {
return endTime;
}
+
+ public String getMessage() {
+ return message;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index dbbed705c5..29d41fd934 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -116,6 +116,7 @@ public class FragmentInstanceManager {
return createFragmentInstanceExecution(
scheduler, instanceId, context, driver, stateMachine,
failedInstances);
} catch (Throwable t) {
+ logger.error("error when create FragmentInstanceExecution.",
t);
stateMachine.failed(t);
return null;
}
@@ -127,7 +128,6 @@ public class FragmentInstanceManager {
public FragmentInstanceInfo execSchemaQueryFragmentInstance(
FragmentInstance instance, ISchemaRegion schemaRegion) {
FragmentInstanceId instanceId = instance.getId();
-
FragmentInstanceExecution execution =
instanceExecution.computeIfAbsent(
instanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 23c417862c..c6f6c4a2b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -53,6 +53,8 @@ public class Coordinator {
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+ private static final String COORDINATOR_WRITE_EXECUTOR_NAME =
"MPPCoordinatorWrite";
+ private static final int COORDINATOR_WRITE_EXECUTOR_SIZE = 10;
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME =
"MPPCoordinatorScheduled";
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
@@ -73,6 +75,7 @@ public class Coordinator {
new
DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
private final ExecutorService executor;
+ private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
private static final Coordinator INSTANCE = new Coordinator();
@@ -82,6 +85,7 @@ public class Coordinator {
private Coordinator() {
this.queryExecutionMap = new ConcurrentHashMap<>();
this.executor = getQueryExecutor();
+ this.writeOperationExecutor = getWriteExecutor();
this.scheduledExecutor = getScheduledExecutor();
}
@@ -98,6 +102,7 @@ public class Coordinator {
statement,
queryContext,
executor,
+ writeOperationExecutor,
scheduledExecutor,
partitionFetcher,
schemaFetcher,
@@ -138,6 +143,12 @@ public class Coordinator {
return IoTDBThreadPoolFactory.newFixedThreadPool(
COORDINATOR_EXECUTOR_SIZE, COORDINATOR_EXECUTOR_NAME);
}
+
+ private ExecutorService getWriteExecutor() {
+ return IoTDBThreadPoolFactory.newFixedThreadPool(
+ COORDINATOR_WRITE_EXECUTOR_SIZE, COORDINATOR_WRITE_EXECUTOR_NAME);
+ }
+
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the
threadPool management
private ScheduledExecutorService getScheduledExecutor() {
return IoTDBThreadPoolFactory.newScheduledThreadPool(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 9f4b2b9d9d..0e5da23ad1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -86,10 +86,11 @@ public class QueryExecution implements IQueryExecution {
private DistributedQueryPlan distributedPlan;
private final ExecutorService executor;
+ private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
// TODO need to use factory to decide standalone or cluster
private final IPartitionFetcher partitionFetcher;
- // TODO need to use factory to decide standalone or cluster
+ // TODO need to use factory to decide standalone or cluster,
private final ISchemaFetcher schemaFetcher;
// The result of QueryExecution will be written to the DataBlockManager in
current Node.
@@ -103,11 +104,13 @@ public class QueryExecution implements IQueryExecution {
Statement statement,
MPPQueryContext context,
ExecutorService executor,
+ ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager) {
this.executor = executor;
+ this.writeOperationExecutor = writeOperationExecutor;
this.scheduledExecutor = scheduledExecutor;
this.context = context;
this.planOptimizers = new ArrayList<>();
@@ -176,6 +179,7 @@ public class QueryExecution implements IQueryExecution {
distributedPlan.getInstances(),
context.getQueryType(),
executor,
+ writeOperationExecutor,
scheduledExecutor,
internalServiceClientManager)
: new StandaloneScheduler(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
index 3a360954dd..7f0f0e9310 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
@@ -47,6 +47,7 @@ public class SchemaFetchScanNode extends SourceNode {
super(id);
this.storageGroup = storageGroup;
this.patternTree = patternTree;
+ this.patternTree.constructTree();
}
public PartialPath getStorageGroup() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index db68bb9624..fe110473c9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -129,7 +129,6 @@ public class ExchangeNode extends PlanNode {
ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
upstreamInstanceId.serialize(byteBuffer);
upstreamPlanNodeId.serialize(byteBuffer);
- List<String> outputColumnNames = remoteSourceNode.getOutputColumnNames();
ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
for (String outputColumnName : outputColumnNames) {
ReadWriteIOUtils.write(outputColumnName, byteBuffer);
@@ -165,6 +164,7 @@ public class ExchangeNode extends PlanNode {
public void setRemoteSourceNode(FragmentSinkNode remoteSourceNode) {
this.remoteSourceNode = remoteSourceNode;
+ this.setOutputColumnNames(remoteSourceNode.getOutputColumnNames());
}
public void cleanChildren() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index ced23a70ac..dd2ca46271 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -58,6 +58,7 @@ public class ClusterScheduler implements IScheduler {
private List<FragmentInstance> instances;
private ExecutorService executor;
+ private ExecutorService writeOperationExecutor;
private ScheduledExecutorService scheduledExecutor;
private IFragInstanceDispatcher dispatcher;
@@ -70,6 +71,7 @@ public class ClusterScheduler implements IScheduler {
List<FragmentInstance> instances,
QueryType queryType,
ExecutorService executor,
+ ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager) {
this.queryContext = queryContext;
@@ -78,7 +80,9 @@ public class ClusterScheduler implements IScheduler {
this.queryType = queryType;
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
- this.dispatcher = new SimpleFragInstanceDispatcher(executor,
internalServiceClientManager);
+ this.dispatcher =
+ new FragmentInstanceDispatcherImpl(
+ queryType, executor, writeOperationExecutor,
internalServiceClientManager);
this.stateTracker =
new FixedRateFragInsStateTracker(
stateMachine, executor, scheduledExecutor, instances,
internalServiceClientManager);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
new file mode 100644
index 0000000000..8922de59e3
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.scheduler;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher
{
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class);
+ private final ExecutorService executor;
+ private final ExecutorService writeOperationExecutor;
+ private final QueryType type;
+ private final String localhostIpAddr;
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+
+ public FragmentInstanceDispatcherImpl(
+ QueryType type,
+ ExecutorService executor,
+ ExecutorService writeOperationExecutor,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager) {
+ this.type = type;
+ this.executor = executor;
+ this.writeOperationExecutor = writeOperationExecutor;
+ this.internalServiceClientManager = internalServiceClientManager;
+ this.localhostIpAddr =
IoTDBDescriptor.getInstance().getConfig().getInternalIp();
+ }
+
+ @Override
+ public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
+ if (type == QueryType.READ) {
+ return dispatchRead(instances);
+ } else {
+ return dispatchWrite(instances);
+ }
+ }
+
+ // TODO: (xingtanzjr) currently we use a sequential dispatch policy for
READ, which is
+ // unsafe for current FragmentInstance scheduler framework. We need to
implement the
+ // topological dispatch according to dependency relations between
FragmentInstances
+ private Future<FragInstanceDispatchResult>
dispatchRead(List<FragmentInstance> instances) {
+ return executor.submit(
+ () -> {
+ for (FragmentInstance instance : instances) {
+ boolean accepted = dispatchOneInstance(instance);
+ if (!accepted) {
+ return new FragInstanceDispatchResult(false);
+ }
+ }
+ return new FragInstanceDispatchResult(true);
+ });
+ }
+
+ // TODO: (xingtanzjr) Return the detailed write states for each
FragmentInstance
+ private Future<FragInstanceDispatchResult>
dispatchWrite(List<FragmentInstance> instances) {
+ List<Future<Boolean>> futures = new LinkedList<>();
+ for (FragmentInstance instance : instances) {
+ futures.add(writeOperationExecutor.submit(() ->
dispatchOneInstance(instance)));
+ }
+ SettableFuture<FragInstanceDispatchResult> resultFuture =
SettableFuture.create();
+ for (Future<Boolean> future : futures) {
+ try {
+ Boolean success = future.get();
+ if (!success) {
+ resultFuture.set(new FragInstanceDispatchResult(false));
+ break;
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ resultFuture.setException(e);
+ break;
+ }
+ }
+ resultFuture.set(new FragInstanceDispatchResult(true));
+ return resultFuture;
+ }
+
+ private boolean dispatchOneInstance(FragmentInstance instance)
+ throws FragmentInstanceDispatchException {
+ TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
+ if (isDispatchedToLocal(endPoint)) {
+ return dispatchLocally(instance);
+ } else {
+ return dispatchRemote(instance, endPoint);
+ }
+ }
+
+ private boolean isDispatchedToLocal(TEndPoint endPoint) {
+ return this.localhostIpAddr.equals(endPoint.getIp());
+ }
+
+ private boolean dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+ throws FragmentInstanceDispatchException {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+ instance.serializeRequest(buffer);
+ buffer.flip();
+ TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
+ TSendFragmentInstanceReq req =
+ new TSendFragmentInstanceReq(
+ new TFragmentInstance(buffer), groupId,
instance.getType().toString());
+ TSendFragmentInstanceResp resp = client.sendFragmentInstance(req);
+ return resp.accepted;
+ } catch (IOException | TException e) {
+ logger.error("can't connect to node {}", endPoint, e);
+ throw new FragmentInstanceDispatchException(e);
+ }
+ }
+
+ private boolean dispatchLocally(FragmentInstance instance)
+ throws FragmentInstanceDispatchException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(
+ instance.getRegionReplicaSet().getRegionId());
+ switch (instance.getType()) {
+ case READ:
+ FragmentInstanceInfo info =
+ (FragmentInstanceInfo) ConsensusImpl.getInstance().read(groupId,
instance).getDataset();
+ return !info.getState().isFailed();
+ case WRITE:
+ PlanNode planNode = instance.getFragment().getRoot();
+ if (planNode instanceof InsertNode) {
+ try {
+ SchemaValidator.validate((InsertNode) planNode);
+ } catch (SemanticException e) {
+ throw new FragmentInstanceDispatchException(e);
+ }
+ }
+ ConsensusWriteResponse resp =
ConsensusImpl.getInstance().write(groupId, instance);
+ return TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
resp.getStatus().getCode();
+ }
+ throw new UnsupportedOperationException(
+ String.format("unknown query type [%s]", instance.getType()));
+ }
+
+ @Override
+ public void abort() {}
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index c6694843a0..48b4137bb2 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -78,6 +78,7 @@ public class QueryPlannerTest {
new TEndPoint(),
new TEndPoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
+
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_write_operation"),
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
new FakePartitionFetcherImpl(),
new FakeSchemaFetcherImpl(),