This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-cause-wal-pin
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8f581253ca17d3327d982198444f2e98d8956d78
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Sep 28 17:50:35 2023 +0800

    NEW HYBRID MODE
---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 96 ++++++++--------------
 1 file changed, 32 insertions(+), 64 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index e4dbf60f56c..4473773a802 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -81,39 +81,27 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
       //  size of wal buffer), the write operation will be throttled, so we should not extract any
       //  more tablet events.
       //  3. The number of tsfile events in the pending queue has exceeded the limit.
-      event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
+      event
+          .getTsFileEpoch()
+          .migrateState(
+              this,
+              state ->
+                  state.equals(TsFileEpoch.State.EMPTY) ? 
TsFileEpoch.State.USING_TSFILE : state);
     }
 
-    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
-    switch (state) {
-      case USING_TSFILE:
-        // Ignore the tablet event.
-        
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(),
 false);
-        break;
-      case EMPTY:
-      case USING_TABLET:
-        if (!pendingQueue.waitedOffer(event)) {
-          // this would not happen, but just in case.
-          // pendingQueue is unbounded, so it should never reach capacity.
-          final String errorMessage =
-              String.format(
-                  "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
-                      + "has reached capacity, discard tablet event %s, 
current state %s",
-                  this, event, event.getTsFileEpoch().getState(this));
-          LOGGER.error(errorMessage);
-          PipeAgent.runtime()
-              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-          // Ignore the tablet event.
-          event.decreaseReferenceCount(
-              PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Unsupported state %s for hybrid realtime extractor %s",
-                state, PipeRealtimeDataRegionHybridExtractor.class.getName()));
+    if (!pendingQueue.waitedOffer(event)) {
+      // this would not happen, but just in case.
+      // pendingQueue is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
+                  + "has reached capacity, discard tablet event %s, current 
state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+      // Ignore the tablet event.
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(),
 false);
     }
   }
 
@@ -125,39 +113,19 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
             state ->
                 state.equals(TsFileEpoch.State.EMPTY) ? 
TsFileEpoch.State.USING_TSFILE : state);
 
-    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
-    switch (state) {
-      case EMPTY:
-      case USING_TSFILE:
-        if (!pendingQueue.waitedOffer(event)) {
-          // this would not happen, but just in case.
-          // pendingQueue is unbounded, so it should never reach capacity.
-          final String errorMessage =
-              String.format(
-                  "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
-                      + "has reached capacity, discard TsFile event %s, 
current state %s",
-                  this, event, event.getTsFileEpoch().getState(this));
-          LOGGER.error(errorMessage);
-          PipeAgent.runtime()
-              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-          // Ignore the tsfile event.
-          event.decreaseReferenceCount(
-              PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
-        }
-        break;
-      case USING_TABLET:
-        // All the tablet events have been extracted, so we can ignore the 
tsFile event.
-        // Report this event for SimpleProgressIndex, which does not have 
progressIndex for wal.
-        // This report won't affect IoTProgressIndex since the previous wal 
events have been
-        // successfully transferred here.
-        
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(),
 true);
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Unsupported state %s for hybrid realtime extractor %s",
-                state, PipeRealtimeDataRegionHybridExtractor.class.getName()));
+    if (!pendingQueue.waitedOffer(event)) {
+      // this would not happen, but just in case.
+      // pendingQueue is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
+                  + "has reached capacity, discard TsFile event %s, current 
state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+      // Ignore the tsfile event.
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(),
 false);
     }
   }
 

Reply via email to