This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 90616032db0 Pipe: Fixed a potential data point loss bug when an unclosed
TsFile reaches the `isEventTimeOverlappedWithTimeRange()` check (CI fails on
testInsertNullValueTemplate) (#12156)
90616032db0 is described below
commit 90616032db017bdadf5193bd5356974400b9a090
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 12 16:40:24 2024 +0800
Pipe: Fixed a potential data point loss bug when an unclosed TsFile reaches
the `isEventTimeOverlappedWithTimeRange()` check (CI fails on
testInsertNullValueTemplate) (#12156)
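* Fixed the data point loss bug
* Re-enabled connector batching in IoTDBPipeNullValueIT (removed the `connector.batch.enable=false` override)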
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java | 1 -
.../iotdb/db/pipe/event/UserDefinedEnrichedEvent.java | 4 ++--
.../db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java | 2 +-
.../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 2 +-
.../event/common/tablet/PipeRawTabletInsertionEvent.java | 2 +-
.../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 8 ++++++--
.../iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java | 4 ++--
.../realtime/PipeRealtimeDataRegionExtractor.java | 2 +-
.../iotdb/db/pipe/event/PipeTabletInsertionEventTest.java | 14 +++++++-------
.../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 2 +-
.../apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java | 2 +-
.../iotdb/commons/pipe/event/PipeWritePlanEvent.java | 2 +-
12 files changed, 24 insertions(+), 21 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
index b640809a928..008765713d0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
@@ -216,7 +216,6 @@ public class IoTDBPipeNullValueIT extends
AbstractPipeDualAutoIT {
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
- connectorAttributes.put("connector.batch.enable", "false");
if (withParsing) {
extractorAttributes.put("start-time", "1970-01-01T08:00:00.000+08:00");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index 492b85bae6d..2a2eb25721f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -81,7 +81,7 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
}
@Override
- public boolean isEventTimeOverlappedWithTimeRange() {
- return enrichedEvent.isEventTimeOverlappedWithTimeRange();
+ public boolean mayEventTimeOverlappedWithTimeRange() {
+ return enrichedEvent.mayEventTimeOverlappedWithTimeRange();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index ead8455ff6e..a5c0e267488 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -120,7 +120,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
}
@Override
- public boolean isEventTimeOverlappedWithTimeRange() {
+ public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 42db580a502..3a09e865b09 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -166,7 +166,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
@Override
- public boolean isEventTimeOverlappedWithTimeRange() {
+ public boolean mayEventTimeOverlappedWithTimeRange() {
try {
InsertNode insertNode = getInsertNode();
if (insertNode instanceof InsertRowNode) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5b3c995e5c9..02386f1e375 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -146,7 +146,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
}
@Override
- public boolean isEventTimeOverlappedWithTimeRange() {
+ public boolean mayEventTimeOverlappedWithTimeRange() {
long[] timestamps = tablet.timestamps;
if (Objects.isNull(timestamps) || timestamps.length == 0) {
return false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 67d1747900a..dfa229bf108 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -179,8 +179,12 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
@Override
- public boolean isEventTimeOverlappedWithTimeRange() {
- return startTime <= resource.getFileEndTime() &&
resource.getFileStartTime() <= endTime;
+ public boolean mayEventTimeOverlappedWithTimeRange() {
+ // If the tsFile is not closed, resource.getFileEndTime() will be
Long.MIN_VALUE.
+ // In that case, we only check resource.getFileStartTime() to avoid
losing data.
+ return isClosed.get()
+ ? startTime <= resource.getFileEndTime() &&
resource.getFileStartTime() <= endTime
+ : resource.getFileStartTime() <= endTime;
}
/////////////////////////// TsFileInsertionEvent ///////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 4ebe4c1936a..e693f6dae4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -155,8 +155,8 @@ public class PipeRealtimeEvent extends EnrichedEvent {
}
@Override
- public boolean isEventTimeOverlappedWithTimeRange() {
- return event.isEventTimeOverlappedWithTimeRange();
+ public boolean mayEventTimeOverlappedWithTimeRange() {
+ return event.mayEventTimeOverlappedWithTimeRange();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 1f8d72206e0..f564f737539 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -245,7 +245,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
// 2. Check if the timestamps of the data contained in this event
intersect with the time range.
// If there is no intersection, it indicates that this data will be
filtered out by the
// extractor, and the extract process is skipped.
- if (!event.shouldParseTime() ||
event.getEvent().isEventTimeOverlappedWithTimeRange()) {
+ if (!event.shouldParseTime() ||
event.getEvent().mayEventTimeOverlappedWithTimeRange()) {
doExtract(event);
} else {
event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(),
false);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 011d26f2ff0..6a29a403739 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -331,19 +331,19 @@ public class PipeTabletInsertionEventTest {
PipeRawTabletInsertionEvent event;
event = new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 111L,
113L);
- Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+ Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
event = new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L,
110L);
- Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+ Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L,
113L);
- Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+ Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode,
Long.MIN_VALUE, 110L);
- Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+ Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 114L,
Long.MAX_VALUE);
- Assert.assertTrue(event.isEventTimeOverlappedWithTimeRange());
+ Assert.assertTrue(event.mayEventTimeOverlappedWithTimeRange());
event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode,
Long.MIN_VALUE, 109L);
- Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+ Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 115L,
Long.MAX_VALUE);
- Assert.assertFalse(event.isEventTimeOverlappedWithTimeRange());
+ Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index bf1044ae4e0..e9e50f958cc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -243,7 +243,7 @@ public abstract class EnrichedEvent implements Event {
return true;
}
- public abstract boolean isEventTimeOverlappedWithTimeRange();
+ public abstract boolean mayEventTimeOverlappedWithTimeRange();
public void setCommitterKeyAndCommitId(String committerKey, long commitId) {
this.committerKey = committerKey;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index b1d1c8cb0b3..96da71b05a0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -102,7 +102,7 @@ public abstract class PipeSnapshotEvent extends
EnrichedEvent implements Seriali
}
@Override
- public boolean isEventTimeOverlappedWithTimeRange() {
+ public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index abd00135d2f..c888e97ee6f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -82,7 +82,7 @@ public abstract class PipeWritePlanEvent extends
EnrichedEvent implements Serial
}
@Override
- public boolean isEventTimeOverlappedWithTimeRange() {
+ public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}
}