This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a45adbc8534 Pipe: Fix empty tablets generated by pattern parsing on
sender side may cause NPE on receiver side (#12994)
a45adbc8534 is described below
commit a45adbc8534c1a457d43b220a4e8a7cef4218e96
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 23 14:08:27 2024 +0800
Pipe: Fix empty tablets generated by pattern parsing on sender side may
cause NPE on receiver side (#12994)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../request/PipeTransferTabletRawReq.java | 5 +++++
.../pipe/task/connection/PipeEventCollector.java | 26 +++++++++++-----------
2 files changed, 18 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index ed4ca323fbe..c739a26a7d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -61,6 +61,11 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
new
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
try {
+ if (tablet == null || tablet.rowSize == 0) {
+ // Empty statement, will be filtered after construction
+ return new InsertTabletStatement();
+ }
+
final TSInsertTabletReq request = new TSInsertTabletReq();
for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index fb88c8983e5..c4c91f80f58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -97,8 +97,7 @@ public class PipeEventCollector implements EventCollector {
if (sourceEvent.shouldParseTimeOrPattern()) {
for (final PipeRawTabletInsertionEvent parsedEvent :
sourceEvent.toRawTabletInsertionEvents()) {
- hasNoGeneratedEvent = false;
- collectEvent(parsedEvent);
+ collectParsedRawTableEvent(parsedEvent);
}
} else {
collectEvent(sourceEvent);
@@ -106,15 +105,10 @@ public class PipeEventCollector implements EventCollector
{
}
private void parseAndCollectEvent(final PipeRawTabletInsertionEvent
sourceEvent) {
- if (sourceEvent.shouldParseTimeOrPattern()) {
- final PipeRawTabletInsertionEvent parsedEvent =
sourceEvent.parseEventWithPatternOrTime();
- if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
- hasNoGeneratedEvent = false;
- collectEvent(parsedEvent);
- }
- } else {
- collectEvent(sourceEvent);
- }
+ collectParsedRawTableEvent(
+ sourceEvent.shouldParseTimeOrPattern()
+ ? sourceEvent.parseEventWithPatternOrTime()
+ : sourceEvent);
}
private void parseAndCollectEvent(final PipeTsFileInsertionEvent
sourceEvent) throws Exception {
@@ -132,14 +126,20 @@ public class PipeEventCollector implements EventCollector
{
try {
for (final TabletInsertionEvent parsedEvent :
sourceEvent.toTabletInsertionEvents()) {
- hasNoGeneratedEvent = false;
- collectEvent(parsedEvent);
+ collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent);
}
} finally {
sourceEvent.close();
}
}
+ private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent
parsedEvent) {
+ if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
+ hasNoGeneratedEvent = false;
+ collectEvent(parsedEvent);
+ }
+ }
+
private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent
deleteDataEvent) {
// Only used by events containing delete data node, no need to bind
progress index here since
// delete data event does not have progress index currently