This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.3.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ad87f69f5951402afdd9c1a3da99d25640484bac
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 12 16:40:24 2024 +0800

    Pipe: Fixed a potential point-loss bug when an unclosed tsfile reaches the 
`isEventTimeOverlappedWithTimeRange()` judgement (CI fails on 
testInsertNullValueTemplate) (#12156)
    
    * Fixed point-loss bug
    
    * enable batch
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java |  7 ++++++-
 .../db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java |  2 +-
 .../common/tablet/PipeInsertNodeTabletInsertionEvent.java  |  2 +-
 .../event/common/tablet/PipeRawTabletInsertionEvent.java   |  2 +-
 .../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java |  8 ++++++--
 .../iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java    |  4 ++--
 .../realtime/PipeRealtimeDataRegionExtractor.java          |  2 +-
 .../compaction/execute/task/InnerSpaceCompactionTask.java  |  1 -
 .../selector/impl/RewriteCrossSpaceCompactionSelector.java |  1 -
 .../iotdb/db/pipe/event/PipeTabletInsertionEventTest.java  | 14 +++++++-------
 10 files changed, 25 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index a2a98f78ad0..836a25f75bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -225,7 +225,12 @@ public abstract class EnrichedEvent implements Event {
 
   public abstract boolean isGeneratedByPipe();
 
-  public abstract boolean isEventTimeOverlappedWithTimeRange();
+  /** Whether the {@link EnrichedEvent} need to be committed in order. */
+  public boolean needToCommit() {
+    return true;
+  }
+
+  public abstract boolean mayEventTimeOverlappedWithTimeRange();
 
   public void setCommitterKeyAndCommitId(String committerKey, long commitId) {
     this.committerKey = committerKey;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 0212d64e065..fbeaf489fbf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -120,7 +120,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
+  public boolean mayEventTimeOverlappedWithTimeRange() {
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 98ab1c7b2ac..8ed05ee4ec9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -164,7 +164,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
+  public boolean mayEventTimeOverlappedWithTimeRange() {
     try {
       InsertNode insertNode = getInsertNode();
       if (insertNode instanceof InsertRowNode) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 94dd2b11cbe..b7fc6bea81e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -136,7 +136,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
+  public boolean mayEventTimeOverlappedWithTimeRange() {
     long[] timestamps = tablet.timestamps;
     if (Objects.isNull(timestamps) || timestamps.length == 0) {
       return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index e4d15e4b0c7..d516c252ddf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -179,8 +179,12 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
-    return startTime <= resource.getFileEndTime() && 
resource.getFileStartTime() <= endTime;
+  public boolean mayEventTimeOverlappedWithTimeRange() {
+    // If the tsFile is not closed the resource.getFileEndTime() will be 
Long.MIN_VALUE
+    // In that case we only judge the resource.getFileStartTime() to avoid 
losing data
+    return isClosed.get()
+        ? startTime <= resource.getFileEndTime() && 
resource.getFileStartTime() <= endTime
+        : resource.getFileStartTime() <= endTime;
   }
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 12626adb302..efa03126275 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -154,8 +154,8 @@ public class PipeRealtimeEvent extends EnrichedEvent {
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
-    return event.isEventTimeOverlappedWithTimeRange();
+  public boolean mayEventTimeOverlappedWithTimeRange() {
+    return event.mayEventTimeOverlappedWithTimeRange();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index f12c24ee718..60744e6c0e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -248,7 +248,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     // 2. Check if the timestamps of the data contained in this event 
intersect with the time range.
     // If there is no intersection, it indicates that this data will be 
filtered out by the
     // extractor, and the extract process is skipped.
-    if (!event.shouldParseTime() || 
event.getEvent().isEventTimeOverlappedWithTimeRange()) {
+    if (!event.shouldParseTime() || 
event.getEvent().mayEventTimeOverlappedWithTimeRange()) {
       doExtract(event);
     } else {
       
event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), 
false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index f679e28dd28..7fcc419db46 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.tsfile.utils.TsFileUtils;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.channels.ClosedByInterruptException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 01053cdbe8b..fdc1ea8fbf4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -44,7 +44,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.channels.ClosedByInterruptException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 011d26f2ff0..6a29a403739 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -331,19 +331,19 @@ public class PipeTabletInsertionEventTest {
     PipeRawTabletInsertionEvent event;
 
     event = new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 111L, 
113L);
-    Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L, 
110L);
-    Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
 
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 
113L);
-    Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 
Long.MIN_VALUE, 110L);
-    Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 114L, 
Long.MAX_VALUE);
-    Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 
Long.MIN_VALUE, 109L);
-    Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 115L, 
Long.MAX_VALUE);
-    Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
   }
 }

Reply via email to