This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bcbe84ec51e Pipe: fix PipeDataRegionPluginAgent initialization order & avoid instantiating meta source and sink (#11821)
bcbe84ec51e is described below
commit bcbe84ec51e06cdf5b0547fa76c0504a3e80ad0e
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jan 3 21:54:50 2024 +0800
Pipe: fix PipeDataRegionPluginAgent initialization order & avoid instantiating meta source and sink (#11821)
---
.../manager/pipe/agent/PipeConfigNodeAgent.java | 2 +-
.../agent/plugin/PipePluginConfigNodeAgent.java | 14 ++++++++++---
.../pipe/agent/plugin/PipePluginDataNodeAgent.java | 8 +++++---
.../dataregion/PipeDataRegionPluginAgent.java | 23 +++++++++++++---------
.../schemaregion/PipeSchemaRegionPluginAgent.java | 14 ++++++++++---
.../commons/pipe/agent/plugin/PipePluginAgent.java | 18 ++++++++++-------
6 files changed, 53 insertions(+), 26 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
index 5d2aeda55db..abc3a91f0df 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
@@ -31,7 +31,7 @@ public class PipeConfigNodeAgent {
/** Private constructor to prevent users from creating a new instance. */
private PipeConfigNodeAgent() {
pipeConfigNodeTaskAgent = new PipeTaskConfigNodeAgent();
- pipePluginConfigNodeAgent = new PipePluginConfigNodeAgent();
+ pipePluginConfigNodeAgent = new PipePluginConfigNodeAgent(null);
}
/** The singleton holder of PipeAgent. */
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipePluginConfigNodeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipePluginConfigNodeAgent.java
index 62c9143ea9d..3b8ed8f83a1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipePluginConfigNodeAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipePluginConfigNodeAgent.java
@@ -21,21 +21,29 @@ package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginAgent;
import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginConstructor;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
public class PipePluginConfigNodeAgent extends PipePluginAgent {
+ public PipePluginConfigNodeAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+ super(pipePluginMetaKeeper);
+ }
+
@Override
- protected PipePluginConstructor createPipeExtractorConstructor() {
+ protected PipePluginConstructor createPipeExtractorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionExtractorConstructor();
}
@Override
- protected PipePluginConstructor createPipeProcessorConstructor() {
+ protected PipePluginConstructor createPipeProcessorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionProcessorConstructor();
}
@Override
- protected PipePluginConstructor createPipeConnectorConstructor() {
+ protected PipePluginConstructor createPipeConnectorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionConnectorConstructor();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginDataNodeAgent.java
index 7d5d7854efa..25e7b2fe2d5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginDataNodeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginDataNodeAgent.java
@@ -53,7 +53,7 @@ public class PipePluginDataNodeAgent {
pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
dataRegionPluginAgent = new PipeDataRegionPluginAgent(pipePluginMetaKeeper);
- schemaRegionPluginAgent = new PipeSchemaRegionPluginAgent();
+ schemaRegionPluginAgent = new PipeSchemaRegionPluginAgent(null);
lock = new ReentrantLock();
}
@@ -209,7 +209,9 @@ public class PipePluginDataNodeAgent {
throws Exception {
dataRegionPluginAgent.validate(
pipeName, extractorAttributes, processorAttributes, connectorAttributes);
- schemaRegionPluginAgent.validate(
- pipeName, extractorAttributes, processorAttributes, connectorAttributes);
+ // FIXME: Currently we comment out the following code to avoid instantiating
+ // `IoTDBMetaConnector` and `IoTDBMetaExtractor`.
+ // schemaRegionPluginAgent.validate(pipeName, extractorAttributes, processorAttributes,
+ // connectorAttributes);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index 550d9a6e53c..68a260bceea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -22,27 +22,32 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion;
import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginAgent;
import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginConstructor;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
public class PipeDataRegionPluginAgent extends PipePluginAgent {
- protected final DataNodePipePluginMetaKeeper pipePluginMetaKeeper;
-
public PipeDataRegionPluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
- this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+ super(pipePluginMetaKeeper);
}
@Override
- protected PipePluginConstructor createPipeExtractorConstructor() {
- return new PipeDataRegionExtractorConstructor(pipePluginMetaKeeper);
+ protected PipePluginConstructor createPipeExtractorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
+ return new PipeDataRegionExtractorConstructor(
+ (DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
}
@Override
- protected PipePluginConstructor createPipeProcessorConstructor() {
- return new PipeDataRegionProcessorConstructor(pipePluginMetaKeeper);
+ protected PipePluginConstructor createPipeProcessorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
+ return new PipeDataRegionProcessorConstructor(
+ (DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
}
@Override
- protected PipePluginConstructor createPipeConnectorConstructor() {
- return new PipeDataRegionConnectorConstructor(pipePluginMetaKeeper);
+ protected PipePluginConstructor createPipeConnectorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
+ return new PipeDataRegionConnectorConstructor(
+ (DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
index 9401f68eb6d..76280d0ff7c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
@@ -21,21 +21,29 @@ package org.apache.iotdb.db.pipe.agent.plugin.schemaregion;
import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginAgent;
import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginConstructor;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
public class PipeSchemaRegionPluginAgent extends PipePluginAgent {
+ public PipeSchemaRegionPluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+ super(pipePluginMetaKeeper);
+ }
+
@Override
- protected PipePluginConstructor createPipeExtractorConstructor() {
+ protected PipePluginConstructor createPipeExtractorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionExtractorConstructor();
}
@Override
- protected PipePluginConstructor createPipeProcessorConstructor() {
+ protected PipePluginConstructor createPipeProcessorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionProcessorConstructor();
}
@Override
- protected PipePluginConstructor createPipeConnectorConstructor() {
+ protected PipePluginConstructor createPipeConnectorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionConnectorConstructor();
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 52911d6f510..865a8febbc0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskTemporaryRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.PipeProcessor;
@@ -40,17 +41,20 @@ public abstract class PipePluginAgent {
private final PipePluginConstructor pipeProcessorConstructor;
private final PipePluginConstructor pipeConnectorConstructor;
- protected PipePluginAgent() {
- pipeExtractorConstructor = createPipeExtractorConstructor();
- pipeProcessorConstructor = createPipeProcessorConstructor();
- pipeConnectorConstructor = createPipeConnectorConstructor();
+ protected PipePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+ pipeExtractorConstructor = createPipeExtractorConstructor(pipePluginMetaKeeper);
+ pipeProcessorConstructor = createPipeProcessorConstructor(pipePluginMetaKeeper);
+ pipeConnectorConstructor = createPipeConnectorConstructor(pipePluginMetaKeeper);
}
- protected abstract PipePluginConstructor createPipeExtractorConstructor();
+ protected abstract PipePluginConstructor createPipeExtractorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper);
- protected abstract PipePluginConstructor createPipeProcessorConstructor();
+ protected abstract PipePluginConstructor createPipeProcessorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper);
- protected abstract PipePluginConstructor createPipeConnectorConstructor();
+ protected abstract PipePluginConstructor createPipeConnectorConstructor(
+ PipePluginMetaKeeper pipePluginMetaKeeper);
public final PipeExtractor reflectExtractor(PipeParameters extractorParameters) {
return (PipeExtractor) pipeExtractorConstructor.reflectPlugin(extractorParameters);