This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-ude
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d5ab3a354711cae20dc4730874e3e13ffab14b35
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jan 29 12:58:29 2024 +0800

    Pipe: support reporting progress by UserDefinedEvent
---
 .../iotdb/pipe/api/event/UserDefinedEvent.java     | 26 +++++++++
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |  4 ++
 .../db/pipe/event/UserDefinedEnrichedEvent.java    | 67 ++++++++++++++++++++++
 .../pipe/task/connection/PipeEventCollector.java   |  3 +
 .../subtask/connector/PipeConnectorSubtask.java    |  6 +-
 .../subtask/processor/PipeProcessorSubtask.java    | 11 +++-
 6 files changed, 114 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java
new file mode 100644
index 00000000000..ed706c7c5e6
--- /dev/null
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.pipe.api.event;
+
+/**
+ * User defined event is used to wrap data generated by users, keeping a 
source event to
+ * automatically record the processing progress of the source event.
+ */
+public abstract class UserDefinedEvent implements Event {
+
+  /** The user defined event is generated from this source event. */
+  protected final Event sourceEvent;
+
+  /** @param sourceEvent the source event of this user defined event */
+  protected UserDefinedEvent(Event sourceEvent) {
+    this.sourceEvent = parseRootSourceEvent(sourceEvent);
+  }
+
+  private Event parseRootSourceEvent(Event sourceEvent) {
+    return sourceEvent instanceof UserDefinedEvent
+        ? ((UserDefinedEvent) sourceEvent).getSourceEvent()
+        : sourceEvent;
+  }
+
+  public Event getSourceEvent() {
+    return sourceEvent;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index a2a98f78ad0..894c5eae035 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -175,6 +175,10 @@ public abstract class EnrichedEvent implements Event {
     return pipeName;
   }
 
+  public final PipeTaskMeta getPipeTaskMeta() {
+    return pipeTaskMeta;
+  }
+
   /**
    * Get the pattern of this event.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
new file mode 100644
index 00000000000..96b6667f9f8
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -0,0 +1,67 @@
+package org.apache.iotdb.db.pipe.event;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.UserDefinedEvent;
+
+public class UserDefinedEnrichedEvent extends EnrichedEvent {
+
+  private final UserDefinedEvent userDefinedEvent;
+  private final EnrichedEvent enrichedEvent;
+
+  public static Event maybeOf(Event event) {
+    return event instanceof UserDefinedEvent
+            && ((UserDefinedEvent) event).getSourceEvent() instanceof 
EnrichedEvent
+        ? new UserDefinedEnrichedEvent(
+            (UserDefinedEvent) event, (EnrichedEvent) ((UserDefinedEvent) 
event).getSourceEvent())
+        : event;
+  }
+
+  private UserDefinedEnrichedEvent(UserDefinedEvent userDefinedEvent, 
EnrichedEvent enrichedEvent) {
+    super(
+        enrichedEvent.getPipeName(),
+        enrichedEvent.getPipeTaskMeta(),
+        enrichedEvent.getPattern(),
+        enrichedEvent.getStartTime(),
+        enrichedEvent.getEndTime());
+    this.userDefinedEvent = userDefinedEvent;
+    this.enrichedEvent = enrichedEvent;
+  }
+
+  public UserDefinedEvent getUserDefinedEvent() {
+    return userDefinedEvent;
+  }
+
+  @Override
+  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+    return 
enrichedEvent.internallyIncreaseResourceReferenceCount(holderMessage);
+  }
+
+  @Override
+  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+    return 
enrichedEvent.internallyDecreaseResourceReferenceCount(holderMessage);
+  }
+
+  @Override
+  public ProgressIndex getProgressIndex() {
+    return enrichedEvent.getProgressIndex();
+  }
+
+  @Override
+  public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+    return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+        pipeName, pipeTaskMeta, pattern, startTime, endTime);
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return enrichedEvent.isGeneratedByPipe();
+  }
+
+  @Override
+  public boolean isEventTimeOverlappedWithTimeRange() {
+    return enrichedEvent.isEventTimeOverlappedWithTimeRange();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 727b976b098..c0aaef3a44f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.connection;
 
 import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.progress.committer.PipeEventCommitManager;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -47,6 +48,8 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
 
   @Override
   public synchronized void collect(Event event) {
+    event = UserDefinedEnrichedEvent.maybeOf(event);
+
     if (event instanceof EnrichedEvent) {
       ((EnrichedEvent) 
event).increaseReferenceCount(PipeEventCollector.class.getName());
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index f472a07fa5f..0f82f271710 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.task.DecoratingLock;
 import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -142,7 +143,10 @@ public class PipeConnectorSubtask extends 
PipeDataNodeSubtask {
       } else if (event instanceof PipeHeartbeatEvent) {
         transferHeartbeatEvent((PipeHeartbeatEvent) event);
       } else {
-        outputPipeConnector.transfer(event);
+        outputPipeConnector.transfer(
+            event instanceof UserDefinedEnrichedEvent
+                ? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent()
+                : event);
       }
 
       releaseLastEvent(true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 149541aa82b..365b28191f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.processor;
 
 import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.commons.pipe.task.EventSupplier;
+import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -97,7 +98,9 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask 
{
       return false;
     }
 
-    final Event event = lastEvent != null ? lastEvent : 
inputEventSupplier.supply();
+    final Event event =
+        UserDefinedEnrichedEvent.maybeOf(
+            lastEvent != null ? lastEvent : inputEventSupplier.supply());
     // Record the last event for retry when exception occurs
     setLastEvent(event);
     if (
@@ -125,7 +128,11 @@ public class PipeProcessorSubtask extends 
PipeDataNodeSubtask {
           ((PipeHeartbeatEvent) event).onProcessed();
           PipeProcessorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
         } else {
-          pipeProcessor.process(event, outputEventCollector);
+          pipeProcessor.process(
+              event instanceof UserDefinedEnrichedEvent
+                  ? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent()
+                  : event,
+              outputEventCollector);
         }
       }
 

Reply via email to