This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-table-model-3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8cadd0f25f49f32e170b37a17b524116dc9c12a3 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Sep 26 16:02:34 2024 +0800 PipeDataRegionExtractorConstructor impl --- .../PipeDataRegionExtractorConstructor.java | 48 ++++++++++++++++++++++ .../constructor/PipeExtractorConstructor.java | 2 +- .../plugin/constructor/PipePluginConstructor.java | 2 +- 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java index ddf6cdb6b8b..46636d76592 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java @@ -23,10 +23,24 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.donothing.DoNothingExtractor; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeExtractorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; +import org.apache.iotdb.db.pipe.table.extractor.dataregion.IoTDBDataRegionTableModelExtractor; +import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.PipePlugin; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; class PipeDataRegionExtractorConstructor extends PipeExtractorConstructor { + // TODO: consider refactor the plugin constructors to a more generic way + protected final Map<String, Supplier<PipePlugin>> tableModelPluginConstructors = new HashMap<>(); + PipeDataRegionExtractorConstructor(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) { super(pipePluginMetaKeeper); } @@ -42,5 +56,39 @@ class PipeDataRegionExtractorConstructor extends PipeExtractorConstructor { BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(), DoNothingExtractor::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(), IoTDBDataRegionExtractor::new); + + // Plugins for table model only + tableModelPluginConstructors.put( + BuiltinPipePlugin.DO_NOTHING_EXTRACTOR.getPipePluginName(), DoNothingExtractor::new); + tableModelPluginConstructors.put( + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName(), + IoTDBDataRegionTableModelExtractor::new); + + tableModelPluginConstructors.put( + BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(), DoNothingExtractor::new); + tableModelPluginConstructors.put( + BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(), + IoTDBDataRegionTableModelExtractor::new); + } + + @Override + public final PipeExtractor reflectPlugin(PipeParameters extractorParameters) { + return extractorParameters + .getStringOrDefault( + SystemConstant.SESSION_MODEL_KEY, SystemConstant.SESSION_MODEL_TREE_VALUE) + .equals(SystemConstant.SESSION_MODEL_TREE_VALUE) + ? super.reflectPlugin(extractorParameters) + : reflectTableModelPlugin( + extractorParameters + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase()); + } + + private PipeExtractor reflectTableModelPlugin(String pluginName) { + return (PipeExtractor) + tableModelPluginConstructors.getOrDefault(pluginName, () -> reflect(pluginName)).get(); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeExtractorConstructor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeExtractorConstructor.java index 001ae49e939..03d6836aea6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeExtractorConstructor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeExtractorConstructor.java @@ -38,7 +38,7 @@ public abstract class PipeExtractorConstructor extends PipePluginConstructor { } @Override - public final PipeExtractor reflectPlugin(PipeParameters extractorParameters) { + public PipeExtractor reflectPlugin(PipeParameters extractorParameters) { return (PipeExtractor) reflectPluginByKey( extractorParameters diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java index cbb25340f00..e311471f450 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java @@ -60,7 +60,7 @@ public abstract class PipePluginConstructor { return pluginConstructors.getOrDefault(pluginKey, () -> reflect(pluginKey)).get(); } - private PipePlugin reflect(String pluginName) { + protected PipePlugin reflect(String pluginName) { if (pluginMetaKeeper == null) { throw new PipeException( "Failed to reflect PipePlugin instance, because PipePluginMetaKeeper is null.");
