This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-table-model-3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8cadd0f25f49f32e170b37a17b524116dc9c12a3
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Sep 26 16:02:34 2024 +0800

    PipeDataRegionExtractorConstructor impl
---
 .../PipeDataRegionExtractorConstructor.java        | 48 ++++++++++++++++++++++
 .../constructor/PipeExtractorConstructor.java      |  2 +-
 .../plugin/constructor/PipePluginConstructor.java  |  2 +-
 3 files changed, 50 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java
index ddf6cdb6b8b..46636d76592 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java
@@ -23,10 +23,24 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.donothing.DoNothingExtractor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeExtractorConstructor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.table.extractor.dataregion.IoTDBDataRegionTableModelExtractor;
+import org.apache.iotdb.pipe.api.PipeExtractor;
+import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
 
 class PipeDataRegionExtractorConstructor extends PipeExtractorConstructor {
 
+  // TODO: consider refactor the plugin constructors to a more generic way
+  protected final Map<String, Supplier<PipePlugin>> 
tableModelPluginConstructors = new HashMap<>();
+
   PipeDataRegionExtractorConstructor(DataNodePipePluginMetaKeeper 
pipePluginMetaKeeper) {
     super(pipePluginMetaKeeper);
   }
@@ -42,5 +56,39 @@ class PipeDataRegionExtractorConstructor extends 
PipeExtractorConstructor {
         BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(), 
DoNothingExtractor::new);
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(), 
IoTDBDataRegionExtractor::new);
+
+    // Plugins for table model only
+    tableModelPluginConstructors.put(
+        BuiltinPipePlugin.DO_NOTHING_EXTRACTOR.getPipePluginName(), 
DoNothingExtractor::new);
+    tableModelPluginConstructors.put(
+        BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName(),
+        IoTDBDataRegionTableModelExtractor::new);
+
+    tableModelPluginConstructors.put(
+        BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(), 
DoNothingExtractor::new);
+    tableModelPluginConstructors.put(
+        BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(),
+        IoTDBDataRegionTableModelExtractor::new);
+  }
+
+  @Override
+  public final PipeExtractor reflectPlugin(PipeParameters extractorParameters) 
{
+    return extractorParameters
+            .getStringOrDefault(
+                SystemConstant.SESSION_MODEL_KEY, 
SystemConstant.SESSION_MODEL_TREE_VALUE)
+            .equals(SystemConstant.SESSION_MODEL_TREE_VALUE)
+        ? super.reflectPlugin(extractorParameters)
+        : reflectTableModelPlugin(
+            extractorParameters
+                .getStringOrDefault(
+                    Arrays.asList(
+                        PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
+                    BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+                .toLowerCase());
+  }
+
+  private PipeExtractor reflectTableModelPlugin(String pluginName) {
+    return (PipeExtractor)
+        tableModelPluginConstructors.getOrDefault(pluginName, () -> 
reflect(pluginName)).get();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeExtractorConstructor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeExtractorConstructor.java
index 001ae49e939..03d6836aea6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeExtractorConstructor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeExtractorConstructor.java
@@ -38,7 +38,7 @@ public abstract class PipeExtractorConstructor extends 
PipePluginConstructor {
   }
 
   @Override
-  public final PipeExtractor reflectPlugin(PipeParameters extractorParameters) 
{
+  public PipeExtractor reflectPlugin(PipeParameters extractorParameters) {
     return (PipeExtractor)
         reflectPluginByKey(
             extractorParameters
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
index cbb25340f00..e311471f450 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
@@ -60,7 +60,7 @@ public abstract class PipePluginConstructor {
     return pluginConstructors.getOrDefault(pluginKey, () -> 
reflect(pluginKey)).get();
   }
 
-  private PipePlugin reflect(String pluginName) {
+  protected PipePlugin reflect(String pluginName) {
     if (pluginMetaKeeper == null) {
       throw new PipeException(
           "Failed to reflect PipePlugin instance, because PipePluginMetaKeeper 
is null.");

Reply via email to