This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 411b8e9d273 [To dev/1.3] Pipe: Add retry when TsFile parsing failed to avoid race among processor threads (#15624, #15644) (#15659)
411b8e9d273 is described below

commit 411b8e9d27314039a03b8d13d8a7a1400b11401f
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Jun 6 16:59:48 2025 +0800

    [To dev/1.3] Pipe: Add retry when TsFile parsing failed to avoid race among processor threads (#15624, #15644) (#15659)
    
    * Pipe: Add retry when TsFile parsing failed to avoid race among processor threads (#15624)
    
    (cherry picked from commit 7ad757f58a2cc5b5edb41766ff655b31328c3e80)
    
    * Pipe: Add retry when TsFile parsing failed to avoid race among processor threads (#15624, #15644)
    
    * refactor
    
    * refactor
    
    * refactor
    
    (cherry picked from commit c28e50f7afadc0de48bb7e7cb19a4be9398979a3)
---
 .../agent/task/connection/PipeEventCollector.java  |  6 +-
 .../protocol/websocket/WebSocketConnector.java     | 13 +++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 65 +++++++++++++++++++---
 .../processor/aggregate/AggregateProcessor.java    | 23 +++++++-
 .../downsampling/DownSamplingProcessor.java        | 25 +++++++--
 .../batch/SubscriptionPipeTsFileEventBatch.java    | 31 ++++++-----
 6 files changed, 123 insertions(+), 40 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index fd863b2ab8a..66bc5ab2a50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -34,7 +34,6 @@ import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtracto
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
@@ -141,9 +140,8 @@ public class PipeEventCollector implements EventCollector {
     }
 
     try {
-      for (final TabletInsertionEvent parsedEvent : 
sourceEvent.toTabletInsertionEvents()) {
-        collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent);
-      }
+      sourceEvent.consumeTabletInsertionEventsWithRetry(
+          this::collectParsedRawTableEvent, 
"PipeEventCollector::parseAndCollectEvent");
     } finally {
       sourceEvent.close();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index cf258452346..92436366511 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -139,11 +139,14 @@ public class WebSocketConnector implements PipeConnector {
     }
 
     try {
-      for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-        // Skip report if any tablet events is added
-        ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
-        transfer(event);
-      }
+      ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+          .consumeTabletInsertionEventsWithRetry(
+              event -> {
+                // Skip report if any tablet events is added
+                ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).skipReportOnCommit();
+                transfer(event);
+              },
+              "WebSocketConnector::transfer");
     } finally {
       tsFileInsertionEvent.close();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index b648af30e6f..a2843392636 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
@@ -50,11 +51,13 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeTsFileInsertionEvent extends EnrichedEvent
@@ -413,6 +416,49 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
 
+  @FunctionalInterface
+  public interface TabletInsertionEventConsumer {
+    void consume(final PipeRawTabletInsertionEvent event);
+  }
+
+  public void consumeTabletInsertionEventsWithRetry(
+      final TabletInsertionEventConsumer consumer, final String callerName) 
throws PipeException {
+    final Iterable<TabletInsertionEvent> iterable = toTabletInsertionEvents();
+    final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+    int tabletEventCount = 0;
+    while (iterator.hasNext()) {
+      final TabletInsertionEvent parsedEvent = iterator.next();
+      tabletEventCount++;
+      int retryCount = 0;
+      while (true) {
+        // If failed due to insufficient memory, retry until success to avoid race among multiple
+        // processor threads
+        try {
+          consumer.consume((PipeRawTabletInsertionEvent) parsedEvent);
+          break;
+        } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+          if (retryCount++ % 100 == 0) {
+            LOGGER.warn(
+                "{}: failed to allocate memory for parsing TsFile {}, tablet 
event no. {}, retry count is {}, will keep retrying.",
+                callerName,
+                getTsFile(),
+                tabletEventCount,
+                retryCount,
+                e);
+          } else if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(
+                "{}: failed to allocate memory for parsing TsFile {}, tablet 
event no. {}, retry count is {}, will keep retrying.",
+                callerName,
+                getTsFile(),
+                tabletEventCount,
+                retryCount,
+                e);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws 
PipeException {
     // 20 - 40 seconds for waiting
@@ -528,18 +574,19 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent
   }
 
   public long count(final boolean skipReportOnCommit) throws IOException {
-    long count = 0;
+    AtomicLong count = new AtomicLong();
 
     if (shouldParseTime()) {
       try {
-        for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
-          final PipeRawTabletInsertionEvent rawEvent = 
((PipeRawTabletInsertionEvent) event);
-          count += rawEvent.count();
-          if (skipReportOnCommit) {
-            rawEvent.skipReportOnCommit();
-          }
-        }
-        return count;
+        consumeTabletInsertionEventsWithRetry(
+            event -> {
+              count.addAndGet(event.count());
+              if (skipReportOnCommit) {
+                event.skipReportOnCommit();
+              }
+            },
+            "PipeTsFileInsertionEvent::count");
+        return count.get();
       } finally {
         close();
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 157c6e2103d..dc5a7e4390f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -512,9 +512,26 @@ public class AggregateProcessor implements PipeProcessor {
       final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector 
eventCollector)
       throws Exception {
     try {
-      for (final TabletInsertionEvent tabletInsertionEvent :
-          tsFileInsertionEvent.toTabletInsertionEvents()) {
-        process(tabletInsertionEvent, eventCollector);
+      if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+        final AtomicReference<Exception> ex = new AtomicReference<>();
+        ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+            .consumeTabletInsertionEventsWithRetry(
+                event -> {
+                  try {
+                    process(event, eventCollector);
+                  } catch (Exception e) {
+                    ex.set(e);
+                  }
+                },
+                "AggregateProcessor::process");
+        if (ex.get() != null) {
+          throw ex.get();
+        }
+      } else {
+        for (final TabletInsertionEvent tabletInsertionEvent :
+            tsFileInsertionEvent.toTabletInsertionEvents()) {
+          process(tabletInsertionEvent, eventCollector);
+        }
       }
     } finally {
       tsFileInsertionEvent.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
index fd631772b93..a8e0c270570 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.access.Row;
@@ -45,7 +46,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
 
 public abstract class DownSamplingProcessor implements PipeProcessor {
-
   protected long memoryLimitInBytes;
 
   protected boolean shouldSplitFile;
@@ -149,9 +149,26 @@ public abstract class DownSamplingProcessor implements 
PipeProcessor {
       throws Exception {
     if (shouldSplitFile) {
       try {
-        for (final TabletInsertionEvent tabletInsertionEvent :
-            tsFileInsertionEvent.toTabletInsertionEvents()) {
-          process(tabletInsertionEvent, eventCollector);
+        if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+          final AtomicReference<Exception> ex = new AtomicReference<>();
+          ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+              .consumeTabletInsertionEventsWithRetry(
+                  event -> {
+                    try {
+                      process(event, eventCollector);
+                    } catch (Exception e) {
+                      ex.set(e);
+                    }
+                  },
+                  "DownSamplingProcessor::process");
+          if (ex.get() != null) {
+            throw ex.get();
+          }
+        } else {
+          for (final TabletInsertionEvent tabletInsertionEvent :
+              tsFileInsertionEvent.toTabletInsertionEvents()) {
+            process(tabletInsertionEvent, eventCollector);
+          }
         }
       } finally {
         tsFileInsertionEvent.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 7b1a6caf610..15c93d7a5e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.event.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
-import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -95,20 +94,22 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
   protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
     // TODO: parse tsfile event on the fly like 
SubscriptionPipeTabletEventBatch
     try {
-      for (final TabletInsertionEvent parsedEvent : 
event.toTabletInsertionEvents()) {
-        if (!((PipeRawTabletInsertionEvent) parsedEvent)
-            .increaseReferenceCount(this.getClass().getName())) {
-          LOGGER.warn(
-              "SubscriptionPipeTsFileEventBatch: Failed to increase the 
reference count of event {}, skipping it.",
-              ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
-        } else {
-          try {
-            batch.onEvent(parsedEvent);
-          } catch (final Exception ignored) {
-            // no exceptions will be thrown
-          }
-        }
-      }
+      ((PipeTsFileInsertionEvent) event)
+          .consumeTabletInsertionEventsWithRetry(
+              event1 -> {
+                if (!event1.increaseReferenceCount(this.getClass().getName())) 
{
+                  LOGGER.warn(
+                      "SubscriptionPipeTsFileEventBatch: Failed to increase 
the reference count of event {}, skipping it.",
+                      event1.coreReportMessage());
+                } else {
+                  try {
+                    batch.onEvent(event1);
+                  } catch (final Exception ignored) {
+                    // no exceptions will be thrown
+                  }
+                }
+              },
+              "SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent");
     } finally {
       try {
         event.close();

Reply via email to