This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 84d986de5cd6c3f05e2c3fd96ec3015c43eeae75 Author: Caideyipi <[email protected]> AuthorDate: Tue Jul 23 14:08:27 2024 +0800 Pipe: Fix empty tablets generated by pattern parsing on sender side may cause NPE on receiver side (#12994) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit a45adbc8534c1a457d43b220a4e8a7cef4218e96) --- .../request/PipeTransferTabletRawReq.java | 5 +++++ .../pipe/task/connection/PipeEventCollector.java | 26 +++++++++++----------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java index 40c191b9d9e..389eb63d07c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -61,6 +61,11 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq { new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); try { + if (tablet == null || tablet.rowSize == 0) { + // Empty statement, will be filtered after construction + return new InsertTabletStatement(); + } + final TSInsertTabletReq request = new TSInsertTabletReq(); for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java index fb88c8983e5..c4c91f80f58 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java @@ -97,8 +97,7 @@ public class PipeEventCollector implements EventCollector { if (sourceEvent.shouldParseTimeOrPattern()) { for (final PipeRawTabletInsertionEvent parsedEvent : sourceEvent.toRawTabletInsertionEvents()) { - hasNoGeneratedEvent = false; - collectEvent(parsedEvent); + collectParsedRawTableEvent(parsedEvent); } } else { collectEvent(sourceEvent); @@ -106,15 +105,10 @@ public class PipeEventCollector implements EventCollector { } private void parseAndCollectEvent(final PipeRawTabletInsertionEvent sourceEvent) { - if (sourceEvent.shouldParseTimeOrPattern()) { - final PipeRawTabletInsertionEvent parsedEvent = sourceEvent.parseEventWithPatternOrTime(); - if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) { - hasNoGeneratedEvent = false; - collectEvent(parsedEvent); - } - } else { - collectEvent(sourceEvent); - } + collectParsedRawTableEvent( + sourceEvent.shouldParseTimeOrPattern() + ? sourceEvent.parseEventWithPatternOrTime() + : sourceEvent); } private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) throws Exception { @@ -132,14 +126,20 @@ public class PipeEventCollector implements EventCollector { try { for (final TabletInsertionEvent parsedEvent : sourceEvent.toTabletInsertionEvents()) { - hasNoGeneratedEvent = false; - collectEvent(parsedEvent); + collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent); } } finally { sourceEvent.close(); } } + private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent parsedEvent) { + if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) { + hasNoGeneratedEvent = false; + collectEvent(parsedEvent); + } + } + private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent deleteDataEvent) { // Only used by events containing delete data node, no need to bind progress index here since // delete data event does not have progress index currently
