This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch AddMoreLog in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9aaee1755c81d03592760a3ba7c2127f2377deed Author: JackieTien97 <[email protected]> AuthorDate: Thu Aug 11 21:19:27 2022 +0800 Fix SinkHandle bug --- .../org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java | 4 ++-- .../iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java | 4 ++++ .../java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java | 4 ++-- .../java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java | 2 +- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java index 3bc455592e..35c7cda823 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java @@ -142,7 +142,7 @@ public class LocalSourceHandle implements ISourceHandle { logger.info("Source handle is being aborted."); synchronized (queue) { synchronized (this) { - if (aborted) { + if (aborted || closed) { return; } queue.abort(); @@ -163,7 +163,7 @@ public class LocalSourceHandle implements ISourceHandle { logger.info("Source handle is being closed."); synchronized (queue) { synchronized (this) { - if (aborted) { + if (aborted || closed) { return; } queue.close(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index 7f087e2a9a..b2b32222d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -121,6 +121,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { } ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId())) .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId()); + } catch (Throwable t) { + logger.error( + "ack TsBlock [{}, {}) failed.", e.getStartSequenceId(), e.getEndSequenceId(), t); + throw t; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 872720fb7e..6ccddaf44e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -197,7 +197,7 @@ public class SinkHandle implements ISinkHandle { @Override public synchronized void setNoMoreTsBlocks() { logger.info("start to set no-more-tsblocks"); - if (aborted) { + if (aborted || closed) { return; } try { @@ -283,7 +283,7 @@ public class SinkHandle implements ISinkHandle { void acknowledgeTsBlock(int startSequenceId, int endSequenceId) { long freedBytes = 0L; synchronized (this) { - if (aborted) { + if (aborted || closed) { return; } Iterator<Entry<Integer, Pair<TsBlock, Long>>> iterator = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index 3d4016dff4..1dc68c27a3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -384,7 +384,7 @@ public class SourceHandle implements ISourceHandle { executorService.submit( new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId)); synchronized (SourceHandle.this) { - if (aborted) { + if (aborted || closed) { return; } for (int i = startSequenceId; i < endSequenceId; i++) {
