This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 57428ae2b9 [IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak in SourceHandle (#7082)
57428ae2b9 is described below
commit 57428ae2b999154b39a496f908db26ce5d367baa
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Aug 23 10:13:51 2022 +0800
[IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak in SourceHandle (#7082)
---
.../mpp/execution/exchange/SharedTsBlockQueue.java | 38 +++++++++++++++++-----
.../db/mpp/execution/exchange/SourceHandle.java | 10 +++---
2 files changed, 34 insertions(+), 14 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index aacffe4796..1222ee5ef0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -34,6 +35,9 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.util.LinkedList;
import java.util.Queue;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
/** This is not thread safe class, the caller should ensure multi-threads safety. */
@NotThreadSafe
public class SharedTsBlockQueue {
@@ -94,7 +98,8 @@ public class SharedTsBlockQueue {
public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
if (closed) {
- throw new IllegalStateException("queue has been destroyed");
+ logger.warn("queue has been destroyed");
+ return;
}
this.noMoreTsBlocks = noMoreTsBlocks;
if (!blocked.isDone()) {
@@ -135,21 +140,38 @@ public class SharedTsBlockQueue {
*/
public ListenableFuture<Void> add(TsBlock tsBlock) {
if (closed) {
- throw new IllegalStateException("queue has been destroyed");
+ logger.warn("queue has been destroyed");
+ return immediateVoidFuture();
}
Validate.notNull(tsBlock, "TsBlock cannot be null");
Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
- blockedOnMemory =
+ Pair<ListenableFuture<Void>, Boolean> pair =
localMemoryManager
.getQueryPool()
- .reserve(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes())
- .left;
+ .reserve(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+ blockedOnMemory = pair.left;
bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
- queue.add(tsBlock);
- if (!blocked.isDone()) {
- blocked.set(null);
+
+ // reserve memory failed, we should wait until there is enough memory
+ if (!pair.right) {
+ blockedOnMemory.addListener(
+ () -> {
+ synchronized (this) {
+ queue.add(tsBlock);
+ if (!blocked.isDone()) {
+ blocked.set(null);
+ }
+ }
+ },
+ directExecutor());
+ } else { // reserve memory succeeded, add the TsBlock directly
+ queue.add(tsBlock);
+ if (!blocked.isDone()) {
+ blocked.set(null);
+ }
}
+
return blockedOnMemory;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 1e4fa305e4..c0a353f8b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -129,13 +129,11 @@ public class SourceHandle implements ISourceHandle {
if (tsBlock == null) {
return null;
}
- logger.info(
- "Receive {} TsBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes());
+ long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+ logger.info("Receive {} TsBlock, size is {}", currSequenceId, retainedSize);
currSequenceId += 1;
- bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
- localMemoryManager
- .getQueryPool()
- .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+ bufferRetainedSizeInBytes -= retainedSize;
+ localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), retainedSize);
if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
logger.info("no buffered TsBlock, blocked");