This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 38473d1a00 Fix the issue that sometimes the FragmentInstance may stuck 
in FLUSHING states (#5808)
38473d1a00 is described below

commit 38473d1a0077b59300c3bc552b744ae721309bdf
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri May 6 12:45:42 2022 +0800

    Fix the issue that sometimes the FragmentInstance may stuck in FLUSHING 
states (#5808)
---
 .../execution/datatransfer/DataBlockManager.java   | 19 ++++++++-------
 .../mpp/execution/datatransfer/SourceHandle.java   |  4 ++--
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |  4 ++--
 .../db/mpp/plan/execution/QueryExecution.java      |  4 +++-
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 28 ++++++++++++++++------
 .../service/thrift/impl/InternalServiceImpl.java   | 12 ++++++----
 thrift/src/main/thrift/mpp.thrift                  |  1 +
 7 files changed, 48 insertions(+), 24 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
index c889f18f6d..8cfda3c40b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
@@ -106,10 +106,10 @@ public class DataBlockManager implements 
IDataBlockManager {
           e.getEndSequenceId(),
           e.getSourceFragmentInstanceId());
       if (!sinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
-        throw new TException(
-            "Source fragment instance not found. Fragment instance ID: "
-                + e.getSourceFragmentInstanceId()
-                + ".");
+        logger.warn(
+            "received ACK event but target FragmentInstance[{}] is not found.",
+            e.getSourceFragmentInstanceId());
+        return;
       }
       ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
           .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
@@ -130,10 +130,13 @@ public class DataBlockManager implements 
IDataBlockManager {
               .get(e.getTargetFragmentInstanceId())
               .get(e.getTargetPlanNodeId())
               .isAborted()) {
-        throw new TException(
-            "Target fragment instance not found. Fragment instance ID: "
-                + e.getTargetFragmentInstanceId()
-                + ".");
+        // In some scenarios, when the SourceHandle sends the data block ACK event, its upstream may
+        // have already been stopped. For example, in a query with a LimitOperator, the downstream
+        // FragmentInstance may be finished although the upstream is still working.
+        logger.warn(
+            "received NewDataBlockEvent but the upstream FragmentInstance[{}] 
is not found",
+            e.getTargetFragmentInstanceId());
+        return;
       }
 
       SourceHandle sourceHandle =
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
index 1c8bf70880..2102e98bfb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
@@ -320,6 +320,8 @@ public class SourceHandle implements ISourceHandle {
             TsBlock tsBlock = serde.deserialize(byteBuffer);
             tsBlocks.add(tsBlock);
           }
+          executorService.submit(
+              new SendAcknowledgeDataBlockEventTask(startSequenceId, 
endSequenceId));
           synchronized (SourceHandle.this) {
             if (aborted) {
               return;
@@ -331,8 +333,6 @@ public class SourceHandle implements ISourceHandle {
               blocked.set(null);
             }
           }
-          executorService.submit(
-              new SendAcknowledgeDataBlockEventTask(startSequenceId, 
endSequenceId));
           break;
         } catch (Throwable e) {
           logger.error(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 08d8257560..275501eaf7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -107,6 +107,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
     }
     SchemaTree result = new SchemaTree();
     while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+      // The query transitions to FINISHED on the final getBatchResult() call,
+      // so we don't need to clean it up manually.
       Optional<TsBlock> tsBlock = 
coordinator.getQueryExecution(queryId).getBatchResult();
       if (!tsBlock.isPresent()) {
         break;
@@ -121,8 +123,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
         result.mergeSchemaTree(fetchedSchemaTree);
       }
     }
-    // TODO: (xingtanzjr) need to release this query's resource here. This is 
a temporary way
-    coordinator.getQueryExecution(queryId).stopAndCleanup();
     return result;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 763ca26a82..7f6078087f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -238,7 +238,9 @@ public class QueryExecution implements IQueryExecution {
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
       if (resultHandle.isFinished()) {
-        releaseResource();
+        // Once the resultHandle is finished, transition this query's state to FINISHED
+        // so that the corresponding cleanup work can be triggered.
+        stateMachine.transitionToFinished();
         return Optional.empty();
       }
       return Optional.of(resultHandle.receive());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index df6f08f7c2..357c26fcf5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -37,7 +38,8 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 public class SimpleQueryTerminator implements IQueryTerminator {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleQueryTerminator.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(SimpleQueryTerminator.class);
+  private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L;
   private final ExecutorService executor;
   private final QueryId queryId;
   private final List<FragmentInstance> fragmentInstances;
@@ -58,17 +60,22 @@ public class SimpleQueryTerminator implements 
IQueryTerminator {
 
   @Override
   public Future<Boolean> terminate() {
-    List<TEndPoint> relatedHost = getRelatedHost(fragmentInstances);
-
+    List<TEndPoint> relatedHost = getRelatedHost();
+    try {
+      Thread.sleep(TERMINATION_GRACE_PERIOD_IN_MS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
     return executor.submit(
         () -> {
           for (TEndPoint endPoint : relatedHost) {
             // TODO (jackie tien) change the port
             try (SyncDataNodeInternalServiceClient client =
                 internalServiceClientManager.borrowClient(endPoint)) {
-              client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+              client.cancelQuery(
+                  new TCancelQueryReq(queryId.getId(), 
getRelatedFragmentInstances(endPoint)));
             } catch (IOException e) {
-              LOGGER.error("can't connect to node {}", endPoint, e);
+              logger.error("can't connect to node {}", endPoint, e);
               return false;
             } catch (TException e) {
               return false;
@@ -78,10 +85,17 @@ public class SimpleQueryTerminator implements 
IQueryTerminator {
         });
   }
 
-  private List<TEndPoint> getRelatedHost(List<FragmentInstance> instances) {
-    return instances.stream()
+  private List<TEndPoint> getRelatedHost() {
+    return fragmentInstances.stream()
         .map(instance -> instance.getHostDataNode().internalEndPoint)
         .distinct()
         .collect(Collectors.toList());
   }
+
+  private List<TFragmentInstanceId> getRelatedFragmentInstances(TEndPoint 
endPoint) {
+    return fragmentInstances.stream()
+        .filter(instance -> 
instance.getHostDataNode().internalEndPoint.equals(endPoint))
+        .map(instance -> instance.getId().toThrift())
+        .collect(Collectors.toList());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index f8b5778611..e2a1fff0c3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -72,6 +72,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class InternalServiceImpl implements InternalService.Iface {
 
@@ -132,11 +133,14 @@ public class InternalServiceImpl implements 
InternalService.Iface {
 
   @Override
   public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-
-    // TODO need to be implemented and currently in order not to print 
NotImplementedException log,
-    // we simply return null
+    List<FragmentInstanceId> taskIds =
+        req.getFragmentInstanceIds().stream()
+            .map(FragmentInstanceId::fromThrift)
+            .collect(Collectors.toList());
+    for (FragmentInstanceId taskId : taskIds) {
+      FragmentInstanceManager.getInstance().cancelTask(taskId);
+    }
     return new TCancelResp(true);
-    //    throw new NotImplementedException();
   }
 
   @Override
diff --git a/thrift/src/main/thrift/mpp.thrift 
b/thrift/src/main/thrift/mpp.thrift
index 0c2acf6cb2..1d33d58cc4 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -105,6 +105,7 @@ struct TFragmentInstanceStateResp {
 
 struct TCancelQueryReq {
   1: required string queryId
+  2: required list<TFragmentInstanceId> fragmentInstanceIds
 }
 
 struct TCancelPlanFragmentReq {

Reply via email to