This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-ref-issue
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1b588fb9efa54aafd7ddae3edfbc5dae5c9f86fa
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Aug 4 17:59:31 2023 +0800

    Pipe: reference count is not correctly increased / decreased
---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 57 ++++++++++++++--------
 .../PipeRealtimeDataRegionLogExtractor.java        | 23 +++++----
 .../PipeRealtimeDataRegionTsFileExtractor.java     | 25 ++++++----
 3 files changed, 67 insertions(+), 38 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index ffefb13daf0..026a3334d54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -73,23 +73,35 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
 
   private void extractTabletInsertion(PipeRealtimeEvent event) {
     if (isApproachingCapacity()) {
-      event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
       // if the pending queue is approaching capacity, we should not extract 
any more tablet events.
       // all the data represented by the tablet events should be carried by 
the following tsfile
       // event.
+      event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
+      LOGGER.info(
+          "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor "
+              + "is approaching capacity, discard tablet event {}, change 
state of tsfile epoch to {}",
+          event,
+          event.getTsFileEpoch().getState(this));
+
+      // Ignore the tablet event.
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
       return;
     }
 
     if 
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
         && !pendingQueue.waitedOffer(event)) {
-      LOGGER.warn(
-          "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor {} "
-              + "has reached capacity, discard tablet event {}, current state 
{}",
-          this,
-          event,
-          event.getTsFileEpoch().getState(this));
       // this would not happen, but just in case.
-      // UnboundedBlockingPendingQueue is unbounded, so it should never reach 
capacity.
+      // pendingQueue is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
+                  + "has reached capacity, discard tablet event %s, current 
state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+      // Ignore the tablet event.
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
     }
   }
 
@@ -102,14 +114,18 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
                 state.equals(TsFileEpoch.State.EMPTY) ? 
TsFileEpoch.State.USING_TSFILE : state);
 
     if (!pendingQueue.waitedOffer(event)) {
-      LOGGER.warn(
-          "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor {} "
-              + "has reached capacity, discard TsFile event {}, current state 
{}",
-          this,
-          event,
-          event.getTsFileEpoch().getState(this));
       // this would not happen, but just in case.
-      // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
+      // pendingQueue is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
+                  + "has reached capacity, discard TsFile event %s, current 
state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+      // Ignore the tsfile event.
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
     }
   }
 
@@ -138,8 +154,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
                 eventToSupply.getClass(), this));
       }
 
-      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
       if (suppliedEvent != null) {
+        
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
         return suppliedEvent;
       }
 
@@ -166,7 +182,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         // this event is not reliable anymore. but the data represented by 
this event
         // has been carried by the following tsfile event, so we can just 
discard this event.
         event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-        LOGGER.warn("Increase reference count for event {} error.", event);
+        LOGGER.warn(
+            "Discard tablet event {} because it is not reliable anymore. "
+                + "Change the state of TsFileEpoch to USING_TSFILE.",
+            event);
         return null;
       }
     }
@@ -182,7 +201,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
             state -> {
               // this would not happen, but just in case.
               if (state.equals(TsFileEpoch.State.EMPTY)) {
-                LOGGER.warn(
+                LOGGER.error(
                     String.format("EMPTY TsFileEpoch when supplying TsFile 
Event %s", event));
                 return TsFileEpoch.State.USING_TSFILE;
               }
@@ -202,7 +221,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
                     + "the reference count can not be increased, "
                     + "the data represented by this event is lost",
                 event.getEvent());
-        LOGGER.warn(errorMessage);
+        LOGGER.error(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
         return null;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 74ff0c533b1..17891add76d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -48,18 +48,23 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TABLET);
 
     if (!(event.getEvent() instanceof TabletInsertionEvent)) {
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
       return;
     }
 
     if (!pendingQueue.waitedOffer(event)) {
-      LOGGER.warn(
-          "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} "
-              + "has reached capacity, discard tablet event {}, current state 
{}",
-          this,
-          event,
-          event.getTsFileEpoch().getState(this));
       // this would not happen, but just in case.
-      // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
+      // pendingQueue is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s 
"
+                  + "has reached capacity, discard tablet event %s, current 
state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+      // ignore this event.
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
     }
   }
 
@@ -93,12 +98,12 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
                     + "the reference count can not be increased, "
                     + "the data represented by this event is lost",
                 realtimeEvent.getEvent());
-        LOGGER.warn(errorMessage);
+        LOGGER.error(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
-      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
       if (suppliedEvent != null) {
+        
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
         return suppliedEvent;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index bab1ea2ec46..5e97a338df1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -48,18 +48,23 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
 
     if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
       return;
     }
 
     if (!pendingQueue.waitedOffer(event)) {
-      LOGGER.warn(
-          "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
-              + "has reached capacity, discard TsFile event {}, current state 
{}",
-          this,
-          event,
-          event.getTsFileEpoch().getState(this));
-      // this would not happen, but just in case.
-      // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
+      // This would not happen, but just in case.
+      // pendingQueue is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor 
%s "
+                  + "has reached capacity, discard TsFile event %s, current 
state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+      // Ignore the event.
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
     }
   }
 
@@ -93,12 +98,12 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
                     + "the reference count can not be increased, "
                     + "the data represented by this event is lost",
                 realtimeEvent.getEvent());
-        LOGGER.warn(errorMessage);
+        LOGGER.error(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
-      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
       if (suppliedEvent != null) {
+        
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
         return suppliedEvent;
       }
 

Reply via email to