This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 90616032db0 Pipe: Fixed potential lose point bug when an unclosed 
tsfile reaches `isEventTimeOverlappedWithTimeRange()` judgement (CI fails on 
testInsertNullValueTemplate) (#12156)
90616032db0 is described below

commit 90616032db017bdadf5193bd5356974400b9a090
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 12 16:40:24 2024 +0800

    Pipe: Fixed potential lose point bug when an unclosed tsfile reaches 
`isEventTimeOverlappedWithTimeRange()` judgement (CI fails on 
testInsertNullValueTemplate) (#12156)
    
    * Fixed lose point bug
    
    * enable batch
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java     |  1 -
 .../iotdb/db/pipe/event/UserDefinedEnrichedEvent.java      |  4 ++--
 .../db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java |  2 +-
 .../common/tablet/PipeInsertNodeTabletInsertionEvent.java  |  2 +-
 .../event/common/tablet/PipeRawTabletInsertionEvent.java   |  2 +-
 .../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java |  8 ++++++--
 .../iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java    |  4 ++--
 .../realtime/PipeRealtimeDataRegionExtractor.java          |  2 +-
 .../iotdb/db/pipe/event/PipeTabletInsertionEventTest.java  | 14 +++++++-------
 .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java |  2 +-
 .../apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java |  2 +-
 .../iotdb/commons/pipe/event/PipeWritePlanEvent.java       |  2 +-
 12 files changed, 24 insertions(+), 21 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
index b640809a928..008765713d0 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
@@ -216,7 +216,6 @@ public class IoTDBPipeNullValueIT extends 
AbstractPipeDualAutoIT {
       connectorAttributes.put("connector", "iotdb-thrift-connector");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
-      connectorAttributes.put("connector.batch.enable", "false");
 
       if (withParsing) {
         extractorAttributes.put("start-time", "1970-01-01T08:00:00.000+08:00");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index 492b85bae6d..2a2eb25721f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -81,7 +81,7 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
-    return enrichedEvent.isEventTimeOverlappedWithTimeRange();
+  public boolean mayEventTimeOverlappedWithTimeRange() {
+    return enrichedEvent.mayEventTimeOverlappedWithTimeRange();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index ead8455ff6e..a5c0e267488 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -120,7 +120,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
+  public boolean mayEventTimeOverlappedWithTimeRange() {
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 42db580a502..3a09e865b09 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -166,7 +166,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
+  public boolean mayEventTimeOverlappedWithTimeRange() {
     try {
       InsertNode insertNode = getInsertNode();
       if (insertNode instanceof InsertRowNode) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5b3c995e5c9..02386f1e375 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -146,7 +146,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
+  public boolean mayEventTimeOverlappedWithTimeRange() {
     long[] timestamps = tablet.timestamps;
     if (Objects.isNull(timestamps) || timestamps.length == 0) {
       return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 67d1747900a..dfa229bf108 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -179,8 +179,12 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
-    return startTime <= resource.getFileEndTime() && 
resource.getFileStartTime() <= endTime;
+  public boolean mayEventTimeOverlappedWithTimeRange() {
+    // If the tsFile is not closed the resource.getFileEndTime() will be 
Long.MIN_VALUE
+    // In that case we only judge the resource.getFileStartTime() to avoid 
losing data
+    return isClosed.get()
+        ? startTime <= resource.getFileEndTime() && 
resource.getFileStartTime() <= endTime
+        : resource.getFileStartTime() <= endTime;
   }
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 4ebe4c1936a..e693f6dae4b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -155,8 +155,8 @@ public class PipeRealtimeEvent extends EnrichedEvent {
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
-    return event.isEventTimeOverlappedWithTimeRange();
+  public boolean mayEventTimeOverlappedWithTimeRange() {
+    return event.mayEventTimeOverlappedWithTimeRange();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 1f8d72206e0..f564f737539 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -245,7 +245,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     // 2. Check if the timestamps of the data contained in this event 
intersect with the time range.
     // If there is no intersection, it indicates that this data will be 
filtered out by the
     // extractor, and the extract process is skipped.
-    if (!event.shouldParseTime() || 
event.getEvent().isEventTimeOverlappedWithTimeRange()) {
+    if (!event.shouldParseTime() || 
event.getEvent().mayEventTimeOverlappedWithTimeRange()) {
       doExtract(event);
     } else {
       
event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), 
false);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 011d26f2ff0..6a29a403739 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -331,19 +331,19 @@ public class PipeTabletInsertionEventTest {
     PipeRawTabletInsertionEvent event;
 
     event = new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 111L, 
113L);
-    Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L, 
110L);
-    Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
 
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 
113L);
-    Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 
Long.MIN_VALUE, 110L);
-    Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 114L, 
Long.MAX_VALUE);
-    Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 
Long.MIN_VALUE, 109L);
-    Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 115L, 
Long.MAX_VALUE);
-    Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+    Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index bf1044ae4e0..e9e50f958cc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -243,7 +243,7 @@ public abstract class EnrichedEvent implements Event {
     return true;
   }
 
-  public abstract boolean isEventTimeOverlappedWithTimeRange();
+  public abstract boolean mayEventTimeOverlappedWithTimeRange();
 
   public void setCommitterKeyAndCommitId(String committerKey, long commitId) {
     this.committerKey = committerKey;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index b1d1c8cb0b3..96da71b05a0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -102,7 +102,7 @@ public abstract class PipeSnapshotEvent extends 
EnrichedEvent implements Seriali
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
+  public boolean mayEventTimeOverlappedWithTimeRange() {
     return true;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index abd00135d2f..c888e97ee6f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -82,7 +82,7 @@ public abstract class PipeWritePlanEvent extends 
EnrichedEvent implements Serial
   }
 
   @Override
-  public boolean isEventTimeOverlappedWithTimeRange() {
+  public boolean mayEventTimeOverlappedWithTimeRange() {
     return true;
   }
 }

Reply via email to