This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 4593bd471d4 Pipe: Do not wait for sealed tsFiles close at realtime (#17671) (#17681)
4593bd471d4 is described below
commit 4593bd471d4918264624057da32c9758c14d151a
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 15 13:18:32 2026 +0800
Pipe: Do not wait for sealed tsFiles close at realtime (#17671) (#17681)
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 56 ++++++++++++++-
.../pipe/event/PipeTsFileInsertionEventTest.java | 80 ++++++++++++++++++++++
2 files changed, 133 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index f099301a63c..53f0b16826a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -78,6 +78,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
protected final boolean isGeneratedByPipeConsensus;
protected final boolean isGeneratedByHistoricalExtractor;
+ // Realtime TsFile events are created after TsFileProcessor#endFile(), so the file is already
+ // immutable even if TsFileResource status is still UNCLOSED.
+ private final boolean isTsFileSealed;
+
protected final AtomicBoolean isClosed;
protected final AtomicReference<TsFileInsertionDataContainer> dataContainer;
@@ -91,7 +95,18 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
public PipeTsFileInsertionEvent(final TsFileResource resource, final boolean
isLoaded) {
// The modFile must be copied before the event is assigned to the
listening pipes
this(
- resource, null, true, isLoaded, false, null, 0, null, null,
Long.MIN_VALUE, Long.MAX_VALUE);
+ resource,
+ null,
+ true,
+ isLoaded,
+ false,
+ null,
+ 0,
+ null,
+ null,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ true);
}
public PipeTsFileInsertionEvent(
@@ -106,7 +121,36 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
final PipePattern pattern,
final long startTime,
final long endTime) {
- super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
+ this(
+ resource,
+ tsFile,
+ isWithMod,
+ isLoaded,
+ isGeneratedByHistoricalExtractor,
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ pattern,
+ startTime,
+ endTime,
+ false);
+ }
+
+ private PipeTsFileInsertionEvent(
+ final TsFileResource resource,
+ final File tsFile,
+ final boolean isWithMod,
+ final boolean isLoaded,
+ final boolean isGeneratedByHistoricalExtractor,
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pipePattern,
+ final long startTime,
+ final long endTime,
+ final boolean isTsFileSealed) {
+ super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime,
endTime);
+
this.resource = resource;
// For events created at assigner or historical extractor, the tsFile is get from the resource
@@ -123,6 +167,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
this.isGeneratedByPipe = resource.isGeneratedByPipe();
this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus();
this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor;
+ this.isTsFileSealed = isTsFileSealed;
this.tableNames = tableNames;
this.dataContainer = new AtomicReference<>(null);
@@ -180,6 +225,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
return true;
}
+ if (isTsFileSealed) {
+ return !resource.isEmpty();
+ }
+
if (!isClosed.get()) {
isClosed.set(resource.isClosed());
@@ -359,7 +408,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
pipeTaskMeta,
pattern,
startTime,
- endTime);
+ endTime,
+ isTsFileSealed);
}
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
new file mode 100644
index 00000000000..fd824d2d257
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event;
+
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+public class PipeTsFileInsertionEventTest {
+
+ @Test(timeout = 5000)
+ public void
testRealtimeEventCanSkipWaitingForClosedStatusAfterTsFileSealed() throws
Exception {
+ final File tempDir =
Files.createTempDirectory("pipeTsFileSealed").toFile();
+
+ try {
+ final TsFileResource resource =
+ createNonEmptyTsFileResource(tempDir, "realtime.tsfile", 1L, 1);
+ Assert.assertFalse(resource.isClosed());
+ Assert.assertFalse(resource.isEmpty());
+
+ final PipeTsFileInsertionEvent sourceEvent = new
PipeTsFileInsertionEvent(resource, false);
+ Assert.assertTrue(sourceEvent.waitForTsFileClose());
+
+ final PipeTsFileInsertionEvent copiedEvent =
+ sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ "pipe", 1L, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
+ Assert.assertTrue(copiedEvent.waitForTsFileClose());
+
+ copiedEvent.close();
+ sourceEvent.close();
+ } finally {
+ FileUtils.deleteFileOrDirectory(tempDir);
+ }
+ }
+
+ private TsFileResource createNonEmptyTsFileResource(
+ final File tempDir, final String fileName, final long flushOrderId,
final int dataRegionId)
+ throws IOException {
+ final File file = new File(tempDir, fileName);
+ Assert.assertTrue(file.createNewFile());
+
+ final TsFileResource resource = new TsFileResource(file);
+ resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
+ final ITimeIndex timeIndex = new DeviceTimeIndex();
+ final IDeviceID deviceID = new PlainDeviceID("root.db.d" + dataRegionId);
+ timeIndex.putStartTime(deviceID, 1);
+ timeIndex.putEndTime(deviceID, 1);
+ resource.setTimeIndex(timeIndex);
+ return resource;
+ }
+}