This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch ty/ci-stable
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7628a8479d8687c56d35ad93797e0e88b752876d Author: JackieTien97 <[email protected]> AuthorDate: Tue May 19 12:23:09 2026 +0800 Make some uts more stable --- .../fragment/FragmentInstanceExecutionTest.java | 124 ++++++++++++--------- .../operator/SingleDeviceViewOperatorTest.java | 3 + 2 files changed, 74 insertions(+), 53 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java index 99a94eb02a3..93290fdf4da 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java @@ -51,6 +51,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.reader.IPointReader; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; @@ -62,6 +63,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; @@ -72,8 +74,16 @@ import static org.junit.Assert.fail; public class FragmentInstanceExecutionTest { + @BeforeClass + public static void setUpClass() { + // Initialize DataNodeId before any test to avoid ExceptionInInitializerError when + // Coordinator.<clinit> is triggered indirectly by async state-change listeners + // (e.g., via QueryRelatedResourceMetricSet -> Coordinator -> 
QueryIdGenerator). + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); + } + @Test - public void testFragmentInstanceExecution() { + public void testFragmentInstanceExecution() throws InterruptedException { ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { @@ -110,75 +120,84 @@ public class FragmentInstanceExecutionTest { e.printStackTrace(); fail(e.getMessage()); } finally { - instanceNotificationExecutor.shutdown(); + shutdownAndAwaitTermination(instanceNotificationExecutor); } } @Test public void testTVListOwnerTransfer() throws InterruptedException { - // Capture System.err to check for warning messages + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + + // Capture System.out to check for warning messages. Set up the capture only after the + // executor is created so that any thread-pool init logging does not pollute the captured + // output. Capture must also be torn down before awaiting executor termination so any + // late async log output from this test goes to the original stream, not the captured one. 
PrintStream systemOut = System.out; ByteArrayOutputStream logPrint = new ByteArrayOutputStream(); System.setOut(new PrintStream(logPrint)); try { - IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); - - ExecutorService instanceNotificationExecutor = - IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); - try { - // TVList - TVList tvList = buildTVList(); - - // FragmentInstance Context & Execution - FragmentInstanceExecution execution1 = - createFragmentInstanceExecution(1, instanceNotificationExecutor); - FragmentInstanceContext fragmentInstanceContext1 = execution1.getFragmentInstanceContext(); - fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0)); - tvList.getQueryContextSet().add(fragmentInstanceContext1); - - FragmentInstanceExecution execution2 = - createFragmentInstanceExecution(2, instanceNotificationExecutor); - FragmentInstanceContext fragmentInstanceContext2 = execution2.getFragmentInstanceContext(); - fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0)); - tvList.getQueryContextSet().add(fragmentInstanceContext2); - - // mock flush's behavior - fragmentInstanceContext1 - .getMemoryReservationContext() - .reserveMemoryCumulatively(tvList.calculateRamSize().getRamSize()); - tvList.setOwnerQuery(fragmentInstanceContext1); - - fragmentInstanceContext1.decrementNumOfUnClosedDriver(); - fragmentInstanceContext2.decrementNumOfUnClosedDriver(); - - fragmentInstanceContext1.getStateMachine().finished(); - Thread.sleep(100); - fragmentInstanceContext2.getStateMachine().finished(); - - assertTrue(execution1.getInstanceState().isDone()); - assertTrue(execution2.getInstanceState().isDone()); - Thread.sleep(100); - } catch (CpuNotEnoughException | MemoryNotEnoughException | IllegalArgumentException e) { - fail(e.getMessage()); - } finally { - instanceNotificationExecutor.shutdown(); - } + // TVList + TVList tvList = buildTVList(); + + // FragmentInstance Context & Execution + FragmentInstanceExecution 
execution1 = + createFragmentInstanceExecution(1, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext1 = execution1.getFragmentInstanceContext(); + fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0)); + tvList.getQueryContextSet().add(fragmentInstanceContext1); + + FragmentInstanceExecution execution2 = + createFragmentInstanceExecution(2, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext2 = execution2.getFragmentInstanceContext(); + fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0)); + tvList.getQueryContextSet().add(fragmentInstanceContext2); + + // mock flush's behavior + fragmentInstanceContext1 + .getMemoryReservationContext() + .reserveMemoryCumulatively(tvList.calculateRamSize().getRamSize()); + tvList.setOwnerQuery(fragmentInstanceContext1); + + fragmentInstanceContext1.decrementNumOfUnClosedDriver(); + fragmentInstanceContext2.decrementNumOfUnClosedDriver(); + + fragmentInstanceContext1.getStateMachine().finished(); + Thread.sleep(100); + fragmentInstanceContext2.getStateMachine().finished(); + + assertTrue(execution1.getInstanceState().isDone()); + assertTrue(execution2.getInstanceState().isDone()); + } catch (CpuNotEnoughException | MemoryNotEnoughException | IllegalArgumentException e) { + fail(e.getMessage()); } finally { - // Restore original System.out + // Restore original System.out before waiting for the executor so that any late + // async log output from listeners is written to the real stdout, not the captured buffer. 
System.setOut(systemOut); + shutdownAndAwaitTermination(instanceNotificationExecutor); // should not contain warn message: "The memory cost to be released is larger than the memory // cost of memory block" String capturedOutput = logPrint.toString(); - assertTrue(capturedOutput.isEmpty()); + assertFalse( + "captured stdout should not contain memory-block release warning, but was:\n" + + capturedOutput, + capturedOutput.contains( + "The memory cost to be released is larger than the memory cost of memory block")); } } - @Test - public void testTVListCloneForQuery() { - IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); + private static void shutdownAndAwaitTermination(ExecutorService executor) + throws InterruptedException { + executor.shutdown(); + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } + @Test + public void testTVListCloneForQuery() throws InterruptedException { ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); @@ -218,8 +237,7 @@ public class FragmentInstanceExecutionTest { Collections.emptyMap())); ReadOnlyMemChunk readOnlyMemChunk1 = memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null); - ReadOnlyMemChunk readOnlyMemChunk2 = - memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null); + memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null); IPointReader pointReader = readOnlyMemChunk1.getPointReader(); while (pointReader.hasNextTimeValuePair()) { @@ -234,7 +252,7 @@ public class FragmentInstanceExecutionTest { | IllegalArgumentException e) { fail(e.getMessage()); } finally { - instanceNotificationExecutor.shutdown(); + shutdownAndAwaitTermination(instanceNotificationExecutor); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java index cfe53f4e708..f88cdef3e2b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java @@ -174,6 +174,9 @@ public class SingleDeviceViewOperatorTest { int total = 0; while (singleDeviceViewOperator.isBlocked().isDone() && singleDeviceViewOperator.hasNext()) { TsBlock tsBlock = singleDeviceViewOperator.next(); + if (tsBlock == null || tsBlock.isEmpty()) { + continue; + } assertEquals(4, tsBlock.getValueColumnCount()); total += tsBlock.getPositionCount(); for (int i = 0; i < tsBlock.getPositionCount(); i++) {
