This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bcbe84ec51e Pipe: fix PipeDataRegionPluginAgent initialization order & 
avoid instantiating meta source and sink (#11821)
bcbe84ec51e is described below

commit bcbe84ec51e06cdf5b0547fa76c0504a3e80ad0e
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jan 3 21:54:50 2024 +0800

    Pipe: fix PipeDataRegionPluginAgent initialization order & avoid 
instantiating meta source and sink (#11821)
---
 .../manager/pipe/agent/PipeConfigNodeAgent.java    |  2 +-
 .../agent/plugin/PipePluginConfigNodeAgent.java    | 14 ++++++++++---
 .../pipe/agent/plugin/PipePluginDataNodeAgent.java |  8 +++++---
 .../dataregion/PipeDataRegionPluginAgent.java      | 23 +++++++++++++---------
 .../schemaregion/PipeSchemaRegionPluginAgent.java  | 14 ++++++++++---
 .../commons/pipe/agent/plugin/PipePluginAgent.java | 18 ++++++++++-------
 6 files changed, 53 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
index 5d2aeda55db..abc3a91f0df 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
@@ -31,7 +31,7 @@ public class PipeConfigNodeAgent {
   /** Private constructor to prevent users from creating a new instance. */
   private PipeConfigNodeAgent() {
     pipeConfigNodeTaskAgent = new PipeTaskConfigNodeAgent();
-    pipePluginConfigNodeAgent = new PipePluginConfigNodeAgent();
+    pipePluginConfigNodeAgent = new PipePluginConfigNodeAgent(null);
   }
 
   /** The singleton holder of PipeAgent. */
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipePluginConfigNodeAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipePluginConfigNodeAgent.java
index 62c9143ea9d..3b8ed8f83a1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipePluginConfigNodeAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipePluginConfigNodeAgent.java
@@ -21,21 +21,29 @@ package 
org.apache.iotdb.confignode.manager.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginAgent;
 import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginConstructor;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
 
 public class PipePluginConfigNodeAgent extends PipePluginAgent {
 
+  public PipePluginConfigNodeAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+    super(pipePluginMetaKeeper);
+  }
+
   @Override
-  protected PipePluginConstructor createPipeExtractorConstructor() {
+  protected PipePluginConstructor createPipeExtractorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeConfigRegionExtractorConstructor();
   }
 
   @Override
-  protected PipePluginConstructor createPipeProcessorConstructor() {
+  protected PipePluginConstructor createPipeProcessorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeConfigRegionProcessorConstructor();
   }
 
   @Override
-  protected PipePluginConstructor createPipeConnectorConstructor() {
+  protected PipePluginConstructor createPipeConnectorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeConfigRegionConnectorConstructor();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginDataNodeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginDataNodeAgent.java
index 7d5d7854efa..25e7b2fe2d5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginDataNodeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginDataNodeAgent.java
@@ -53,7 +53,7 @@ public class PipePluginDataNodeAgent {
     pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
 
     dataRegionPluginAgent = new 
PipeDataRegionPluginAgent(pipePluginMetaKeeper);
-    schemaRegionPluginAgent = new PipeSchemaRegionPluginAgent();
+    schemaRegionPluginAgent = new PipeSchemaRegionPluginAgent(null);
 
     lock = new ReentrantLock();
   }
@@ -209,7 +209,9 @@ public class PipePluginDataNodeAgent {
       throws Exception {
     dataRegionPluginAgent.validate(
         pipeName, extractorAttributes, processorAttributes, 
connectorAttributes);
-    schemaRegionPluginAgent.validate(
-        pipeName, extractorAttributes, processorAttributes, 
connectorAttributes);
+    // FIXME: Currently we comment out the following code to avoid 
instantiating
+    // `IoTDBMetaConnector` and `IoTDBMetaExtractor`.
+    // schemaRegionPluginAgent.validate(pipeName, extractorAttributes, 
processorAttributes,
+    // connectorAttributes);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index 550d9a6e53c..68a260bceea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -22,27 +22,32 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion;
 import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginAgent;
 import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginConstructor;
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
 
 public class PipeDataRegionPluginAgent extends PipePluginAgent {
 
-  protected final DataNodePipePluginMetaKeeper pipePluginMetaKeeper;
-
   public PipeDataRegionPluginAgent(DataNodePipePluginMetaKeeper 
pipePluginMetaKeeper) {
-    this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+    super(pipePluginMetaKeeper);
   }
 
   @Override
-  protected PipePluginConstructor createPipeExtractorConstructor() {
-    return new PipeDataRegionExtractorConstructor(pipePluginMetaKeeper);
+  protected PipePluginConstructor createPipeExtractorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
+    return new PipeDataRegionExtractorConstructor(
+        (DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
   }
 
   @Override
-  protected PipePluginConstructor createPipeProcessorConstructor() {
-    return new PipeDataRegionProcessorConstructor(pipePluginMetaKeeper);
+  protected PipePluginConstructor createPipeProcessorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
+    return new PipeDataRegionProcessorConstructor(
+        (DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
   }
 
   @Override
-  protected PipePluginConstructor createPipeConnectorConstructor() {
-    return new PipeDataRegionConnectorConstructor(pipePluginMetaKeeper);
+  protected PipePluginConstructor createPipeConnectorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
+    return new PipeDataRegionConnectorConstructor(
+        (DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
index 9401f68eb6d..76280d0ff7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
@@ -21,21 +21,29 @@ package org.apache.iotdb.db.pipe.agent.plugin.schemaregion;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginAgent;
 import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginConstructor;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
 
 public class PipeSchemaRegionPluginAgent extends PipePluginAgent {
 
+  public PipeSchemaRegionPluginAgent(PipePluginMetaKeeper 
pipePluginMetaKeeper) {
+    super(pipePluginMetaKeeper);
+  }
+
   @Override
-  protected PipePluginConstructor createPipeExtractorConstructor() {
+  protected PipePluginConstructor createPipeExtractorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeSchemaRegionExtractorConstructor();
   }
 
   @Override
-  protected PipePluginConstructor createPipeProcessorConstructor() {
+  protected PipePluginConstructor createPipeProcessorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeSchemaRegionProcessorConstructor();
   }
 
   @Override
-  protected PipePluginConstructor createPipeConnectorConstructor() {
+  protected PipePluginConstructor createPipeConnectorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeSchemaRegionConnectorConstructor();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 52911d6f510..865a8febbc0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin;
 
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskTemporaryRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.PipeProcessor;
@@ -40,17 +41,20 @@ public abstract class PipePluginAgent {
   private final PipePluginConstructor pipeProcessorConstructor;
   private final PipePluginConstructor pipeConnectorConstructor;
 
-  protected PipePluginAgent() {
-    pipeExtractorConstructor = createPipeExtractorConstructor();
-    pipeProcessorConstructor = createPipeProcessorConstructor();
-    pipeConnectorConstructor = createPipeConnectorConstructor();
+  protected PipePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+    pipeExtractorConstructor = 
createPipeExtractorConstructor(pipePluginMetaKeeper);
+    pipeProcessorConstructor = 
createPipeProcessorConstructor(pipePluginMetaKeeper);
+    pipeConnectorConstructor = 
createPipeConnectorConstructor(pipePluginMetaKeeper);
   }
 
-  protected abstract PipePluginConstructor createPipeExtractorConstructor();
+  protected abstract PipePluginConstructor createPipeExtractorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper);
 
-  protected abstract PipePluginConstructor createPipeProcessorConstructor();
+  protected abstract PipePluginConstructor createPipeProcessorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper);
 
-  protected abstract PipePluginConstructor createPipeConnectorConstructor();
+  protected abstract PipePluginConstructor createPipeConnectorConstructor(
+      PipePluginMetaKeeper pipePluginMetaKeeper);
 
   public final PipeExtractor reflectExtractor(PipeParameters 
extractorParameters) {
     return (PipeExtractor) 
pipeExtractorConstructor.reflectPlugin(extractorParameters);

Reply via email to