This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 38adae4d7f [To rel/1.0] [IOTDB-5434] Fix occasional timeout error in CI
38adae4d7f is described below

commit 38adae4d7fe982f062c5f0850e9c3ae4800c2683
Author: Jackie Tien <[email protected]>
AuthorDate: Sun Jan 29 19:22:09 2023 +0800

    [To rel/1.0] [IOTDB-5434] Fix occasional timeout error in CI
---
 .../execution/exchange/MPPDataExchangeManager.java | 78 +++++++++-------------
 1 file changed, 31 insertions(+), 47 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 28146749a0..4aa68e3130 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -147,18 +146,15 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
             e.getSourceFragmentInstanceId());
-        if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
-            || !sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .containsKey(e.getTargetPlanNodeId())
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isAborted()
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isFinished()) {
+
+        Map<String, ISourceHandle> sourceHandleMap =
+            sourceHandles.get(e.getTargetFragmentInstanceId());
+        SourceHandle sourceHandle =
+            sourceHandleMap == null
+                ? null
+                : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+        if (sourceHandle == null || sourceHandle.isAborted() || 
sourceHandle.isFinished()) {
          // In some scenarios, when the SourceHandle sends the data block ACK 
event, its upstream
          // may
          // have already been stopped. For example, in the query with 
LimitOperator, the downstream
@@ -169,9 +165,6 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
           return;
         }
 
-        SourceHandle sourceHandle =
-            (SourceHandle)
-                
sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
         sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), 
e.getBlockSizes());
       }
     }
@@ -185,28 +178,21 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
             e.getSourceFragmentInstanceId());
-        if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
-            || !sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .containsKey(e.getTargetPlanNodeId())
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isAborted()
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isFinished()) {
+
+        Map<String, ISourceHandle> sourceHandleMap =
+            sourceHandles.get(e.getTargetFragmentInstanceId());
+        SourceHandle sourceHandle =
+            sourceHandleMap == null
+                ? null
+                : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+        if (sourceHandle == null || sourceHandle.isAborted() || 
sourceHandle.isFinished()) {
           logger.debug(
               "received onEndOfDataBlockEvent but the downstream 
FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
           return;
         }
-        SourceHandle sourceHandle =
-            (SourceHandle)
-                sourceHandles
-                    .getOrDefault(e.getTargetFragmentInstanceId(), 
Collections.emptyMap())
-                    .get(e.getTargetPlanNodeId());
+
         sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
       }
     }
@@ -224,18 +210,14 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
     @Override
     public void onFinished(ISourceHandle sourceHandle) {
       logger.debug("[ScHListenerOnFinish]");
-      if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
-          || !sourceHandles
-              .get(sourceHandle.getLocalFragmentInstanceId())
-              .containsKey(sourceHandle.getLocalPlanNodeId())) {
+      Map<String, ISourceHandle> sourceHandleMap =
+          sourceHandles.get(sourceHandle.getLocalFragmentInstanceId());
+      if (sourceHandleMap == null
+          || sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) == 
null) {
         logger.debug("[ScHListenerAlreadyReleased]");
-      } else {
-        sourceHandles
-            .get(sourceHandle.getLocalFragmentInstanceId())
-            .remove(sourceHandle.getLocalPlanNodeId());
       }
-      if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
-          && 
sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+
+      if (sourceHandleMap != null && sourceHandleMap.isEmpty()) {
         sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
       }
     }
@@ -358,8 +340,10 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
         localFragmentInstanceId);
 
     SharedTsBlockQueue queue;
-    if (sourceHandles.containsKey(remoteFragmentInstanceId)
-        && 
sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
+    Map<String, ISourceHandle> sourceHandleMap = 
sourceHandles.get(remoteFragmentInstanceId);
+    LocalSourceHandle localSourceHandle =
+        sourceHandleMap == null ? null : (LocalSourceHandle) 
sourceHandleMap.get(remotePlanNodeId);
+    if (localSourceHandle != null) {
       logger.debug("Get shared tsblock queue from local source handle");
       queue =
           ((LocalSourceHandle) 
sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
@@ -466,8 +450,8 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
-    if (sourceHandles.containsKey(localFragmentInstanceId)
-        && 
sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
+    Map<String, ISourceHandle> sourceHandleMap = 
sourceHandles.get(localFragmentInstanceId);
+    if (sourceHandleMap != null && 
sourceHandleMap.containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
           "Source handle for plan node "
               + localPlanNodeId

Reply via email to