This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fcae2b48400 Pipe: support reporting progress by UserDefinedEvent (#12000)
fcae2b48400 is described below
commit fcae2b48400f45bdd5a3c9cdc65f601e83ab1fa1
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jan 29 17:21:34 2024 +0800
Pipe: support reporting progress by UserDefinedEvent (#12000)
---
.../iotdb/pipe/api/event/UserDefinedEvent.java | 58 +++++++++++++++
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 4 +
.../db/pipe/event/UserDefinedEnrichedEvent.java | 86 ++++++++++++++++++++++
.../subtask/connector/PipeConnectorSubtask.java | 11 ++-
.../subtask/processor/PipeProcessorSubtask.java | 12 ++-
5 files changed, 167 insertions(+), 4 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java
new file mode 100644
index 00000000000..4c4ae4975fd
--- /dev/null
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/UserDefinedEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.event;
+
+/**
+ * Base class for events that wrap data generated by users. An instance may keep a reference to
+ * the source event it was generated from, which is used to automatically report the processing
+ * progress to the pipe engine.
+ */
+public abstract class UserDefinedEvent implements Event {
+
+  /** The source event this user defined event was generated from; may be {@code null}. */
+  protected final Event sourceEvent;
+
+  /**
+   * Creates a user defined event that is bound to a source event.
+   *
+   * <p>If the given event is itself a {@link UserDefinedEvent}, the source event held by that
+   * event is stored instead of the event itself.
+   *
+   * @param sourceEvent the source event of this user defined event, used to report the
+   *     processing progress to the pipe engine. It must satisfy the following conditions:
+   *     <ol>
+   *       <li>A source event can only be assigned to one user defined event.
+   *       <li>If more than one user defined event is generated from the same source event, only
+   *           the last generated one may be assigned the source event; the others should be
+   *           given {@code null}, or be created through {@link #UserDefinedEvent()}.
+   *     </ol>
+   */
+  protected UserDefinedEvent(Event sourceEvent) {
+    this.sourceEvent = parseRootSourceEvent(sourceEvent);
+  }
+
+  /** Creates a user defined event that has no source event. */
+  protected UserDefinedEvent() {
+    this.sourceEvent = null;
+  }
+
+  /**
+   * Resolves the event to store as the source: a {@link UserDefinedEvent} contributes its own
+   * source event, anything else (including {@code null}) is used as given.
+   */
+  private Event parseRootSourceEvent(Event sourceEvent) {
+    if (sourceEvent instanceof UserDefinedEvent) {
+      return ((UserDefinedEvent) sourceEvent).getSourceEvent();
+    }
+    return sourceEvent;
+  }
+
+  /**
+   * @return the source event kept by this user defined event, or {@code null} if there is none
+   */
+  public Event getSourceEvent() {
+    return sourceEvent;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index a2a98f78ad0..894c5eae035 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -175,6 +175,10 @@ public abstract class EnrichedEvent implements Event {
return pipeName;
}
+ /** Returns the {@code pipeTaskMeta} held by this event. */
+ public final PipeTaskMeta getPipeTaskMeta() {
+ return this.pipeTaskMeta;
+ }
+
/**
* Get the pattern of this event.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
new file mode 100644
index 00000000000..816478a7e68
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.UserDefinedEvent;
+
+/**
+ * Pairs a {@link UserDefinedEvent} with the {@link EnrichedEvent} that is its source event.
+ *
+ * <p>Every {@link EnrichedEvent} operation overridden here is forwarded to the source event,
+ * while the wrapped user defined event stays reachable through {@link #getUserDefinedEvent()}.
+ */
+public class UserDefinedEnrichedEvent extends EnrichedEvent {
+
+  /** The user defined event carried by this wrapper. */
+  private final UserDefinedEvent userDefinedEvent;
+
+  /** The source event of {@link #userDefinedEvent}; all overridden calls delegate to it. */
+  private final EnrichedEvent enrichedEvent;
+
+  /**
+   * Wraps the given event when possible.
+   *
+   * @param event any event, may be {@code null}
+   * @return a new {@link UserDefinedEnrichedEvent} if {@code event} is a {@link
+   *     UserDefinedEvent} whose source event is an {@link EnrichedEvent}; otherwise {@code
+   *     event} itself, unchanged
+   */
+  public static Event maybeOf(Event event) {
+    if (!(event instanceof UserDefinedEvent)) {
+      return event;
+    }
+    final UserDefinedEvent candidate = (UserDefinedEvent) event;
+    if (!(candidate.getSourceEvent() instanceof EnrichedEvent)) {
+      return event;
+    }
+    return new UserDefinedEnrichedEvent(candidate, (EnrichedEvent) candidate.getSourceEvent());
+  }
+
+  /**
+   * Builds the wrapper, initialising the {@link EnrichedEvent} base with the pipe name, pipe
+   * task meta, pattern, start time and end time read from the source event.
+   */
+  private UserDefinedEnrichedEvent(UserDefinedEvent userDefinedEvent, EnrichedEvent enrichedEvent) {
+    super(
+        enrichedEvent.getPipeName(),
+        enrichedEvent.getPipeTaskMeta(),
+        enrichedEvent.getPattern(),
+        enrichedEvent.getStartTime(),
+        enrichedEvent.getEndTime());
+    this.enrichedEvent = enrichedEvent;
+    this.userDefinedEvent = userDefinedEvent;
+  }
+
+  /**
+   * @return the user defined event carried by this wrapper
+   */
+  public UserDefinedEvent getUserDefinedEvent() {
+    return userDefinedEvent;
+  }
+
+  /** Forwards the reference count increase to the source event and returns its result. */
+  @Override
+  public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
+    return enrichedEvent.internallyIncreaseResourceReferenceCount(holderMessage);
+  }
+
+  /** Forwards the reference count decrease to the source event and returns its result. */
+  @Override
+  public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
+    return enrichedEvent.internallyDecreaseResourceReferenceCount(holderMessage);
+  }
+
+  /** Returns the progress index reported by the source event. */
+  @Override
+  public ProgressIndex getProgressIndex() {
+    return enrichedEvent.getProgressIndex();
+  }
+
+  /**
+   * Forwards the copy request to the source event. The returned object is whatever the source
+   * event produces; this method does not wrap it in a new {@link UserDefinedEnrichedEvent}.
+   */
+  @Override
+  public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime) {
+    return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+        pipeName, pipeTaskMeta, pattern, startTime, endTime);
+  }
+
+  /** Returns the source event's answer to {@code isGeneratedByPipe()}. */
+  @Override
+  public boolean isGeneratedByPipe() {
+    return enrichedEvent.isGeneratedByPipe();
+  }
+
+  /** Returns the source event's answer to {@code isEventTimeOverlappedWithTimeRange()}. */
+  @Override
+  public boolean isEventTimeOverlappedWithTimeRange() {
+    return enrichedEvent.isEventTimeOverlappedWithTimeRange();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index f472a07fa5f..167ef3e538f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.task.DecoratingLock;
import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -120,7 +121,10 @@ public class PipeConnectorSubtask extends
PipeDataNodeSubtask {
return false;
}
- final Event event = lastEvent != null ? lastEvent :
inputPendingQueue.waitedPoll();
+ final Event event =
+ lastEvent != null
+ ? lastEvent
+ : UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
// Record this event for retrying on connection failure or other exceptions
setLastEvent(event);
@@ -142,7 +146,10 @@ public class PipeConnectorSubtask extends
PipeDataNodeSubtask {
} else if (event instanceof PipeHeartbeatEvent) {
transferHeartbeatEvent((PipeHeartbeatEvent) event);
} else {
- outputPipeConnector.transfer(event);
+ outputPipeConnector.transfer(
+ event instanceof UserDefinedEnrichedEvent
+ ? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent()
+ : event);
}
releaseLastEvent(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 149541aa82b..6ab50749760 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.processor;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.EventSupplier;
+import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -97,7 +98,10 @@ public class PipeProcessorSubtask extends
PipeDataNodeSubtask {
return false;
}
- final Event event = lastEvent != null ? lastEvent :
inputEventSupplier.supply();
+ final Event event =
+ lastEvent != null
+ ? lastEvent
+ : UserDefinedEnrichedEvent.maybeOf(inputEventSupplier.supply());
// Record the last event for retry when exception occurs
setLastEvent(event);
if (
@@ -125,7 +129,11 @@ public class PipeProcessorSubtask extends
PipeDataNodeSubtask {
((PipeHeartbeatEvent) event).onProcessed();
PipeProcessorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
} else {
- pipeProcessor.process(event, outputEventCollector);
+ pipeProcessor.process(
+ event instanceof UserDefinedEnrichedEvent
+ ? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent()
+ : event,
+ outputEventCollector);
}
}