This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d78326a01c5f9a6eab7c825e64ac4954ec9865b7
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 24 18:42:31 2024 +0800

    Pipe: Reported the progress of the non-forwarding events (#13008)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    (cherry picked from commit cb0765836fd986fca7e09db60724879987d9c22c)
---
 .../event/realtime/PipeRealtimeEventFactory.java   | 15 ++++++++++----
 .../realtime/PipeRealtimeDataRegionExtractor.java  | 16 ++++++++++-----
 .../PipeRealtimeDataRegionHeartbeatExtractor.java  |  3 +++
 .../PipeRealtimeDataRegionHybridExtractor.java     |  8 +++++---
 .../PipeRealtimeDataRegionLogExtractor.java        |  8 +++++---
 .../PipeRealtimeDataRegionTsFileExtractor.java     |  8 +++++---
 .../realtime/assigner/PipeDataRegionAssigner.java  | 24 +++++++++++++++++-----
 7 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 94793b74bc1..60786cfed25 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.event.realtime;
 
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -34,13 +35,15 @@ public class PipeRealtimeEventFactory {
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new 
TsFileEpochManager();
 
   public static PipeRealtimeEvent createRealtimeEvent(
-      TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
+      final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
         new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe), 
resource);
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
-      WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) {
+      final WALEntryHandler walEntryHandler,
+      final InsertNode insertNode,
+      final TsFileResource resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
         new PipeInsertNodeTabletInsertionEvent(
             walEntryHandler,
@@ -53,16 +56,20 @@ public class PipeRealtimeEventFactory {
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
-      String dataRegionId, boolean shouldPrintMessage) {
+      final String dataRegionId, final boolean shouldPrintMessage) {
     return new PipeRealtimeEvent(
         new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, 
null);
   }
 
-  public static PipeRealtimeEvent createRealtimeEvent(DeleteDataNode node) {
+  public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode node) {
     return new PipeRealtimeEvent(
         new PipeSchemaRegionWritePlanEvent(node, node.isGeneratedByPipe()), 
null, null, null);
   }
 
+  public static PipeRealtimeEvent createRealtimeEvent(final ProgressReportEvent event) {
+    return new PipeRealtimeEvent(event, null, null, null);
+  }
+
   private PipeRealtimeEventFactory() {
     // factory class, do not instantiate
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 138947af18d..55faf2db59c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -287,6 +288,12 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
    * @param event the {@link Event} from the {@link StorageEngine}
    */
   public final void extract(final PipeRealtimeEvent event) {
+    // The progress report event shall be directly extracted
+    if (event.getEvent() instanceof ProgressReportEvent) {
+      extractDirectly(event);
+      return;
+    }
+
     if (isDbNameCoveredByPattern) {
       event.skipParsingPattern();
     }
@@ -369,14 +376,13 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     }
   }
 
-  protected void extractDeletion(final PipeRealtimeEvent event) {
+  protected void extractDirectly(final PipeRealtimeEvent event) {
     if (!pendingQueue.waitedOffer(event)) {
       // This would not happen, but just in case.
       // Pending is unbounded, so it should never reach capacity.
       final String errorMessage =
           String.format(
-              "extract: pending queue of %s %s "
-                  + "has reached capacity, discard deletion event %s",
+              "extract: pending queue of %s %s " + "has reached capacity, discard event %s",
               this.getClass().getSimpleName(), this, event);
       LOGGER.error(errorMessage);
       PipeDataNodeAgent.runtime()
@@ -404,7 +410,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     }
   }
 
-  protected Event supplyDeletion(final PipeRealtimeEvent event) {
+  protected Event supplyDirectly(final PipeRealtimeEvent event) {
     if 
(event.increaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName())) 
{
       return event.getEvent();
     } else {
@@ -413,7 +419,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
       // event and report the exception to PipeRuntimeAgent.
       final String errorMessage =
           String.format(
-              "TsFile Event %s can not be supplied because "
+              "Event %s can not be supplied because "
                   + "the reference count can not be increased, "
                   + "the data represented by this event is lost",
               event.getEvent());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
index 39a64989048..1df62eecc92 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
 
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -35,6 +36,8 @@ public class PipeRealtimeDataRegionHeartbeatExtractor extends 
PipeRealtimeDataRe
       // only supply PipeHeartbeatEvent
       if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) {
         suppliedEvent = supplyHeartbeat(realtimeEvent);
+      } else if (realtimeEvent.getEvent() instanceof ProgressReportEvent) {
+        suppliedEvent = supplyDirectly(realtimeEvent);
       }
 
       realtimeEvent.decreaseReferenceCount(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 15a020cd577..ced920f378c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -57,7 +58,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
-      extractDeletion(event);
+      extractDirectly(event);
     } else {
       throw new UnsupportedOperationException(
           String.format(
@@ -259,8 +260,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         suppliedEvent = supplyTsFileInsertion(realtimeEvent);
       } else if (eventToSupply instanceof PipeHeartbeatEvent) {
         suppliedEvent = supplyHeartbeat(realtimeEvent);
-      } else if (eventToSupply instanceof PipeSchemaRegionWritePlanEvent) {
-        suppliedEvent = supplyDeletion(realtimeEvent);
+      } else if (eventToSupply instanceof PipeSchemaRegionWritePlanEvent
+          || eventToSupply instanceof ProgressReportEvent) {
+        suppliedEvent = supplyDirectly(realtimeEvent);
       } else {
         throw new UnsupportedOperationException(
             String.format(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java
index 40d8616a181..4b300355c80 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
@@ -49,7 +50,7 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
-      extractDeletion(event);
+      extractDirectly(event);
     } else {
       throw new UnsupportedOperationException(
           String.format(
@@ -130,8 +131,9 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
 
       if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) {
         suppliedEvent = supplyHeartbeat(realtimeEvent);
-      } else if (realtimeEvent.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
-        suppliedEvent = supplyDeletion(realtimeEvent);
+      } else if (realtimeEvent.getEvent() instanceof PipeSchemaRegionWritePlanEvent
+          || realtimeEvent.getEvent() instanceof ProgressReportEvent) {
+        suppliedEvent = supplyDirectly(realtimeEvent);
       } else if (realtimeEvent.increaseReferenceCount(
           PipeRealtimeDataRegionLogExtractor.class.getName())) {
         suppliedEvent = realtimeEvent.getEvent();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 57e00fa51c1..8072499b3da 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
@@ -44,7 +45,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
     }
 
     if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
-      extractDeletion(event);
+      extractDirectly(event);
       return;
     }
 
@@ -91,8 +92,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
 
       if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) {
         suppliedEvent = supplyHeartbeat(realtimeEvent);
-      } else if (realtimeEvent.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
-        suppliedEvent = supplyDeletion(realtimeEvent);
+      } else if (realtimeEvent.getEvent() instanceof PipeSchemaRegionWritePlanEvent
+          || realtimeEvent.getEvent() instanceof ProgressReportEvent) {
+        suppliedEvent = supplyDirectly(realtimeEvent);
       } else if (realtimeEvent.increaseReferenceCount(
           PipeRealtimeDataRegionTsFileExtractor.class.getName())) {
         suppliedEvent = realtimeEvent.getEvent();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 16f9ebf61d9..1042b1924a0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -20,9 +20,11 @@
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
 import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
 import org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher;
@@ -47,14 +49,14 @@ public class PipeDataRegionAssigner implements Closeable {
     return dataRegionId;
   }
 
-  public PipeDataRegionAssigner(String dataRegionId) {
+  public PipeDataRegionAssigner(final String dataRegionId) {
     this.matcher = new CachedSchemaPatternMatcher();
     this.disruptor = new DisruptorQueue(this::assignToExtractor);
     this.dataRegionId = dataRegionId;
     PipeAssignerMetrics.getInstance().register(this);
   }
 
-  public void publishToAssign(PipeRealtimeEvent event) {
+  public void publishToAssign(final PipeRealtimeEvent event) {
     event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
 
     disruptor.publish(event);
@@ -64,12 +66,24 @@ public class PipeDataRegionAssigner implements Closeable {
     }
   }
 
-  public void assignToExtractor(PipeRealtimeEvent event, long sequence, boolean endOfBatch) {
+  public void assignToExtractor(
+      final PipeRealtimeEvent event, final long sequence, final boolean endOfBatch) {
     matcher
         .match(event)
         .forEach(
             extractor -> {
               if (event.getEvent().isGeneratedByPipe() && 
!extractor.isForwardingPipeRequests()) {
+                final ProgressReportEvent reportEvent =
+                    new ProgressReportEvent(
+                        extractor.getPipeName(),
+                        extractor.getCreationTime(),
+                        extractor.getPipeTaskMeta(),
+                        extractor.getPipePattern(),
+                        extractor.getRealtimeDataExtractionStartTime(),
+                        extractor.getRealtimeDataExtractionEndTime());
+                reportEvent.bindProgressIndex(event.getProgressIndex());
+                reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+                extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
                 return;
               }
 
@@ -98,11 +112,11 @@ public class PipeDataRegionAssigner implements Closeable {
     event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(), 
false);
   }
 
-  public void startAssignTo(PipeRealtimeDataRegionExtractor extractor) {
+  public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
     matcher.register(extractor);
   }
 
-  public void stopAssignTo(PipeRealtimeDataRegionExtractor extractor) {
+  public void stopAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
     matcher.deregister(extractor);
   }
 

Reply via email to