This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-ude
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d5ab3a354711cae20dc4730874e3e13ffab14b35 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Jan 29 12:58:29 2024 +0800 Pipe: support reporting progress by UserDefinedEvent --- .../iotdb/pipe/api/event/UserDefinedEvent.java | 26 +++++++++ .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 4 ++ .../db/pipe/event/UserDefinedEnrichedEvent.java | 67 ++++++++++++++++++++++ .../pipe/task/connection/PipeEventCollector.java | 3 + .../subtask/connector/PipeConnectorSubtask.java | 6 +- .../subtask/processor/PipeProcessorSubtask.java | 11 +++- 6 files changed, 114 insertions(+), 3 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java new file mode 100644 index 00000000000..ed706c7c5e6 --- /dev/null +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java @@ -0,0 +1,26 @@ +package org.apache.iotdb.pipe.api.event; + +/** + * User defined event is used to wrap data generated by users, keeping a source event to + * automatically record the processing progress of the source event. + */ +public abstract class UserDefinedEvent implements Event { + + /** The user defined event is generated from this source event. */ + protected final Event sourceEvent; + + /** @param sourceEvent the source event of this user defined event */ + protected UserDefinedEvent(Event sourceEvent) { + this.sourceEvent = parseRootSourceEvent(sourceEvent); + } + + private Event parseRootSourceEvent(Event sourceEvent) { + return sourceEvent instanceof UserDefinedEvent + ? 
((UserDefinedEvent) sourceEvent).getSourceEvent() + : sourceEvent; + } + + public Event getSourceEvent() { + return sourceEvent; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index a2a98f78ad0..894c5eae035 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -175,6 +175,10 @@ public abstract class EnrichedEvent implements Event { return pipeName; } + public final PipeTaskMeta getPipeTaskMeta() { + return pipeTaskMeta; + } + /** * Get the pattern of this event. * diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java new file mode 100644 index 00000000000..96b6667f9f8 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java @@ -0,0 +1,67 @@ +package org.apache.iotdb.db.pipe.event; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.UserDefinedEvent; + +public class UserDefinedEnrichedEvent extends EnrichedEvent { + + private final UserDefinedEvent userDefinedEvent; + private final EnrichedEvent enrichedEvent; + + public static Event maybeOf(Event event) { + return event instanceof UserDefinedEvent + && ((UserDefinedEvent) event).getSourceEvent() instanceof EnrichedEvent + ? 
new UserDefinedEnrichedEvent( + (UserDefinedEvent) event, (EnrichedEvent) ((UserDefinedEvent) event).getSourceEvent()) + : event; + } + + private UserDefinedEnrichedEvent(UserDefinedEvent userDefinedEvent, EnrichedEvent enrichedEvent) { + super( + enrichedEvent.getPipeName(), + enrichedEvent.getPipeTaskMeta(), + enrichedEvent.getPattern(), + enrichedEvent.getStartTime(), + enrichedEvent.getEndTime()); + this.userDefinedEvent = userDefinedEvent; + this.enrichedEvent = enrichedEvent; + } + + public UserDefinedEvent getUserDefinedEvent() { + return userDefinedEvent; + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + return enrichedEvent.internallyIncreaseResourceReferenceCount(holderMessage); + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + return enrichedEvent.internallyDecreaseResourceReferenceCount(holderMessage); + } + + @Override + public ProgressIndex getProgressIndex() { + return enrichedEvent.getProgressIndex(); + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime) { + return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( + pipeName, pipeTaskMeta, pattern, startTime, endTime); + } + + @Override + public boolean isGeneratedByPipe() { + return enrichedEvent.isGeneratedByPipe(); + } + + @Override + public boolean isEventTimeOverlappedWithTimeRange() { + return enrichedEvent.isEventTimeOverlappedWithTimeRange(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java index 727b976b098..c0aaef3a44f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.connection; import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.progress.committer.PipeEventCommitManager; import org.apache.iotdb.pipe.api.collector.EventCollector; @@ -47,6 +48,8 @@ public class PipeEventCollector implements EventCollector, AutoCloseable { @Override public synchronized void collect(Event event) { + event = UserDefinedEnrichedEvent.maybeOf(event); + if (event instanceof EnrichedEvent) { ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index f472a07fa5f..0f82f271710 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.task.DecoratingLock; import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector; import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics; import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; @@ -142,7 +143,10 @@ public 
class PipeConnectorSubtask extends PipeDataNodeSubtask { } else if (event instanceof PipeHeartbeatEvent) { transferHeartbeatEvent((PipeHeartbeatEvent) event); } else { - outputPipeConnector.transfer(event); + outputPipeConnector.transfer( + event instanceof UserDefinedEnrichedEvent + ? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent() + : event); } releaseLastEvent(true); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java index 149541aa82b..365b28191f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.processor; import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler; import org.apache.iotdb.commons.pipe.task.EventSupplier; +import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics; import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; @@ -97,7 +98,9 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask { return false; } - final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply(); + final Event event = + UserDefinedEnrichedEvent.maybeOf( + lastEvent != null ? 
lastEvent : inputEventSupplier.supply()); // Record the last event for retry when exception occurs setLastEvent(event); if ( @@ -125,7 +128,11 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask { ((PipeHeartbeatEvent) event).onProcessed(); PipeProcessorMetrics.getInstance().markPipeHeartbeatEvent(taskID); } else { - pipeProcessor.process(event, outputEventCollector); + pipeProcessor.process( + event instanceof UserDefinedEnrichedEvent + ? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent() + : event, + outputEventCollector); } }
