This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-task-schedule in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c3e16a7806f1aaf0de9521dc6bce1348faab0a20 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 22 19:24:42 2023 +0800 executeForAWhile -> executeOnce --- .../iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java | 6 ++++-- .../iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java | 6 ++++-- .../org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java | 10 ++++++++-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java index 1ec0994ecdc..1bd8a6ed515 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java @@ -51,7 +51,7 @@ public class PipeConnectorSubtask extends PipeSubtask { // TODO: for a while @Override - protected synchronized void executeForAWhile() { + protected synchronized boolean executeOnce() { try { // TODO: reduce the frequency of heartbeat outputPipeConnector.heartbeat(); @@ -64,7 +64,7 @@ public class PipeConnectorSubtask extends PipeSubtask { // record this event for retrying on connection failure or other exceptions lastEvent = event; if (event == null) { - return; + return false; } try { @@ -85,6 +85,8 @@ public class PipeConnectorSubtask extends PipeSubtask { "Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.", e); } + + return true; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java index 0e65894c14d..6a76beb02b3 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java @@ -50,12 +50,12 @@ public class PipeProcessorSubtask extends PipeSubtask { } @Override - protected synchronized void executeForAWhile() throws Exception { + protected synchronized boolean executeOnce() throws Exception { final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply(); // record the last event for retry when exception occurs lastEvent = event; if (event == null) { - return; + return false; } try { @@ -74,6 +74,8 @@ public class PipeProcessorSubtask extends PipeSubtask { "Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.", e); } + + return true; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java index c770b983b9a..bed57c90e88 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java @@ -67,7 +67,7 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void @Override public Void call() throws Exception { - executeForAWhile(); + executeOnce(); // wait for the callable to be decorated by Futures.addCallback in the executorService // to make sure that the callback can be submitted again on success or failure. @@ -76,7 +76,13 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void return null; } - protected abstract void executeForAWhile() throws Exception; + /** + * try to consume an event by the pipe plugin. + * + * @return true if the event is consumed successfully, false if no more event can be consumed + * @throws Exception if any error occurs when consuming the event + */ + protected abstract boolean executeOnce() throws Exception; @Override public void onSuccess(Void result) {
