This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/mpp_issues by this push:
     new a10332bc38 fix the issue that deadlock may occur in QueryTerminator
a10332bc38 is described below

commit a10332bc38c63e330ddd88d909740c101cf02458
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Apr 19 16:35:56 2022 +0800

    fix the issue that deadlock may occur in QueryTerminator
---
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  4 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  3 ++
 .../mpp/execution/scheduler/ClusterScheduler.java  |  4 +-
 .../mpp/execution/scheduler/IQueryTerminator.java  |  4 +-
 .../execution/scheduler/SimpleQueryTerminator.java | 52 ++++++++++------------
 5 files changed, 35 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index ba2c602855..c9393c6be7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -48,9 +48,9 @@ public class Coordinator {
   private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
 
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
-  private static final int COORDINATOR_EXECUTOR_SIZE = 20;
+  private static final int COORDINATOR_EXECUTOR_SIZE = 1;
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = 
"MPPCoordinatorScheduled";
-  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
+  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
 
   private static final Endpoint LOCAL_HOST =
       new Endpoint(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 2b2ea1f762..0a1528422a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -261,13 +261,16 @@ public class QueryExecution implements IQueryExecution {
     SettableFuture<QueryState> future = SettableFuture.create();
     stateMachine.addStateChangeListener(
         state -> {
+          LOG.info("[QueryExecution {}]:  wait status callback invoked", 
context.getQueryId());
           if (state == QueryState.RUNNING || state.isDone()) {
             future.set(state);
           }
         });
 
     try {
+      LOG.info("[QueryExecution {}]:  start to wait status", 
context.getQueryId());
       QueryState state = future.get();
+      LOG.info("[QueryExecution {}]:  status got", context.getQueryId());
       // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't 
FINISHED
       TSStatusCode statusCode =
           // For WRITE, the state should be FINISHED; For READ, the state 
could be RUNNING
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 694a82738f..1f3ddd87a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -95,7 +95,7 @@ public class ClusterScheduler implements IScheduler {
     try {
       LOGGER.info("[{}] wait dispatch to be finished", 
queryContext.getQueryId());
       FragInstanceDispatchResult result = dispatchResultFuture.get();
-      LOGGER.info("[{}] dispatch finished: ", result.isSuccessful());
+      LOGGER.info("[{}] dispatch finished: {}", queryContext.getQueryId(), 
result.isSuccessful());
       if (!result.isSuccessful()) {
         stateMachine.transitionToFailed(new IllegalStateException("Fragment 
cannot be dispatched"));
         return;
@@ -108,7 +108,9 @@ public class ClusterScheduler implements IScheduler {
 
     // For the FragmentInstance of WRITE, it will be executed directly when 
dispatching.
     if (queryType == QueryType.WRITE) {
+      LOGGER.info("[{}] prepare to transition WRITE to finished", 
queryContext.getQueryId());
       stateMachine.transitionToFinished();
+      LOGGER.info("[{}] transition done", queryContext.getQueryId());
       return;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
index 25ff71363b..29a1e27cfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import java.util.concurrent.Future;
+
 public interface IQueryTerminator {
-  boolean terminate();
+  Future<Boolean> terminate();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 728966daea..236a118449 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -27,15 +27,16 @@ import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 public class SimpleQueryTerminator implements IQueryTerminator {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleQueryTerminator.class);
   private final ExecutorService executor;
   private final QueryId queryId;
   private final List<FragmentInstance> fragmentInstances;
@@ -48,33 +49,28 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   }
 
   @Override
-  public boolean terminate() {
+  public Future<Boolean> terminate() {
+    LOGGER.info("[{}] start to submit terminate command", queryId);
     List<Endpoint> relatedHost = getRelatedHost(fragmentInstances);
-    Future<Boolean> future =
-        executor.submit(
-            () -> {
-              try {
-                for (Endpoint endpoint : relatedHost) {
-                  // TODO (jackie tien) change the port
-                  InternalService.Iface client =
-                      InternalServiceClientFactory.getInternalServiceClient(
-                          new Endpoint(
-                              endpoint.getIp(),
-                              
IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-                  client.cancelQuery(new TCancelQueryReq(queryId.getId()));
-                }
-              } catch (TException e) {
-                return false;
-              }
-              return true;
-            });
-    try {
-      return future.get();
-    } catch (InterruptedException | ExecutionException e) {
-      // TODO: (xingtanzjr) Record the error info with logger
-      Thread.currentThread().interrupt();
-      return false;
-    }
+
+    return executor.submit(
+        () -> {
+          try {
+            for (Endpoint endpoint : relatedHost) {
+              // TODO (jackie tien) change the port
+              InternalService.Iface client =
+                  InternalServiceClientFactory.getInternalServiceClient(
+                      new Endpoint(
+                          endpoint.getIp(),
+                          
IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+              client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+              LOGGER.info("[{}] cancel query from DataNode[{}]", queryId, 
endpoint.getIp());
+            }
+          } catch (TException e) {
+            return false;
+          }
+          return true;
+        });
   }
 
   private List<Endpoint> getRelatedHost(List<FragmentInstance> instances) {

Reply via email to