This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.3.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ad87f69f5951402afdd9c1a3da99d25640484bac Author: Caideyipi <[email protected]> AuthorDate: Tue Mar 12 16:40:24 2024 +0800 Pipe: Fixed potential lose point bug when an unclosed tsfile reaches `isEventTimeOverlappedWithTimeRange()` judgement (CI fails on testInsertNullValueTemplate) (#12156) * Fixed lose point bug * enable batch --------- Co-authored-by: Steve Yurong Su <[email protected]> --- .../java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java | 7 ++++++- .../db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java | 2 +- .../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 2 +- .../event/common/tablet/PipeRawTabletInsertionEvent.java | 2 +- .../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 8 ++++++-- .../iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java | 4 ++-- .../realtime/PipeRealtimeDataRegionExtractor.java | 2 +- .../compaction/execute/task/InnerSpaceCompactionTask.java | 1 - .../selector/impl/RewriteCrossSpaceCompactionSelector.java | 1 - .../iotdb/db/pipe/event/PipeTabletInsertionEventTest.java | 14 +++++++------- 10 files changed, 25 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index a2a98f78ad0..836a25f75bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -225,7 +225,12 @@ public abstract class EnrichedEvent implements Event { public abstract boolean isGeneratedByPipe(); - public abstract boolean isEventTimeOverlappedWithTimeRange(); + /** Whether the {@link EnrichedEvent} need to be committed in order. */ + public boolean needToCommit() { + return true; + } + + public abstract boolean mayEventTimeOverlappedWithTimeRange(); public void setCommitterKeyAndCommitId(String committerKey, long commitId) { this.committerKey = committerKey; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index 0212d64e065..fbeaf489fbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -120,7 +120,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { } @Override - public boolean isEventTimeOverlappedWithTimeRange() { + public boolean mayEventTimeOverlappedWithTimeRange() { return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 98ab1c7b2ac..8ed05ee4ec9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -164,7 +164,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } @Override - public boolean isEventTimeOverlappedWithTimeRange() { + public boolean mayEventTimeOverlappedWithTimeRange() { try { InsertNode insertNode = getInsertNode(); if (insertNode instanceof InsertRowNode) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 94dd2b11cbe..b7fc6bea81e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -136,7 +136,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet } @Override - public boolean isEventTimeOverlappedWithTimeRange() { + public boolean mayEventTimeOverlappedWithTimeRange() { long[] timestamps = tablet.timestamps; if (Objects.isNull(timestamps) || timestamps.length == 0) { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index e4d15e4b0c7..d516c252ddf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -179,8 +179,12 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns } @Override - public boolean isEventTimeOverlappedWithTimeRange() { - return startTime <= resource.getFileEndTime() && resource.getFileStartTime() <= endTime; + public boolean mayEventTimeOverlappedWithTimeRange() { + // If the tsFile is not closed the resource.getFileEndTime() will be Long.MIN_VALUE + // In that case we only judge the resource.getFileStartTime() to avoid losing data + return isClosed.get() + ? startTime <= resource.getFileEndTime() && resource.getFileStartTime() <= endTime + : resource.getFileStartTime() <= endTime; } /////////////////////////// TsFileInsertionEvent /////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java index 12626adb302..efa03126275 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java @@ -154,8 +154,8 @@ public class PipeRealtimeEvent extends EnrichedEvent { } @Override - public boolean isEventTimeOverlappedWithTimeRange() { - return event.isEventTimeOverlappedWithTimeRange(); + public boolean mayEventTimeOverlappedWithTimeRange() { + return event.mayEventTimeOverlappedWithTimeRange(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java index f12c24ee718..60744e6c0e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java @@ -248,7 +248,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { // 2. Check if the timestamps of the data contained in this event intersect with the time range. // If there is no intersection, it indicates that this data will be filtered out by the // extractor, and the extract process is skipped. - if (!event.shouldParseTime() || event.getEvent().isEventTimeOverlappedWithTimeRange()) { + if (!event.shouldParseTime() || event.getEvent().mayEventTimeOverlappedWithTimeRange()) { doExtract(event); } else { event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index f679e28dd28..7fcc419db46 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -47,7 +47,6 @@ import org.apache.iotdb.tsfile.utils.TsFileUtils; import java.io.File; import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index 01053cdbe8b..fdc1ea8fbf4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -44,7 +44,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; import java.util.ArrayList; import java.util.Collections; import java.util.List; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index 011d26f2ff0..6a29a403739 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -331,19 +331,19 @@ public class PipeTabletInsertionEventTest { PipeRawTabletInsertionEvent event; event = new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 111L, 113L); - Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange()); + Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange()); event = new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L, 110L); - Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange()); + Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange()); event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 113L); - Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange()); + Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange()); event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, Long.MIN_VALUE, 110L); - Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange()); + Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange()); event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 114L, Long.MAX_VALUE); - Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange()); + Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange()); event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, Long.MIN_VALUE, 109L); - Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange()); + Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange()); event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 115L, Long.MAX_VALUE); - Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange()); + Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange()); } }
