This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 38adae4d7f [To rel/1.0] [IOTDB-5434] Fix occasional timeout error in CI
38adae4d7f is described below
commit 38adae4d7fe982f062c5f0850e9c3ae4800c2683
Author: Jackie Tien <[email protected]>
AuthorDate: Sun Jan 29 19:22:09 2023 +0800
[To rel/1.0] [IOTDB-5434] Fix occasional timeout error in CI
---
.../execution/exchange/MPPDataExchangeManager.java | 78 +++++++++-------------
1 file changed, 31 insertions(+), 47 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 28146749a0..4aa68e3130 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -147,18 +146,15 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
e.getTargetPlanNodeId(),
e.getTargetFragmentInstanceId(),
e.getSourceFragmentInstanceId());
- if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
- || !sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .containsKey(e.getTargetPlanNodeId())
- || sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .get(e.getTargetPlanNodeId())
- .isAborted()
- || sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .get(e.getTargetPlanNodeId())
- .isFinished()) {
+
+ Map<String, ISourceHandle> sourceHandleMap =
+ sourceHandles.get(e.getTargetFragmentInstanceId());
+ SourceHandle sourceHandle =
+ sourceHandleMap == null
+ ? null
+ : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+ if (sourceHandle == null || sourceHandle.isAborted() ||
sourceHandle.isFinished()) {
// In some scenario, when the SourceHandle sends the data block ACK
event, its upstream
// may
// have already been stopped. For example, in the query whit
LimitOperator, the downstream
@@ -169,9 +165,6 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
return;
}
- SourceHandle sourceHandle =
- (SourceHandle)
-
sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(),
e.getBlockSizes());
}
}
@@ -185,28 +178,21 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
e.getTargetPlanNodeId(),
e.getTargetFragmentInstanceId(),
e.getSourceFragmentInstanceId());
- if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
- || !sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .containsKey(e.getTargetPlanNodeId())
- || sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .get(e.getTargetPlanNodeId())
- .isAborted()
- || sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .get(e.getTargetPlanNodeId())
- .isFinished()) {
+
+ Map<String, ISourceHandle> sourceHandleMap =
+ sourceHandles.get(e.getTargetFragmentInstanceId());
+ SourceHandle sourceHandle =
+ sourceHandleMap == null
+ ? null
+ : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+ if (sourceHandle == null || sourceHandle.isAborted() ||
sourceHandle.isFinished()) {
logger.debug(
"received onEndOfDataBlockEvent but the downstream
FragmentInstance[{}] is not found",
e.getTargetFragmentInstanceId());
return;
}
- SourceHandle sourceHandle =
- (SourceHandle)
- sourceHandles
- .getOrDefault(e.getTargetFragmentInstanceId(),
Collections.emptyMap())
- .get(e.getTargetPlanNodeId());
+
sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
}
}
@@ -224,18 +210,14 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
@Override
public void onFinished(ISourceHandle sourceHandle) {
logger.debug("[ScHListenerOnFinish]");
- if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
- || !sourceHandles
- .get(sourceHandle.getLocalFragmentInstanceId())
- .containsKey(sourceHandle.getLocalPlanNodeId())) {
+ Map<String, ISourceHandle> sourceHandleMap =
+ sourceHandles.get(sourceHandle.getLocalFragmentInstanceId());
+ if (sourceHandleMap == null
+ || sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) ==
null) {
logger.debug("[ScHListenerAlreadyReleased]");
- } else {
- sourceHandles
- .get(sourceHandle.getLocalFragmentInstanceId())
- .remove(sourceHandle.getLocalPlanNodeId());
}
- if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
- &&
sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+
+ if (sourceHandleMap != null && sourceHandleMap.isEmpty()) {
sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
}
}
@@ -358,8 +340,10 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
localFragmentInstanceId);
SharedTsBlockQueue queue;
- if (sourceHandles.containsKey(remoteFragmentInstanceId)
- &&
sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
+ Map<String, ISourceHandle> sourceHandleMap =
sourceHandles.get(remoteFragmentInstanceId);
+ LocalSourceHandle localSourceHandle =
+ sourceHandleMap == null ? null : (LocalSourceHandle)
sourceHandleMap.get(remotePlanNodeId);
+ if (localSourceHandle != null) {
logger.debug("Get shared tsblock queue from local source handle");
queue =
((LocalSourceHandle)
sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
@@ -466,8 +450,8 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
- if (sourceHandles.containsKey(localFragmentInstanceId)
- &&
sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
+ Map<String, ISourceHandle> sourceHandleMap =
sourceHandles.get(localFragmentInstanceId);
+ if (sourceHandleMap != null &&
sourceHandleMap.containsKey(localPlanNodeId)) {
throw new IllegalStateException(
"Source handle for plan node "
+ localPlanNodeId