This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryException
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 55a032e3edfdef849194eb4f84dd3ee0676595d3
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jul 12 16:36:42 2022 +0800

    Done
---
 .../db/mpp/execution/exchange/ISinkHandle.java     | 10 ++++++
 .../db/mpp/execution/exchange/LocalSinkHandle.java | 40 +++++++++++++++++-----
 .../mpp/execution/exchange/LocalSourceHandle.java  |  2 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 18 +++++-----
 .../db/mpp/execution/exchange/SinkHandle.java      | 35 +++++++++++++++----
 .../fragment/FragmentInstanceExecution.java        |  6 +++-
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 20 +++++++++++
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  1 -
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  6 ++++
 9 files changed, 111 insertions(+), 27 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index b4be67e164..4450ca8c21 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -68,6 +68,16 @@ public interface ISinkHandle {
   /**
    * Abort the sink handle. Discard all tsblocks which may still be in the 
memory buffer and cancel
    * the future returned by {@link #isFull()}.
+   *
+   * <p>Should only be called in abnormal case
    */
   void abort();
+
+  /**
+   * Close the sink handle. Discard all tsblocks which may still be in the 
memory buffer and
+   * complete the future returned by {@link #isFull()}.
+   *
+   * <p>Should only be called in normal case.
+   */
+  void close();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index 852e79ac8b..d6960e7664 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -45,6 +45,7 @@ public class LocalSinkHandle implements ISinkHandle {
   private final SharedTsBlockQueue queue;
   private volatile ListenableFuture<Void> blocked = immediateFuture(null);
   private boolean aborted = false;
+  private boolean closed = false;
 
   public LocalSinkHandle(
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -72,9 +73,7 @@ public class LocalSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized ListenableFuture<?> isFull() {
-    if (aborted) {
-      throw new IllegalStateException("Sink handle is closed.");
-    }
+    checkState();
     return nonCancellationPropagating(blocked);
   }
 
@@ -104,9 +103,7 @@ public class LocalSinkHandle implements ISinkHandle {
   public void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
     synchronized (this) {
-      if (aborted) {
-        throw new IllegalStateException("Sink handle is aborted.");
-      }
+      checkState();
       if (!blocked.isDone()) {
         throw new IllegalStateException("Sink handle is blocked.");
       }
@@ -135,7 +132,8 @@ public class LocalSinkHandle implements ISinkHandle {
     synchronized (queue) {
       synchronized (this) {
         logger.info("set noMoreTsBlocks.");
-        if (aborted) {
+        if (aborted || closed) {
+          logger.info("SinkHandle has been aborted={} or closed={}.", aborted, 
closed);
           return;
         }
         queue.setNoMoreTsBlocks(true);
@@ -151,17 +149,33 @@ public class LocalSinkHandle implements ISinkHandle {
     logger.info("Sink handle is being aborted.");
     synchronized (queue) {
       synchronized (this) {
-        if (aborted) {
+        if (aborted || closed) {
           return;
         }
         aborted = true;
-        queue.destroy();
+        queue.abort();
         sinkHandleListener.onAborted(this);
       }
     }
     logger.info("Sink handle is aborted");
   }
 
+  @Override
+  public void close() {
+    logger.info("Sink handle is being closed.");
+    synchronized (queue) {
+      synchronized (this) {
+        if (aborted || closed) {
+          return;
+        }
+        closed = true;
+        queue.close();
+        sinkHandleListener.onFinish(this);
+      }
+    }
+    logger.info("Sink handle is closed");
+  }
+
   public TFragmentInstanceId getRemoteFragmentInstanceId() {
     return remoteFragmentInstanceId;
   }
@@ -173,4 +187,12 @@ public class LocalSinkHandle implements ISinkHandle {
   SharedTsBlockQueue getSharedTsBlockQueue() {
     return queue;
   }
+
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("Sink handle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("Sink Handle is closed.");
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 95056cf44b..3bc455592e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -166,7 +166,7 @@ public class LocalSourceHandle implements ISourceHandle {
           if (aborted) {
             return;
           }
-          queue.destroy();
+          queue.close();
           closed = true;
           sourceHandleListener.onFinished(this);
         }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 7cac64a6e3..29cc6fb44b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -53,7 +53,7 @@ public class SharedTsBlockQueue {
 
   private ListenableFuture<Void> blockedOnMemory;
 
-  private boolean destroyed = false;
+  private boolean closed = false;
 
   private LocalSourceHandle sourceHandle;
   private LocalSinkHandle sinkHandle;
@@ -93,7 +93,7 @@ public class SharedTsBlockQueue {
   /** Notify no more tsblocks will be added to the queue. */
   public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
     logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
-    if (destroyed) {
+    if (closed) {
       throw new IllegalStateException("queue has been destroyed");
     }
     this.noMoreTsBlocks = noMoreTsBlocks;
@@ -110,7 +110,7 @@ public class SharedTsBlockQueue {
    * returned by {@link #isBlocked()} completes.
    */
   public TsBlock remove() {
-    if (destroyed) {
+    if (closed) {
       throw new IllegalStateException("queue has been destroyed");
     }
     TsBlock tsBlock = queue.remove();
@@ -134,7 +134,7 @@ public class SharedTsBlockQueue {
    * the returned future of last invocation completes.
    */
   public ListenableFuture<Void> add(TsBlock tsBlock) {
-    if (destroyed) {
+    if (closed) {
       throw new IllegalStateException("queue has been destroyed");
     }
 
@@ -153,11 +153,11 @@ public class SharedTsBlockQueue {
   }
 
   /** Destroy the queue and complete the future. Should only be called in 
normal case */
-  public void destroy() {
-    if (destroyed) {
+  public void close() {
+    if (closed) {
       return;
     }
-    destroyed = true;
+    closed = true;
     if (!blocked.isDone()) {
       blocked.set(null);
     }
@@ -177,10 +177,10 @@ public class SharedTsBlockQueue {
   // instead of blocked.cancel(true);
   /** Destroy the queue and cancel the future. Should only be called in normal 
case */
   public void abort() {
-    if (destroyed) {
+    if (closed) {
       return;
     }
-    destroyed = true;
+    closed = true;
     if (!blocked.isDone()) {
       blocked.cancel(true);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 3619de44f4..e8247cc621 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -82,6 +82,9 @@ public class SinkHandle implements ISinkHandle {
   private long bufferRetainedSizeInBytes = 0;
 
   private boolean aborted = false;
+
+  private boolean closed = false;
+
   private boolean noMoreTsBlocks = false;
 
   public SinkHandle(
@@ -110,9 +113,7 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public synchronized ListenableFuture<?> isFull() {
-    if (aborted) {
-      throw new IllegalStateException("Sink handle is aborted.");
-    }
+    checkState();
     return nonCancellationPropagating(blocked);
   }
 
@@ -123,9 +124,7 @@ public class SinkHandle implements ISinkHandle {
   @Override
   public synchronized void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
-    if (aborted) {
-      throw new IllegalStateException("Sink handle is aborted.");
-    }
+    checkState();
     if (!blocked.isDone()) {
       throw new IllegalStateException("Sink handle is blocked.");
     }
@@ -223,6 +222,22 @@ public class SinkHandle implements ISinkHandle {
     logger.info("SinkHandle is aborted");
   }
 
+  @Override
+  public void close() {
+    logger.info("SinkHandle is being closed.");
+    sequenceIdToTsBlock.clear();
+    closed = true;
+    bufferRetainedSizeInBytes -= 
localMemoryManager.getQueryPool().tryComplete(blocked);
+    if (bufferRetainedSizeInBytes > 0) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), 
bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+    sinkHandleListener.onFinish(this);
+    logger.info("SinkHandle is closed");
+  }
+
   @Override
   public boolean isAborted() {
     return aborted;
@@ -306,6 +321,14 @@ public class SinkHandle implements ISinkHandle {
         localFragmentInstanceId.instanceId);
   }
 
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("Sink handle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("SinkHandle is closed.");
+    }
+  }
+
   @TestOnly
   public void setRetryIntervalInMs(long retryIntervalInMs) {
     this.retryIntervalInMs = retryIntervalInMs;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index ac70a848dd..c1fd9568a7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -114,7 +114,11 @@ public class FragmentInstanceExecution {
             driver.close();
             // help for gc
             driver = null;
-            sinkHandle.abort();
+            if (newState.isFailed()) {
+              sinkHandle.abort();
+            } else {
+              sinkHandle.close();
+            }
             // help for gc
             sinkHandle = null;
             if (newState.isFailed()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 2a67c35049..e3f4d0fc66 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -153,6 +153,26 @@ public class MemoryPool {
     return ((MemoryReservationFuture<Void>) future).getBytes();
   }
 
+  /**
+   * Complete the specified memory reservation. If the reservation has 
finished, do nothing.
+   *
+   * @param future The future returned from {@link #reserve(String, long)}
+   * @return If the future has not complete, return the number of bytes being 
reserved. Otherwise,
+   *     return 0.
+   */
+  public synchronized long tryComplete(ListenableFuture<Void> future) {
+    Validate.notNull(future);
+    // If the future is not a MemoryReservationFuture, it must have been 
completed.
+    if (future.isDone()) {
+      return 0L;
+    }
+    Validate.isTrue(
+        future instanceof MemoryReservationFuture,
+        "invalid future type " + future.getClass().getSimpleName());
+    ((MemoryReservationFuture<Void>) future).set(null);
+    return ((MemoryReservationFuture<Void>) future).getBytes();
+  }
+
   public synchronized void free(String queryId, long bytes) {
     Validate.notNull(queryId);
     Validate.isTrue(bytes > 0L);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 73730864d2..9563357d12 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -99,7 +99,6 @@ public class ClusterScheduler implements IScheduler {
     try {
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (!result.isSuccessful()) {
-        logger.error("dispatch failed.");
         if (result.getFailureStatus() != null) {
           stateMachine.transitionToFailed(result.getFailureStatus());
         } else {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 21a4436a6b..4a7c403791 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -159,6 +159,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
           TSendFragmentInstanceResp sendFragmentInstanceResp =
               client.sendFragmentInstance(sendFragmentInstanceReq);
           if (!sendFragmentInstanceResp.accepted) {
+            logger.error(sendFragmentInstanceResp.message);
             throw new FragmentInstanceDispatchException(
                 RpcUtils.getStatus(
                     TSStatusCode.EXECUTE_STATEMENT_ERROR, 
sendFragmentInstanceResp.message));
@@ -208,6 +209,11 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
           } else {
             readResponse = 
SchemaRegionConsensusImpl.getInstance().read(groupId, instance);
           }
+          if (readResponse == null) {
+            logger.error("ReadResponse is null");
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
"ReadResponse is null"));
+          }
         } catch (Throwable t) {
           logger.error("Execute FragmentInstance in ConsensusGroup {} 
failed.", groupId, t);
           throw new FragmentInstanceDispatchException(

Reply via email to