This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c3e16a7806f1aaf0de9521dc6bce1348faab0a20
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 22 19:24:42 2023 +0800

    executeForAWhile -> executeOnce
---
 .../iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java       |  6 ++++--
 .../iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java       |  6 ++++--
 .../org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java     | 10 ++++++++--
 3 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 1ec0994ecdc..1bd8a6ed515 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -51,7 +51,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   // TODO: for a while
   @Override
-  protected synchronized void executeForAWhile() {
+  protected synchronized boolean executeOnce() {
     try {
       // TODO: reduce the frequency of heartbeat
       outputPipeConnector.heartbeat();
@@ -64,7 +64,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
     // record this event for retrying on connection failure or other exceptions
     lastEvent = event;
     if (event == null) {
-      return;
+      return false;
     }
 
     try {
@@ -85,6 +85,8 @@ public class PipeConnectorSubtask extends PipeSubtask {
           "Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
           e);
     }
+
+    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 0e65894c14d..6a76beb02b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -50,12 +50,12 @@ public class PipeProcessorSubtask extends PipeSubtask {
   }
 
   @Override
-  protected synchronized void executeForAWhile() throws Exception {
+  protected synchronized boolean executeOnce() throws Exception {
     final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply();
     // record the last event for retry when exception occurs
     lastEvent = event;
     if (event == null) {
-      return;
+      return false;
     }
 
     try {
@@ -74,6 +74,8 @@ public class PipeProcessorSubtask extends PipeSubtask {
           "Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
           e);
     }
+
+    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index c770b983b9a..bed57c90e88 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -67,7 +67,7 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   @Override
   public Void call() throws Exception {
-    executeForAWhile();
+    executeOnce();
 
     // wait for the callable to be decorated by Futures.addCallback in the executorService
     // to make sure that the callback can be submitted again on success or failure.
@@ -76,7 +76,13 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
     return null;
   }
 
-  protected abstract void executeForAWhile() throws Exception;
+  /**
+   * try to consume an event by the pipe plugin.
+   *
+   * @return true if the event is consumed successfully, false if no more event can be consumed
+   * @throws Exception if any error occurs when consuming the event
+   */
+  protected abstract boolean executeOnce() throws Exception;
 
   @Override
   public void onSuccess(Void result) {

Reply via email to