This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4057642baddc9ce14e130625321117ef385697ef Author: Caideyipi <[email protected]> AuthorDate: Tue Dec 2 14:14:30 2025 +0800 Pipe: Fixed the bug that lower version tablet may cause NPE when sent to 2.x version & The temporary exception may be wrongly reported (#16843) * older_version_compatibility * protect * dependency (cherry picked from commit f68c49e349b2974d812e56bf5fbc22dd646e7eb6) --- .../db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java | 7 +++++++ .../db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 2 +- .../iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java | 5 +++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index a574712cbbd..31ffcef042f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -45,6 +45,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,6 +220,12 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { e); return false; } catch (final Exception e) { + if (ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) { + LOGGER.info( + "Temporarily out of memory in pipe event processing, will wait for the memory to release.", + e); + return false; + } if (!isClosed.get()) { throw new PipeException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 2a1ab3f5a35..9db3cb57e6a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -468,7 +468,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent } } } - } catch (final AccessDeniedException e) { + } catch (final AccessDeniedException | PipeRuntimeOutOfMemoryCriticalException e) { throw e; } catch (final Exception e) { if (e instanceof InterruptedException) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java index 6569c6be895..6540bf9855d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java @@ -27,6 +27,7 @@ import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import java.time.LocalDate; +import java.util.Objects; public class PipeTabletEventSorter { @@ -84,6 +85,10 @@ public class PipeTabletEventSorter { final TSDataType dataType, final BitMap originalBitMap, final BitMap deDuplicatedBitMap) { + // Older version's sender may contain null values, we need to cover this case + if (Objects.isNull(valueList)) { + return null; + } switch (dataType) { case BOOLEAN: final boolean[] boolValues = (boolean[]) valueList;
