This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5977
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 058274e8bbd7ced459db21a1cd989f2f79d76c9a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 8 20:57:33 2023 +0800

    [IOTDB-5979] Pipe: validation and customization failures during the first run of the PipeProcessor will affect the creation of subsequent pipes
---
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  1 -
 .../db/pipe/task/stage/PipeTaskCollectorStage.java | 49 ++++++++--------------
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 42 +++++++------------
 3 files changed, 32 insertions(+), 60 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 715816cfbcc..7ad1bffa2a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -58,7 +58,6 @@ public class PipeTaskBuilder {
             pipeStaticMeta.getPipeName(),
             dataRegionId,
             collectorStage.getEventSupplier(),
-            collectorStage.getCollectorPendingQueue(),
             pipeStaticMeta.getProcessorParameters(),
             connectorStage.getPipeConnectorPendingQueue());
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 9e03458bfa0..ca1ec8fda9b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -31,28 +31,12 @@ import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.util.HashMap;
 
 public class PipeTaskCollectorStage extends PipeTaskStage {
 
-  private final PipeParameters collectorParameters;
-
-  /**
-   * TODO: have a better way to control busy/idle status of 
PipeTaskCollectorStage.
-   *
-   * <p>Currently, this field is for IoTDBDataRegionCollector only. 
IoTDBDataRegionCollector uses
-   * collectorPendingQueue as an internal data structure to store realtime 
events.
-   *
-   * <p>PendingQueue can detect whether the queue is empty or not, and it can 
notify the
-   * PipeTaskProcessorStage to stop processing data when the queue is empty to 
avoid unnecessary
-   * processing, and it also can notify the PipeTaskProcessorStage to start 
processing data when the
-   * queue is not empty.
-   */
-  private UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
-
   private final PipeCollector pipeCollector;
 
   public PipeTaskCollectorStage(
@@ -60,6 +44,8 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
       PipeTaskMeta pipeTaskMeta,
       long creationTime,
       PipeParameters collectorParameters) {
+    PipeParameters localizedCollectorParameters;
+
     // TODO: avoid if-else, use reflection to create collector all the time
     if (collectorParameters
         .getStringOrDefault(
@@ -69,40 +55,43 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
       // we want to pass data region id to collector, so we need to create a 
new collector
       // parameters and put data region id into it. we can't put data region 
id into collector
       // parameters directly, because the given collector parameters may be 
used by other pipe task.
-      this.collectorParameters =
+      localizedCollectorParameters =
           new PipeParameters(new 
HashMap<>(collectorParameters.getAttribute()));
       // set data region id to collector parameters, so that collector can get 
data region id inside
       // collector
-      this.collectorParameters
+      localizedCollectorParameters
           .getAttribute()
           .put(PipeCollectorConstant.DATA_REGION_KEY, 
String.valueOf(dataRegionId.getId()));
 
-      collectorPendingQueue = new UnboundedBlockingPendingQueue<>();
       this.pipeCollector =
-          new IoTDBDataRegionCollector(pipeTaskMeta, creationTime, 
collectorPendingQueue);
+          new IoTDBDataRegionCollector(
+              pipeTaskMeta, creationTime, new 
UnboundedBlockingPendingQueue<>());
     } else {
-      this.collectorParameters = collectorParameters;
+      localizedCollectorParameters = collectorParameters;
 
-      this.pipeCollector = 
PipeAgent.plugin().reflectCollector(collectorParameters);
+      this.pipeCollector = 
PipeAgent.plugin().reflectCollector(localizedCollectorParameters);
     }
-  }
 
-  @Override
-  public void createSubtask() throws PipeException {
+    // validate and customize should be called before createSubtask. this 
allows collector exposing
+    // exceptions in advance.
     try {
       // 1. validate collector parameters
-      pipeCollector.validate(new PipeParameterValidator(collectorParameters));
+      pipeCollector.validate(new 
PipeParameterValidator(localizedCollectorParameters));
 
       // 2. customize collector
       final PipeCollectorRuntimeConfiguration runtimeConfiguration =
           new PipeCollectorRuntimeConfiguration();
-      pipeCollector.customize(collectorParameters, runtimeConfiguration);
-      // TODO: use runtimeConfiguration to configure collector
+      pipeCollector.customize(localizedCollectorParameters, 
runtimeConfiguration);
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
   }
 
+  @Override
+  public void createSubtask() throws PipeException {
+    // do nothing
+  }
+
   @Override
   public void startSubtask() throws PipeException {
     try {
@@ -129,8 +118,4 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
   public EventSupplier getEventSupplier() {
     return pipeCollector::supply;
   }
-
-  public UnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() {
-    return collectorPendingQueue;
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 571b04773a4..02c6576ce94 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor;
-import org.apache.iotdb.db.pipe.task.connection.BlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -38,8 +37,6 @@ import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import javax.annotation.Nullable;
-
 public class PipeTaskProcessorStage extends PipeTaskStage {
 
   protected final PipeProcessorSubtaskExecutor executor =
@@ -49,15 +46,10 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   protected final PipeProcessor pipeProcessor;
   protected final PipeProcessorSubtask pipeProcessorSubtask;
 
-  protected final BlockingPendingQueue<Event> pipeCollectorInputPendingQueue;
-  protected final BlockingPendingQueue<Event> pipeConnectorOutputPendingQueue;
-
   /**
    * @param pipeName pipe name
    * @param dataRegionId data region id
    * @param pipeCollectorInputEventSupplier used to input events from pipe 
collector
-   * @param pipeCollectorInputPendingQueue used to listen whether pipe 
collector event queue is from
-   *     empty to not empty or from not empty to empty, null means no need to 
listen
    * @param pipeProcessorParameters used to create pipe processor
    * @param pipeConnectorOutputPendingQueue used to output events to pipe 
connector
    */
@@ -65,12 +57,10 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       String pipeName,
       TConsensusGroupId dataRegionId,
       EventSupplier pipeCollectorInputEventSupplier,
-      @Nullable BlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
       BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
     this.pipeProcessorParameters = pipeProcessorParameters;
 
-    final String taskId = pipeName + "_" + dataRegionId;
     pipeProcessor =
         pipeProcessorParameters
                 .getStringOrDefault(
@@ -79,22 +69,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
                 
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
             ? new PipeDoNothingProcessor()
             : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
-    final PipeEventCollector pipeConnectorOutputEventCollector =
-        new PipeEventCollector(pipeConnectorOutputPendingQueue);
-
-    this.pipeProcessorSubtask =
-        new PipeProcessorSubtask(
-            taskId,
-            pipeCollectorInputEventSupplier,
-            pipeProcessor,
-            pipeConnectorOutputEventCollector);
-
-    this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
-    this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue;
-  }
-
-  @Override
-  public void createSubtask() throws PipeException {
+    // validate and customize should be called before createSubtask. this allows the processor
+    // to expose exceptions in advance.
     try {
       // 1. validate processor parameters
       pipeProcessor.validate(new 
PipeParameterValidator(pipeProcessorParameters));
@@ -103,11 +79,23 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       final PipeProcessorRuntimeConfiguration runtimeConfiguration =
           new PipeProcessorRuntimeConfiguration();
       pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
-      // TODO: use runtimeConfiguration to configure processor
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
 
+    final String taskId = pipeName + "_" + dataRegionId;
+    final PipeEventCollector pipeConnectorOutputEventCollector =
+        new PipeEventCollector(pipeConnectorOutputPendingQueue);
+    this.pipeProcessorSubtask =
+        new PipeProcessorSubtask(
+            taskId,
+            pipeCollectorInputEventSupplier,
+            pipeProcessor,
+            pipeConnectorOutputEventCollector);
+  }
+
+  @Override
+  public void createSubtask() throws PipeException {
     executor.register(pipeProcessorSubtask);
   }
 

Reply via email to