This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f687dcb7f76 Fix potential memory leak in MemoryPool
f687dcb7f76 is described below
commit f687dcb7f76067176ee549b3b2407c64436ce51a
Author: Liao Lanyu <[email protected]>
AuthorDate: Wed Jul 26 20:05:48 2023 +0800
Fix potential memory leak in MemoryPool
---
.../execution/exchange/MPPDataExchangeManager.java | 2 +-
.../execution/exchange/sink/SinkChannel.java | 2 +-
.../queryengine/execution/memory/MemoryPool.java | 91 ++++++----------------
3 files changed, 25 insertions(+), 70 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index b92e45eeb2e..886befdf4e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -551,7 +551,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String
fragmentInstanceId) {
localMemoryManager
.getQueryPool()
- .deRegisterFragmentInstanceToQueryMemoryMap(queryId,
fragmentInstanceId);
+ .deRegisterFragmentInstanceFromQueryMemoryMap(queryId,
fragmentInstanceId);
}
public LocalMemoryManager getLocalMemoryManager() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index f1a31847237..cc5b7299466 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -258,7 +258,7 @@ public class SinkChannel implements ISinkChannel {
}
sequenceIdToTsBlock.clear();
if (blocked != null) {
- bufferRetainedSizeInBytes -=
localMemoryManager.getQueryPool().tryComplete(blocked);
+ bufferRetainedSizeInBytes -=
localMemoryManager.getQueryPool().tryCancel(blocked);
}
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 6fd1cbd5b1f..211b614e99b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -32,9 +32,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@@ -58,8 +56,6 @@ public class MemoryPool {
*/
private final long maxBytesCanReserve;
- private boolean isMarked = false;
-
private MemoryReservationFuture(
String queryId,
String fragmentInstanceId,
@@ -80,14 +76,6 @@ public class MemoryPool {
return queryId;
}
- public boolean isMarked() {
- return isMarked;
- }
-
- public void setMarked(boolean marked) {
- isMarked = marked;
- }
-
public String getFragmentInstanceId() {
return fragmentInstanceId;
}
@@ -188,20 +176,17 @@ public class MemoryPool {
*
* @throws MemoryLeakException throw {@link MemoryLeakException}
*/
- public void deRegisterFragmentInstanceToQueryMemoryMap(
+ public void deRegisterFragmentInstanceFromQueryMemoryMap(
String queryId, String fragmentInstanceId) {
Map<String, Map<String, Long>> queryRelatedMemory =
queryMemoryReservations.get(queryId);
if (queryRelatedMemory != null) {
Map<String, Long> fragmentRelatedMemory =
queryRelatedMemory.get(fragmentInstanceId);
+ boolean hasPotentialMemoryLeak = false;
// fragmentRelatedMemory could be null if the FI has not reserved any
memory (for example,
// next() of root operator returns no data)
if (fragmentRelatedMemory != null) {
- for (Long memoryReserved : fragmentRelatedMemory.values()) {
- if (memoryReserved != 0) {
- throw new MemoryLeakException(
- "PlanNode related memory is not zero when deregister FI from
query memory pool.");
- }
- }
+ hasPotentialMemoryLeak =
+ fragmentRelatedMemory.values().stream().anyMatch(value -> value !=
0);
}
synchronized (queryMemoryReservations) {
queryRelatedMemory.remove(fragmentInstanceId);
@@ -209,6 +194,10 @@ public class MemoryPool {
queryMemoryReservations.remove(queryId);
}
}
+ if (hasPotentialMemoryLeak) {
+ throw new MemoryLeakException(
+ "PlanNode related memory is not zero when trying to deregister FI
from query memory pool. QueryId is : {}, FragmentInstanceId is : {}, PlanNode
related memory is : {}.");
+ }
}
}
@@ -288,36 +277,21 @@ public class MemoryPool {
* @return If the future has not completed, return the number of bytes being
reserved. Otherwise,
* return 0.
*/
+ @SuppressWarnings("squid:S2445")
public synchronized long tryCancel(ListenableFuture<Void> future) {
- Validate.notNull(future);
- // If the future is not a MemoryReservationFuture, it must have been
completed.
- if (future.isDone()) {
- return 0L;
- }
- Validate.isTrue(
- future instanceof MemoryReservationFuture,
- "invalid future type " + future.getClass().getSimpleName());
- future.cancel(true);
- return ((MemoryReservationFuture<Void>) future).getBytesToReserve();
- }
-
- /**
- * Complete the specified memory reservation. If the reservation has
finished, do nothing.
- *
- * @param future The future returned from {@link #reserve(String, String,
String, long, long)}
- * @return If the future has not complete, return the number of bytes being
reserved. Otherwise,
- * return 0.
- */
- public synchronized long tryComplete(ListenableFuture<Void> future) {
- Validate.notNull(future);
- // If the future is not a MemoryReservationFuture, it must have been
completed.
- if (future.isDone()) {
- return 0L;
+ // Synchronize on the future so that it cannot be concurrently
completed by
+ // MemoryPool.free(), which may lead to a memory leak.
+ synchronized (future) {
+ Validate.notNull(future);
+ // If the future is not a MemoryReservationFuture, it must have been
completed.
+ if (future.isDone()) {
+ return 0L;
+ }
+ Validate.isTrue(
+ future instanceof MemoryReservationFuture,
+ "invalid future type " + future.getClass().getSimpleName());
+ future.cancel(true);
}
- Validate.isTrue(
- future instanceof MemoryReservationFuture,
- "invalid future type " + future.getClass().getSimpleName());
- ((MemoryReservationFuture<Void>) future).set(null);
return ((MemoryReservationFuture<Void>) future).getBytesToReserve();
}
@@ -343,7 +317,6 @@ public class MemoryPool {
remainingBytes.addAndGet(bytes);
- List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
if (memoryReservationFutures.isEmpty()) {
return;
}
@@ -351,7 +324,7 @@ public class MemoryPool {
while (iterator.hasNext()) {
MemoryReservationFuture<Void> future = iterator.next();
synchronized (future) {
- if (future.isCancelled() || future.isDone() || future.isMarked()) {
+ if (future.isCancelled() || future.isDone()) {
continue;
}
long bytesToReserve = future.getBytesToReserve();
@@ -361,31 +334,13 @@ public class MemoryPool {
long maxBytesCanReserve = future.getMaxBytesCanReserve();
if (tryReserve(
curQueryId, curFragmentInstanceId, curPlanNodeId, bytesToReserve,
maxBytesCanReserve)) {
- futureList.add(future);
- future.setMarked(true);
+ future.set(null);
iterator.remove();
} else {
rollbackReserve(curQueryId, curFragmentInstanceId, curPlanNodeId,
bytesToReserve);
}
}
}
-
- // why we need to put this outside MemoryPool's lock?
- // If we put this block inside the MemoryPool's lock, we will get deadlock
case like the
- // following:
- // Assuming that thread-A: LocalSourceHandle.receive() ->
A-SharedTsBlockQueue.remove() ->
- // MemoryPool.free() (hold future's lock) -> future.set(null) -> try to get
- // B-SharedTsBlockQueue's lock
- // thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove()
(hold
- // B-SharedTsBlockQueue's lock) -> try to get future's lock
- for (MemoryReservationFuture<Void> future : futureList) {
- try {
- future.set(null);
- } catch (Throwable t) {
- // ignore it, because we still need to notify other future
- LOGGER.warn("error happened while trying to free memory: ", t);
- }
- }
}
public long getQueryMemoryReservedBytes(String queryId) {