This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-5434 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 388f5637708ad17d3eb5bdcf508750f507c28ae8 Author: JackieTien97 <[email protected]> AuthorDate: Sun Jan 29 11:13:24 2023 +0800 [IOTDB-5434] Fix occasional timeout error in CI --- .../execution/exchange/MPPDataExchangeManager.java | 92 ++++++++-------------- 1 file changed, 31 insertions(+), 61 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index b3d89c0340..f51c4b77cb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -43,9 +43,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; @@ -170,18 +167,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { e.getTargetPlanNodeId(), e.getTargetFragmentInstanceId(), e.getSourceFragmentInstanceId()); - if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId()) - || !sourceHandles - .get(e.getTargetFragmentInstanceId()) - .containsKey(e.getTargetPlanNodeId()) - || sourceHandles - .get(e.getTargetFragmentInstanceId()) - .get(e.getTargetPlanNodeId()) - .isAborted() - || sourceHandles - .get(e.getTargetFragmentInstanceId()) - .get(e.getTargetPlanNodeId()) - .isFinished()) { + + Map<String, ISourceHandle> sourceHandleMap = + sourceHandles.get(e.getTargetFragmentInstanceId()); + SourceHandle sourceHandle = + sourceHandleMap == null + ? null + : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId()); + + if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) { // In some scenario, when the SourceHandle sends the data block ACK event, its upstream // may // have already been stopped. For example, in the query whit LimitOperator, the downstream @@ -192,9 +186,6 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { return; } - SourceHandle sourceHandle = - (SourceHandle) - sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId()); sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes()); } finally { QUERY_METRICS.recordDataExchangeCost( @@ -212,28 +203,21 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { e.getTargetPlanNodeId(), e.getTargetFragmentInstanceId(), e.getSourceFragmentInstanceId()); - if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId()) - || !sourceHandles - .get(e.getTargetFragmentInstanceId()) - .containsKey(e.getTargetPlanNodeId()) - || sourceHandles - .get(e.getTargetFragmentInstanceId()) - .get(e.getTargetPlanNodeId()) - .isAborted() - || sourceHandles - .get(e.getTargetFragmentInstanceId()) - .get(e.getTargetPlanNodeId()) - .isFinished()) { + + Map<String, ISourceHandle> sourceHandleMap = + sourceHandles.get(e.getTargetFragmentInstanceId()); + SourceHandle sourceHandle = + sourceHandleMap == null + ? null + : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId()); + + if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) { logger.debug( "received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found", e.getTargetFragmentInstanceId()); return; } - SourceHandle sourceHandle = - (SourceHandle) - sourceHandles - .getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap()) - .get(e.getTargetPlanNodeId()); + sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId()); } } @@ -251,18 +235,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onFinished(ISourceHandle sourceHandle) { logger.debug("[ScHListenerOnFinish]"); - if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) - || !sourceHandles - .get(sourceHandle.getLocalFragmentInstanceId()) - .containsKey(sourceHandle.getLocalPlanNodeId())) { + Map<String, ISourceHandle> sourceHandleMap = + sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()); + if (sourceHandleMap == null + || sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) == null) { logger.debug("[ScHListenerAlreadyReleased]"); - } else { - sourceHandles - .get(sourceHandle.getLocalFragmentInstanceId()) - .remove(sourceHandle.getLocalPlanNodeId()); } - if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) - && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) { + + if (sourceHandleMap != null && sourceHandleMap.isEmpty()) { sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId()); } } @@ -459,8 +439,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { localFragmentInstanceId); SharedTsBlockQueue queue; - if (sourceHandles.containsKey(remoteFragmentInstanceId) - && sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) { + Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(remoteFragmentInstanceId); + LocalSourceHandle localSourceHandle = + sourceHandleMap == null ? null : (LocalSourceHandle) sourceHandleMap.get(remotePlanNodeId); + if (localSourceHandle != null) { logger.debug("Get shared tsblock queue from local source handle"); queue = ((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId)) @@ -595,8 +577,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) { - if (sourceHandles.containsKey(localFragmentInstanceId) - && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) { + Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(localFragmentInstanceId); + if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) { throw new IllegalStateException( "Source handle for plan node " + localPlanNodeId @@ -661,16 +643,4 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { + "." + suffix; } - - public ISinkHandle getISinkHandle(TFragmentInstanceId fragmentInstanceId) { - return sinkHandles.get(fragmentInstanceId); - } - - public List<ISourceHandle> getISourceHandle(TFragmentInstanceId fragmentInstanceId) { - if (sourceHandles.containsKey(fragmentInstanceId)) { - return new ArrayList<>(sourceHandles.get(fragmentInstanceId).values()); - } else { - return new ArrayList<>(); - } - } }
