This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e020df2aff6 [To dev/1.3] fix: memory to be released is larger than the
memory of memory block in TVList owner transfer case #16943 Open
e020df2aff6 is described below
commit e020df2aff657a41281797795e03e270416dab58
Author: shizy <[email protected]>
AuthorDate: Tue Dec 23 16:04:04 2025 +0800
[To dev/1.3] fix: memory to be released is larger than the memory of memory
block in TVList owner transfer case #16943 Open
---
.../fragment/FragmentInstanceContext.java | 32 +++--
.../plan/planner/LocalExecutionPlanner.java | 8 ++
.../memory/FakedMemoryReservationManager.java | 11 ++
.../planner/memory/MemoryReservationManager.java | 29 ++++
.../NotThreadSafeMemoryReservationManager.java | 23 ++++
.../memory/ThreadSafeMemoryReservationManager.java | 13 ++
.../fragment/FragmentInstanceExecutionTest.java | 149 +++++++++++++++++----
7 files changed, 230 insertions(+), 35 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 2ee3f5dc1e7..1a9a80300f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -57,6 +57,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -873,20 +874,35 @@ public class FragmentInstanceContext extends QueryContext
{
try {
queryContextSet.remove(this);
if (tvList.getOwnerQuery() == this) {
if (queryContextSet.isEmpty()) {
- LOGGER.debug(
- "TVList {} is released by the query, FragmentInstance Id is
{}",
- tvList,
- this.getId());
+ // No other query context references this TVList any more: release its memory and clear it.
+ // Only the log call is guarded by isDebugEnabled(); the release itself must always run.
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "TVList {} is released by the query, FragmentInstance Id is
{}",
+ tvList,
+ this.getId());
+ }
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
tvList.clear();
} else {
+ // Transfer memory to next query. It must be exception-safe as
this method is called
+ // during FragmentInstanceExecution cleanup. Any exception during
this process could
+ // prevent proper resource cleanup and cause memory leaks.
+ Pair<Long, Long> releasedBytes =
+
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
FragmentInstanceContext queryContext =
(FragmentInstanceContext) queryContextSet.iterator().next();
- LOGGER.debug(
- "TVList {} is now owned by another query, FragmentInstance Id
is {}",
- tvList,
- queryContext.getId());
+ queryContext
+ .getMemoryReservationContext()
+ .reserveMemoryVirtually(releasedBytes.left,
releasedBytes.right);
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "TVList {} is now owned by another query, FragmentInstance
Id is {}",
+ tvList,
+ queryContext.getId());
+ }
tvList.setOwnerQuery(queryContext);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index d91e68e7413..97445414c65 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -244,6 +244,10 @@ public class LocalExecutionPlanner {
final long reservedBytes,
final String queryId,
final String contextHolder) {
+ if (memoryInBytes <= 0) {
+ throw new IllegalArgumentException(
+ "Bytes to reserve from free memory for operators should be larger
than 0");
+ }
if (memoryInBytes > freeMemoryForOperators) {
throw new MemoryNotEnoughException(
String.format(
@@ -264,6 +268,10 @@ public class LocalExecutionPlanner {
}
public synchronized void releaseToFreeMemoryForOperators(final long
memoryInBytes) {
+ if (memoryInBytes <= 0) {
+ throw new IllegalArgumentException(
+ "Bytes to release to free memory for operators should be larger than
0");
+ }
freeMemoryForOperators += memoryInBytes;
if (freeMemoryForOperators > ALLOCATE_MEMORY_FOR_OPERATORS) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
index 265ca47ca23..35ded4d6252 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.queryengine.plan.planner.memory;
+import org.apache.tsfile.utils.Pair;
+
public class FakedMemoryReservationManager implements MemoryReservationManager
{
@Override
@@ -32,4 +34,13 @@ public class FakedMemoryReservationManager implements
MemoryReservationManager {
@Override
public void releaseAllReservedMemory() {}
+
+ @Override
+ public Pair<Long, Long> releaseMemoryVirtually(final long size) {
+ return new Pair<>(0L, 0L); // Faked: nothing was released, so nothing is handed to a new owner.
+ }
+
+ @Override
+ public void reserveMemoryVirtually(
+ final long bytesToBeReserved, final long bytesAlreadyReserved) {} // Faked: nothing to account.
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
index ba55d60b096..62393673120 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.queryengine.plan.planner.memory;
+import org.apache.tsfile.utils.Pair;
+
public interface MemoryReservationManager {
/**
* Reserve memory for the given size. The memory reservation request will be
accumulated and the
@@ -43,4 +45,31 @@ public interface MemoryReservationManager {
* this manager ends, Or the memory to be released in the batch may not be
released correctly.
*/
void releaseAllReservedMemory();
+
+ /**
+ * Release memory virtually: this manager's accounting shrinks by {@code size}, but nothing is
+ * actually freed. Used when the ownership of memory (e.g. of a TVList) is transferred from one
+ * FragmentInstance to another without any deallocation.
+ *
+ * <p>NOTE: callers must guarantee that {@code bytesToBeReserved + reservedBytesInTotal >= size};
+ * otherwise the accounting becomes inconsistent (e.g. negative reservation values). The returned
+ * amounts are meant to be handed to {@link #reserveMemoryVirtually(long, long)} of the new owner.
+ *
+ * @param size the number of bytes to release virtually
+ * @return a Pair whose left element is the number of bytes released from the pending (not yet
+ *     reserved) amount ({@code bytesToBeReserved}) and whose right element is the number of
+ *     bytes released from the amount that has already been reserved
+ */
+ Pair<Long, Long> releaseMemoryVirtually(final long size);
+
+ /**
+ * Reserve memory virtually: take over memory that the previous owner released through {@link
+ * #releaseMemoryVirtually(long)}, so that the accounting follows the transferred object from one
+ * FragmentInstance to another instead of being counted twice.
+ *
+ * @param bytesToBeReserved the bytes that were still pending (not yet reserved) in the previous
+ *     owner; this manager is expected to reserve them cumulatively
+ * @param bytesAlreadyReserved the bytes that the previous owner had already reserved
+ */
+ void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
index 7d8d4b076c2..4fa97f368ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.tsfile.utils.Pair;
+
import javax.annotation.concurrent.NotThreadSafe;
@NotThreadSafe
@@ -91,4 +93,36 @@ public class NotThreadSafeMemoryReservationManager
implements MemoryReservationM
bytesToBeReleased = 0;
}
}
+
+ /**
+ * Virtually releases {@code size} bytes: this manager's accounting shrinks, but nothing is
+ * returned to the {@link LocalExecutionPlanner}. Pending (not yet reserved) bytes are consumed
+ * first and only the remainder is taken from {@code reservedBytesInTotal}. The caller is
+ * expected to hand the returned amounts to the new owner's {@link #reserveMemoryVirtually}.
+ */
+ @Override
+ public Pair<Long, Long> releaseMemoryVirtually(final long size) {
+ if (bytesToBeReserved >= size) {
+ bytesToBeReserved -= size;
+ return new Pair<>(size, 0L);
+ }
+ final long releasedFromPending = bytesToBeReserved;
+ final long releasedFromReserved = size - releasedFromPending;
+ bytesToBeReserved = 0;
+ reservedBytesInTotal -= releasedFromReserved;
+ return new Pair<>(releasedFromPending, releasedFromReserved);
+ }
+
+ /**
+ * Takes over bytes released virtually by another manager: already-reserved bytes are added to
+ * {@code reservedBytesInTotal} directly, while still-pending bytes are passed to
+ * {@code reserveMemoryCumulatively} like any other cumulative reservation.
+ */
+ @Override
+ public void reserveMemoryVirtually(
+ final long pendingBytes, final long bytesAlreadyReserved) {
+ // Deliberately not named bytesToBeReserved, which would shadow the field of that name.
+ reservedBytesInTotal += bytesAlreadyReserved;
+ reserveMemoryCumulatively(pendingBytes);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
index efe83d23c05..190c4fedd5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory;
import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.tsfile.utils.Pair;
+
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
@@ -48,4 +50,20 @@ public class ThreadSafeMemoryReservationManager extends
NotThreadSafeMemoryReser
public synchronized void releaseAllReservedMemory() {
super.releaseAllReservedMemory();
}
+
+ /**
+ * Synchronized like {@link #releaseAllReservedMemory()} so that a virtual transfer cannot
+ * interleave with concurrent updates of this manager's counters.
+ */
+ @Override
+ public synchronized Pair<Long, Long> releaseMemoryVirtually(final long size) {
+ return super.releaseMemoryVirtually(size);
+ }
+
+ /** Synchronized for the same reason as {@link #releaseMemoryVirtually(long)}. */
+ @Override
+ public synchronized void reserveMemoryVirtually(
+ final long bytesToBeReserved, final long bytesAlreadyReserved) {
+ super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index 87e63ae0691..fb659c6914a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.execution.fragment;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -29,17 +30,26 @@ import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.enums.TSDataType;
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -50,32 +60,11 @@ public class FragmentInstanceExecutionTest {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
try {
- IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
- FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
- DataRegion dataRegion = Mockito.mock(DataRegion.class);
- FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(instanceId, stateMachine);
- fragmentInstanceContext.initializeNumOfDrivers(1);
- fragmentInstanceContext.setMayHaveTmpFile(true);
- fragmentInstanceContext.setDataRegion(dataRegion);
- List<IDriver> drivers = Collections.emptyList();
- ISink sinkHandle = Mockito.mock(ISink.class);
- long timeOut = -1;
- MPPDataExchangeManager exchangeManager =
Mockito.mock(MPPDataExchangeManager.class);
FragmentInstanceExecution execution =
- FragmentInstanceExecution.createFragmentInstanceExecution(
- scheduler,
- instanceId,
- fragmentInstanceContext,
- drivers,
- sinkHandle,
- stateMachine,
- timeOut,
- false,
- exchangeManager);
+ createFragmentInstanceExecution(0, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
execution.getFragmentInstanceContext();
+ FragmentInstanceStateMachine stateMachine = execution.getStateMachine();
+
assertEquals(FragmentInstanceState.RUNNING,
execution.getInstanceState());
FragmentInstanceInfo instanceInfo = execution.getInstanceInfo();
assertEquals(FragmentInstanceState.RUNNING, instanceInfo.getState());
@@ -84,7 +73,7 @@ public class FragmentInstanceExecutionTest {
assertEquals(fragmentInstanceContext.getFailureInfoList(),
instanceInfo.getFailureInfoList());
assertEquals(fragmentInstanceContext.getStartTime(),
execution.getStartTime());
- assertEquals(timeOut, execution.getTimeoutInMs());
+ assertEquals(-1, execution.getTimeoutInMs());
assertEquals(stateMachine, execution.getStateMachine());
fragmentInstanceContext.decrementNumOfUnClosedDriver();
@@ -107,4 +96,117 @@ public class FragmentInstanceExecutionTest {
instanceNotificationExecutor.shutdown();
}
}
+
+ @Test
+ public void testTVListOwnerTransfer() throws InterruptedException {
+ // Capture System.out to check for warning messages
+ PrintStream systemOut = System.out;
+ ByteArrayOutputStream logPrint = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(logPrint));
+
+ try {
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
+
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ // TVList
+ TVList tvList = buildTVList();
+
+ // FragmentInstance Context & Execution
+ FragmentInstanceExecution execution1 =
+ createFragmentInstanceExecution(1, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext1 =
execution1.getFragmentInstanceContext();
+ fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
+ tvList.getQueryContextSet().add(fragmentInstanceContext1);
+
+ FragmentInstanceExecution execution2 =
+ createFragmentInstanceExecution(2, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext2 =
execution2.getFragmentInstanceContext();
+ fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
+ tvList.getQueryContextSet().add(fragmentInstanceContext2);
+
+ // mock flush's behavior
+ fragmentInstanceContext1
+ .getMemoryReservationContext()
+ .reserveMemoryCumulatively(tvList.calculateRamSize());
+ tvList.setOwnerQuery(fragmentInstanceContext1);
+
+ fragmentInstanceContext1.decrementNumOfUnClosedDriver();
+ fragmentInstanceContext2.decrementNumOfUnClosedDriver();
+
+ fragmentInstanceContext1.getStateMachine().finished();
+ Thread.sleep(100);
+ fragmentInstanceContext2.getStateMachine().finished();
+
+ assertTrue(execution1.getInstanceState().isDone());
+ assertTrue(execution2.getInstanceState().isDone());
+ Thread.sleep(100);
+ } catch (CpuNotEnoughException | MemoryNotEnoughException |
IllegalArgumentException e) {
+ fail(e.getMessage());
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ } finally {
+ // Restore original System.out
+ System.setOut(systemOut);
+
+ // should not contain warn message: "The memory cost to be released is
larger than the memory
+ // cost of memory block"
+ String capturedOutput = logPrint.toString();
+ assertFalse(
+ "Should not contain error message",
+ capturedOutput.contains("is more than allocated memory"));
+ }
+ }
+
+ /**
+ * Creates a FragmentInstanceExecution for fragment {@code id} on top of a mocked scheduler, data
+ * region, sink and exchange manager, expecting one driver, with an empty driver list and a
+ * timeout of -1.
+ */
+ private FragmentInstanceExecution createFragmentInstanceExecution(int id,
Executor executor)
+ throws CpuNotEnoughException {
+ IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, id),
String.valueOf(id));
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, executor);
+ DataRegion dataRegion = Mockito.mock(DataRegion.class);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ fragmentInstanceContext.initializeNumOfDrivers(1);
+ fragmentInstanceContext.setMayHaveTmpFile(true);
+ fragmentInstanceContext.setDataRegion(dataRegion);
+ List<IDriver> drivers = Collections.emptyList();
+ ISink sinkHandle = Mockito.mock(ISink.class);
+ long timeOut = -1;
+ MPPDataExchangeManager exchangeManager =
Mockito.mock(MPPDataExchangeManager.class);
+ return FragmentInstanceExecution.createFragmentInstanceExecution(
+ scheduler,
+ instanceId,
+ fragmentInstanceContext,
+ drivers,
+ sinkHandle,
+ stateMachine,
+ timeOut,
+ false,
+ exchangeManager);
+ }
+
+ /** Builds an AlignedTVList with 200 INT64 columns and 1000 rows (timestamps 1..1000) of value 1. */
+ private TVList buildTVList() {
+ int columns = 200;
+ int rows = 1000;
+ List<TSDataType> dataTypes = new ArrayList<>();
+ Object[] values = new Object[columns];
+ for (int i = 0; i < columns; i++) {
+ dataTypes.add(TSDataType.INT64);
+ values[i] = 1L;
+ }
+ AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
+ for (long t = 1; t <= rows; t++) {
+ tvList.putAlignedValue(t, values);
+ }
+ return tvList;
+ }
}