This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 84d986de5cd6c3f05e2c3fd96ec3015c43eeae75
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 23 14:08:27 2024 +0800

    Pipe: Fix empty tablets generated by pattern parsing on sender side may 
cause NPE on receiver side (#12994)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    (cherry picked from commit a45adbc8534c1a457d43b220a4e8a7cef4218e96)
---
 .../request/PipeTransferTabletRawReq.java          |  5 +++++
 .../pipe/task/connection/PipeEventCollector.java   | 26 +++++++++++-----------
 2 files changed, 18 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 40c191b9d9e..389eb63d07c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -61,6 +61,11 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
     new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
 
     try {
+      if (tablet == null || tablet.rowSize == 0) {
+        // Empty statement, will be filtered after construction
+        return new InsertTabletStatement();
+      }
+
       final TSInsertTabletReq request = new TSInsertTabletReq();
 
       for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index fb88c8983e5..c4c91f80f58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -97,8 +97,7 @@ public class PipeEventCollector implements EventCollector {
     if (sourceEvent.shouldParseTimeOrPattern()) {
       for (final PipeRawTabletInsertionEvent parsedEvent :
           sourceEvent.toRawTabletInsertionEvents()) {
-        hasNoGeneratedEvent = false;
-        collectEvent(parsedEvent);
+        collectParsedRawTableEvent(parsedEvent);
       }
     } else {
       collectEvent(sourceEvent);
@@ -106,15 +105,10 @@ public class PipeEventCollector implements EventCollector 
{
   }
 
   private void parseAndCollectEvent(final PipeRawTabletInsertionEvent 
sourceEvent) {
-    if (sourceEvent.shouldParseTimeOrPattern()) {
-      final PipeRawTabletInsertionEvent parsedEvent = 
sourceEvent.parseEventWithPatternOrTime();
-      if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
-        hasNoGeneratedEvent = false;
-        collectEvent(parsedEvent);
-      }
-    } else {
-      collectEvent(sourceEvent);
-    }
+    collectParsedRawTableEvent(
+        sourceEvent.shouldParseTimeOrPattern()
+            ? sourceEvent.parseEventWithPatternOrTime()
+            : sourceEvent);
   }
 
   private void parseAndCollectEvent(final PipeTsFileInsertionEvent 
sourceEvent) throws Exception {
@@ -132,14 +126,20 @@ public class PipeEventCollector implements EventCollector 
{
 
     try {
       for (final TabletInsertionEvent parsedEvent : 
sourceEvent.toTabletInsertionEvents()) {
-        hasNoGeneratedEvent = false;
-        collectEvent(parsedEvent);
+        collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent);
       }
     } finally {
       sourceEvent.close();
     }
   }
 
+  private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent 
parsedEvent) {
+    if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
+      hasNoGeneratedEvent = false;
+      collectEvent(parsedEvent);
+    }
+  }
+
   private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent 
deleteDataEvent) {
     // Only used by events containing delete data node, no need to bind 
progress index here since
     // delete data event does not have progress index currently

Reply via email to