This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-5434
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 388f5637708ad17d3eb5bdcf508750f507c28ae8
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Jan 29 11:13:24 2023 +0800

    [IOTDB-5434] Fix occasional timeout error in CI
---
 .../execution/exchange/MPPDataExchangeManager.java | 92 ++++++++--------------
 1 file changed, 31 insertions(+), 61 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b3d89c0340..f51c4b77cb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -43,9 +43,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -170,18 +167,15 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
             e.getSourceFragmentInstanceId());
-        if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
-            || !sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .containsKey(e.getTargetPlanNodeId())
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isAborted()
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isFinished()) {
+
+        Map<String, ISourceHandle> sourceHandleMap =
+            sourceHandles.get(e.getTargetFragmentInstanceId());
+        SourceHandle sourceHandle =
+            sourceHandleMap == null
+                ? null
+                : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+        if (sourceHandle == null || sourceHandle.isAborted() || 
sourceHandle.isFinished()) {
           // In some scenario, when the SourceHandle sends the data block ACK 
event, its upstream
           // may
           // have already been stopped. For example, in the query whit 
LimitOperator, the downstream
@@ -192,9 +186,6 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
           return;
         }
 
-        SourceHandle sourceHandle =
-            (SourceHandle)
-                
sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
         sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), 
e.getBlockSizes());
       } finally {
         QUERY_METRICS.recordDataExchangeCost(
@@ -212,28 +203,21 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
             e.getSourceFragmentInstanceId());
-        if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
-            || !sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .containsKey(e.getTargetPlanNodeId())
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isAborted()
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isFinished()) {
+
+        Map<String, ISourceHandle> sourceHandleMap =
+            sourceHandles.get(e.getTargetFragmentInstanceId());
+        SourceHandle sourceHandle =
+            sourceHandleMap == null
+                ? null
+                : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+        if (sourceHandle == null || sourceHandle.isAborted() || 
sourceHandle.isFinished()) {
           logger.debug(
               "received onEndOfDataBlockEvent but the downstream 
FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
           return;
         }
-        SourceHandle sourceHandle =
-            (SourceHandle)
-                sourceHandles
-                    .getOrDefault(e.getTargetFragmentInstanceId(), 
Collections.emptyMap())
-                    .get(e.getTargetPlanNodeId());
+
         sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
       }
     }
@@ -251,18 +235,14 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
     @Override
     public void onFinished(ISourceHandle sourceHandle) {
       logger.debug("[ScHListenerOnFinish]");
-      if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
-          || !sourceHandles
-              .get(sourceHandle.getLocalFragmentInstanceId())
-              .containsKey(sourceHandle.getLocalPlanNodeId())) {
+      Map<String, ISourceHandle> sourceHandleMap =
+          sourceHandles.get(sourceHandle.getLocalFragmentInstanceId());
+      if (sourceHandleMap == null
+          || sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) == 
null) {
         logger.debug("[ScHListenerAlreadyReleased]");
-      } else {
-        sourceHandles
-            .get(sourceHandle.getLocalFragmentInstanceId())
-            .remove(sourceHandle.getLocalPlanNodeId());
       }
-      if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
-          && 
sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+
+      if (sourceHandleMap != null && sourceHandleMap.isEmpty()) {
         sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
       }
     }
@@ -459,8 +439,10 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
         localFragmentInstanceId);
 
     SharedTsBlockQueue queue;
-    if (sourceHandles.containsKey(remoteFragmentInstanceId)
-        && 
sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
+    Map<String, ISourceHandle> sourceHandleMap = 
sourceHandles.get(remoteFragmentInstanceId);
+    LocalSourceHandle localSourceHandle =
+        sourceHandleMap == null ? null : (LocalSourceHandle) 
sourceHandleMap.get(remotePlanNodeId);
+    if (localSourceHandle != null) {
       logger.debug("Get shared tsblock queue from local source handle");
       queue =
           ((LocalSourceHandle) 
sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
@@ -595,8 +577,8 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
-    if (sourceHandles.containsKey(localFragmentInstanceId)
-        && 
sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
+    Map<String, ISourceHandle> sourceHandleMap = 
sourceHandles.get(localFragmentInstanceId);
+    if (sourceHandleMap != null && 
sourceHandleMap.containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
           "Source handle for plan node "
               + localPlanNodeId
@@ -661,16 +643,4 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
         + "."
         + suffix;
   }
-
-  public ISinkHandle getISinkHandle(TFragmentInstanceId fragmentInstanceId) {
-    return sinkHandles.get(fragmentInstanceId);
-  }
-
-  public List<ISourceHandle> getISourceHandle(TFragmentInstanceId 
fragmentInstanceId) {
-    if (sourceHandles.containsKey(fragmentInstanceId)) {
-      return new ArrayList<>(sourceHandles.get(fragmentInstanceId).values());
-    } else {
-      return new ArrayList<>();
-    }
-  }
 }

Reply via email to