This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryException in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55a032e3edfdef849194eb4f84dd3ee0676595d3 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jul 12 16:36:42 2022 +0800 Done --- .../db/mpp/execution/exchange/ISinkHandle.java | 10 ++++++ .../db/mpp/execution/exchange/LocalSinkHandle.java | 40 +++++++++++++++++----- .../mpp/execution/exchange/LocalSourceHandle.java | 2 +- .../mpp/execution/exchange/SharedTsBlockQueue.java | 18 +++++----- .../db/mpp/execution/exchange/SinkHandle.java | 35 +++++++++++++++---- .../fragment/FragmentInstanceExecution.java | 6 +++- .../iotdb/db/mpp/execution/memory/MemoryPool.java | 20 +++++++++++ .../db/mpp/plan/scheduler/ClusterScheduler.java | 1 - .../scheduler/FragmentInstanceDispatcherImpl.java | 6 ++++ 9 files changed, 111 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java index b4be67e164..4450ca8c21 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java @@ -68,6 +68,16 @@ public interface ISinkHandle { /** * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel * the future returned by {@link #isFull()}. + * + * <p>Should only be called in abnormal case */ void abort(); + + /** + * Close the sink handle. Discard all tsblocks which may still be in the memory buffer and + * complete the future returned by {@link #isFull()}. + * + * <p>Should only be called in normal case. + */ + void close(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java index 852e79ac8b..d6960e7664 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java @@ -45,6 +45,7 @@ public class LocalSinkHandle implements ISinkHandle { private final SharedTsBlockQueue queue; private volatile ListenableFuture<Void> blocked = immediateFuture(null); private boolean aborted = false; + private boolean closed = false; public LocalSinkHandle( TFragmentInstanceId remoteFragmentInstanceId, @@ -72,9 +73,7 @@ public class LocalSinkHandle implements ISinkHandle { @Override public synchronized ListenableFuture<?> isFull() { - if (aborted) { - throw new IllegalStateException("Sink handle is closed."); - } + checkState(); return nonCancellationPropagating(blocked); } @@ -104,9 +103,7 @@ public class LocalSinkHandle implements ISinkHandle { public void send(List<TsBlock> tsBlocks) { Validate.notNull(tsBlocks, "tsBlocks is null"); synchronized (this) { - if (aborted) { - throw new IllegalStateException("Sink handle is aborted."); - } + checkState(); if (!blocked.isDone()) { throw new IllegalStateException("Sink handle is blocked."); } @@ -135,7 +132,8 @@ public class LocalSinkHandle implements ISinkHandle { synchronized (queue) { synchronized (this) { logger.info("set noMoreTsBlocks."); - if (aborted) { + if (aborted || closed) { + logger.info("SinkHandle has been aborted={} or closed={}.", aborted, closed); return; } queue.setNoMoreTsBlocks(true); @@ -151,17 +149,33 @@ public class LocalSinkHandle implements ISinkHandle { logger.info("Sink handle is being aborted."); synchronized (queue) { synchronized (this) { - if (aborted) { + if (aborted || closed) { return; } aborted = true; - queue.destroy(); + queue.abort(); sinkHandleListener.onAborted(this); } } logger.info("Sink handle is aborted"); } + @Override + public void close() { + logger.info("Sink handle is being closed."); + synchronized (queue) { + synchronized (this) { + if (aborted || closed) { + return; + } + closed = true; + queue.close(); + sinkHandleListener.onFinish(this); + } + } + logger.info("Sink handle is closed"); + } + public TFragmentInstanceId getRemoteFragmentInstanceId() { return remoteFragmentInstanceId; } @@ -173,4 +187,12 @@ public class LocalSinkHandle implements ISinkHandle { SharedTsBlockQueue getSharedTsBlockQueue() { return queue; } + + private void checkState() { + if (aborted) { + throw new IllegalStateException("Sink handle is aborted."); + } else if (closed) { + throw new IllegalStateException("Sink Handle is closed."); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java index 95056cf44b..3bc455592e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java @@ -166,7 +166,7 @@ public class LocalSourceHandle implements ISourceHandle { if (aborted) { return; } - queue.destroy(); + queue.close(); closed = true; sourceHandleListener.onFinished(this); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index 7cac64a6e3..29cc6fb44b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -53,7 +53,7 @@ public class SharedTsBlockQueue { private ListenableFuture<Void> blockedOnMemory; - private boolean destroyed = false; + private boolean closed = false; private LocalSourceHandle sourceHandle; private LocalSinkHandle sinkHandle; @@ -93,7 +93,7 @@ public class SharedTsBlockQueue { /** Notify no more tsblocks will be added to the queue. */ public void setNoMoreTsBlocks(boolean noMoreTsBlocks) { logger.info("SharedTsBlockQueue receive no more TsBlocks signal."); - if (destroyed) { + if (closed) { throw new IllegalStateException("queue has been destroyed"); } this.noMoreTsBlocks = noMoreTsBlocks; @@ -110,7 +110,7 @@ public class SharedTsBlockQueue { * returned by {@link #isBlocked()} completes. */ public TsBlock remove() { - if (destroyed) { + if (closed) { throw new IllegalStateException("queue has been destroyed"); } TsBlock tsBlock = queue.remove(); @@ -134,7 +134,7 @@ public class SharedTsBlockQueue { * the returned future of last invocation completes. */ public ListenableFuture<Void> add(TsBlock tsBlock) { - if (destroyed) { + if (closed) { throw new IllegalStateException("queue has been destroyed"); } @@ -153,11 +153,11 @@ public class SharedTsBlockQueue { } /** Destroy the queue and complete the future. Should only be called in normal case */ - public void destroy() { - if (destroyed) { + public void close() { + if (closed) { return; } - destroyed = true; + closed = true; if (!blocked.isDone()) { blocked.set(null); } @@ -177,10 +177,10 @@ public class SharedTsBlockQueue { // instead of blocked.cancel(true); /** Destroy the queue and cancel the future. Should only be called in normal case */ public void abort() { - if (destroyed) { + if (closed) { return; } - destroyed = true; + closed = true; if (!blocked.isDone()) { blocked.cancel(true); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 3619de44f4..e8247cc621 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -82,6 +82,9 @@ public class SinkHandle implements ISinkHandle { private long bufferRetainedSizeInBytes = 0; private boolean aborted = false; + + private boolean closed = false; + private boolean noMoreTsBlocks = false; public SinkHandle( @@ -110,9 +113,7 @@ public class SinkHandle implements ISinkHandle { @Override public synchronized ListenableFuture<?> isFull() { - if (aborted) { - throw new IllegalStateException("Sink handle is aborted."); - } + checkState(); return nonCancellationPropagating(blocked); } @@ -123,9 +124,7 @@ public class SinkHandle implements ISinkHandle { @Override public synchronized void send(List<TsBlock> tsBlocks) { Validate.notNull(tsBlocks, "tsBlocks is null"); - if (aborted) { - throw new IllegalStateException("Sink handle is aborted."); - } + checkState(); if (!blocked.isDone()) { throw new IllegalStateException("Sink handle is blocked."); } @@ -223,6 +222,22 @@ public class SinkHandle implements ISinkHandle { logger.info("SinkHandle is aborted"); } + @Override + public void close() { + logger.info("SinkHandle is being closed."); + sequenceIdToTsBlock.clear(); + closed = true; + bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked); + if (bufferRetainedSizeInBytes > 0) { + localMemoryManager + .getQueryPool() + .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes); + bufferRetainedSizeInBytes = 0; + } + sinkHandleListener.onFinish(this); + logger.info("SinkHandle is closed"); + } + @Override public boolean isAborted() { return aborted; @@ -306,6 +321,14 @@ public class SinkHandle implements ISinkHandle { localFragmentInstanceId.instanceId); } + private void checkState() { + if (aborted) { + throw new IllegalStateException("Sink handle is aborted."); + } else if (closed) { + throw new IllegalStateException("SinkHandle is closed."); + } + } + @TestOnly public void setRetryIntervalInMs(long retryIntervalInMs) { this.retryIntervalInMs = retryIntervalInMs; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index ac70a848dd..c1fd9568a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -114,7 +114,11 @@ public class FragmentInstanceExecution { driver.close(); // help for gc driver = null; - sinkHandle.abort(); + if (newState.isFailed()) { + sinkHandle.abort(); + } else { + sinkHandle.close(); + } // help for gc sinkHandle = null; if (newState.isFailed()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java index 2a67c35049..e3f4d0fc66 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java @@ -153,6 +153,26 @@ public class MemoryPool { return ((MemoryReservationFuture<Void>) future).getBytes(); } + /** + * Complete the specified memory reservation. If the reservation has finished, do nothing. + * + * @param future The future returned from {@link #reserve(String, long)} + * @return If the future has not complete, return the number of bytes being reserved. Otherwise, + * return 0. + */ + public synchronized long tryComplete(ListenableFuture<Void> future) { + Validate.notNull(future); + // If the future is not a MemoryReservationFuture, it must have been completed. + if (future.isDone()) { + return 0L; + } + Validate.isTrue( + future instanceof MemoryReservationFuture, + "invalid future type " + future.getClass().getSimpleName()); + ((MemoryReservationFuture<Void>) future).set(null); + return ((MemoryReservationFuture<Void>) future).getBytes(); + } + public synchronized void free(String queryId, long bytes) { Validate.notNull(queryId); Validate.isTrue(bytes > 0L); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index 73730864d2..9563357d12 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -99,7 +99,6 @@ public class ClusterScheduler implements IScheduler { try { FragInstanceDispatchResult result = dispatchResultFuture.get(); if (!result.isSuccessful()) { - logger.error("dispatch failed."); if (result.getFailureStatus() != null) { stateMachine.transitionToFailed(result.getFailureStatus()); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index 21a4436a6b..4a7c403791 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -159,6 +159,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSendFragmentInstanceResp sendFragmentInstanceResp = client.sendFragmentInstance(sendFragmentInstanceReq); if (!sendFragmentInstanceResp.accepted) { + logger.error(sendFragmentInstanceResp.message); throw new FragmentInstanceDispatchException( RpcUtils.getStatus( TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message)); @@ -208,6 +209,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } else { readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, instance); } + if (readResponse == null) { + logger.error("ReadResponse is null"); + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "ReadResponse is null")); + } } catch (Throwable t) { logger.error("Execute FragmentInstance in ConsensusGroup {} failed.", groupId, t); throw new FragmentInstanceDispatchException(
