This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch no-listen-sealed in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 02be9628d37914fd0d59aecffa8b5ee482d8a7e1 Author: Caideyipi <[email protected]> AuthorDate: Thu May 14 20:30:00 2026 +0800 no-listen-sealed --- .../common/tsfile/PipeTsFileInsertionEvent.java | 58 +++++++++++++++++++++- .../pipe/event/PipeTsFileInsertionEventTest.java | 46 +++++++++++++++-- 2 files changed, 98 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 02ffcfcb2b4..081bd4fd702 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -91,6 +91,9 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent protected final boolean isGeneratedByPipe; protected final boolean isGeneratedByIoTConsensusV2; protected final boolean isGeneratedByHistoricalExtractor; + // Realtime TsFile events are created after TsFileProcessor#endFile(), so the file is already + // immutable even if TsFileResource status is still UNCLOSED. + private final boolean isTsFileSealed; private final AtomicBoolean isClosed; private final AtomicReference<TsFileInsertionEventParser> eventParser; @@ -130,7 +133,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent null, true, Long.MIN_VALUE, - Long.MAX_VALUE); + Long.MAX_VALUE, + true); } public PipeTsFileInsertionEvent( @@ -153,6 +157,50 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent final boolean skipIfNoPrivileges, final long startTime, final long endTime) { + this( + isTableModelEvent, + databaseNameFromDataRegion, + resource, + tsFile, + isWithMod, + isLoaded, + isGeneratedByHistoricalExtractor, + tableNames, + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + userId, + userName, + cliHostname, + skipIfNoPrivileges, + startTime, + endTime, + false); + } + + private PipeTsFileInsertionEvent( + final Boolean isTableModelEvent, + final String databaseNameFromDataRegion, + final TsFileResource resource, + final File tsFile, + final boolean isWithMod, + final boolean isLoaded, + final boolean isGeneratedByHistoricalExtractor, + final Set<String> tableNames, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final TreePattern treePattern, + final TablePattern tablePattern, + final String userId, + final String userName, + final String cliHostname, + final boolean skipIfNoPrivileges, + final long startTime, + final long endTime, + final boolean isTsFileSealed) { super( pipeName, creationTime, @@ -186,6 +234,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent this.isGeneratedByPipe = resource.isGeneratedByPipe(); this.isGeneratedByIoTConsensusV2 = resource.isGeneratedByIoTConsensusV2(); this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor; + this.isTsFileSealed = isTsFileSealed; this.tableNames = tableNames; isClosed = new AtomicBoolean(resource.isClosed()); @@ -242,6 +291,10 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent return true; } + if (isTsFileSealed) { + return !resource.isEmpty(); + } + if (!isClosed.get()) { isClosed.set(resource.isClosed()); @@ -452,7 +505,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent cliHostname, skipIfNoPrivileges, startTime, - endTime) + endTime, + isTsFileSealed) .bindTsFileDedupScopeID(tsFileDedupScopeID); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java index 338a851cc90..3d57e632baa 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java @@ -242,18 +242,56 @@ public class PipeTsFileInsertionEventTest { } } + @Test(timeout = 5000) + public void testRealtimeEventCanSkipWaitingForClosedStatusAfterTsFileSealed() throws Exception { + final File tempDir = Files.createTempDirectory("pipeTsFileSealed").toFile(); + + try { + final TsFileResource resource = + createNonEmptyTsFileResource(tempDir, "realtime.tsfile", 1L, 1); + Assert.assertFalse(resource.isClosed()); + Assert.assertFalse(resource.isEmpty()); + + final PipeTsFileInsertionEvent sourceEvent = + new PipeTsFileInsertionEvent(false, "root.db", resource, false); + Assert.assertTrue(sourceEvent.waitForTsFileClose()); + + final PipeTsFileInsertionEvent copiedEvent = + sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( + "pipe", 1L, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE); + Assert.assertTrue(copiedEvent.waitForTsFileClose()); + + copiedEvent.close(); + sourceEvent.close(); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + private TsFileResource createSpyTsFileResource( final File tempDir, final String fileName, final long flushOrderId, final int dataRegionId) throws IOException { + final TsFileResource resource = + createNonEmptyTsFileResource(tempDir, fileName, flushOrderId, dataRegionId); + final TsFileResource spyResource = Mockito.spy(resource); + Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId(); + return spyResource; + } + + private TsFileResource createNonEmptyTsFileResource( + final File tempDir, final String fileName, final long flushOrderId, final int dataRegionId) + throws IOException { final File file = new File(tempDir, fileName); Assert.assertTrue(file.createNewFile()); final TsFileResource resource = new TsFileResource(file); resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId)); - - final TsFileResource spyResource = Mockito.spy(resource); - Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId(); - return spyResource; + final ITimeIndex timeIndex = new ArrayDeviceTimeIndex(); + final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.db.d" + dataRegionId); + timeIndex.putStartTime(deviceID, 1); + timeIndex.putEndTime(deviceID, 1); + resource.setTimeIndex(timeIndex); + return resource; } static class TestAccessControl implements AccessControl {
