This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 999b359e9449bebc697170a193123c799f3a9cc3
Author: Caideyipi <[email protected]>
AuthorDate: Tue Dec 2 14:14:30 2025 +0800

    Pipe: Fixed the bug that lower version tablet may cause NPE when sent to 
2.x version & The temporary exception may be wrongly reported (#16843)
    
    * older_version_compatibility
    
    * protect
    
    * dependency
    
    (cherry picked from commit f68c49e349b2974d812e56bf5fbc22dd646e7eb6)
---
 .../db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java | 7 +++++++
 .../db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java      | 2 +-
 .../iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java      | 5 +++++
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index a574712cbbd..31ffcef042f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -45,6 +45,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -219,6 +220,12 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
           e);
       return false;
     } catch (final Exception e) {
+      if (ExceptionUtils.getRootCause(e) instanceof 
PipeRuntimeOutOfMemoryCriticalException) {
+        LOGGER.info(
+            "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.",
+            e);
+        return false;
+      }
       if (!isClosed.get()) {
         throw new PipeException(
             String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 2a1ab3f5a35..9db3cb57e6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -468,7 +468,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
           }
         }
       }
-    } catch (final AccessDeniedException e) {
+    } catch (final AccessDeniedException | 
PipeRuntimeOutOfMemoryCriticalException e) {
       throw e;
     } catch (final Exception e) {
       if (e instanceof InterruptedException) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
index 6569c6be895..6540bf9855d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
@@ -27,6 +27,7 @@ import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.time.LocalDate;
+import java.util.Objects;
 
 public class PipeTabletEventSorter {
 
@@ -84,6 +85,10 @@ public class PipeTabletEventSorter {
       final TSDataType dataType,
       final BitMap originalBitMap,
       final BitMap deDuplicatedBitMap) {
+    // Older version's sender may contain null values, we need to cover this 
case
+    if (Objects.isNull(valueList)) {
+      return null;
+    }
     switch (dataType) {
       case BOOLEAN:
         final boolean[] boolValues = (boolean[]) valueList;

Reply via email to