This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d324fc59a4 [improve] openmldb options (#9166)
d324fc59a4 is described below

commit d324fc59a4adfbac9fe1e61de9ef0672ab3bb2fe
Author: Jarvis <[email protected]>
AuthorDate: Fri Apr 25 10:35:46 2025 +0800

    [improve] openmldb options (#9166)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |   1 -
 .../openmldb/config/OpenMldbParameters.java        |  35 ++++----
 ...nMldbConfig.java => OpenMldbSourceOptions.java} |   2 +-
 .../seatunnel/openmldb/source/OpenMldbSource.java  | 100 ++++++++-------------
 .../openmldb/source/OpenMldbSourceFactory.java     |  38 +++++---
 5 files changed, 80 insertions(+), 96 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 9da7933f7a..5712d27918 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -209,7 +209,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("Neo4jSourceOptions");
         whiteList.add("QdrantSourceOptions");
         whiteList.add("SocketSourceOptions");
-        whiteList.add("OpenMldbSourceOptions");
         whiteList.add("PostgresIncrementalSourceOptions");
         whiteList.add("SqlServerIncrementalSourceOptions");
         whiteList.add("OracleIncrementalSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java
 
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java
index 09d1a6b075..c897755690 100644
--- 
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java
+++ 
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java
@@ -29,8 +29,8 @@ public class OpenMldbParameters implements Serializable {
     private String zkPath;
     private String host;
     private int port;
-    private int sessionTimeout = OpenMldbConfig.SESSION_TIMEOUT.defaultValue();
-    private int requestTimeout = OpenMldbConfig.REQUEST_TIMEOUT.defaultValue();
+    private int sessionTimeout = 
OpenMldbSourceOptions.SESSION_TIMEOUT.defaultValue();
+    private int requestTimeout = 
OpenMldbSourceOptions.REQUEST_TIMEOUT.defaultValue();
     private Boolean clusterMode;
     private String database;
     private String sql;
@@ -41,34 +41,35 @@ public class OpenMldbParameters implements Serializable {
 
     public static OpenMldbParameters buildWithConfig(Config pluginConfig) {
         OpenMldbParameters openMldbParameters = new OpenMldbParameters();
-        openMldbParameters.clusterMode = 
pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key());
-        openMldbParameters.database = 
pluginConfig.getString(OpenMldbConfig.DATABASE.key());
-        openMldbParameters.sql = 
pluginConfig.getString(OpenMldbConfig.SQL.key());
+        openMldbParameters.clusterMode =
+                
pluginConfig.getBoolean(OpenMldbSourceOptions.CLUSTER_MODE.key());
+        openMldbParameters.database = 
pluginConfig.getString(OpenMldbSourceOptions.DATABASE.key());
+        openMldbParameters.sql = 
pluginConfig.getString(OpenMldbSourceOptions.SQL.key());
         // set zkHost
-        if (pluginConfig.hasPath(OpenMldbConfig.ZK_HOST.key())) {
-            openMldbParameters.zkHost = 
pluginConfig.getString(OpenMldbConfig.ZK_HOST.key());
+        if (pluginConfig.hasPath(OpenMldbSourceOptions.ZK_HOST.key())) {
+            openMldbParameters.zkHost = 
pluginConfig.getString(OpenMldbSourceOptions.ZK_HOST.key());
         }
         // set zkPath
-        if (pluginConfig.hasPath(OpenMldbConfig.ZK_PATH.key())) {
-            openMldbParameters.zkPath = 
pluginConfig.getString(OpenMldbConfig.ZK_PATH.key());
+        if (pluginConfig.hasPath(OpenMldbSourceOptions.ZK_PATH.key())) {
+            openMldbParameters.zkPath = 
pluginConfig.getString(OpenMldbSourceOptions.ZK_PATH.key());
         }
         // set host
-        if (pluginConfig.hasPath(OpenMldbConfig.HOST.key())) {
-            openMldbParameters.host = 
pluginConfig.getString(OpenMldbConfig.HOST.key());
+        if (pluginConfig.hasPath(OpenMldbSourceOptions.HOST.key())) {
+            openMldbParameters.host = 
pluginConfig.getString(OpenMldbSourceOptions.HOST.key());
         }
         // set port
-        if (pluginConfig.hasPath(OpenMldbConfig.PORT.key())) {
-            openMldbParameters.port = 
pluginConfig.getInt(OpenMldbConfig.PORT.key());
+        if (pluginConfig.hasPath(OpenMldbSourceOptions.PORT.key())) {
+            openMldbParameters.port = 
pluginConfig.getInt(OpenMldbSourceOptions.PORT.key());
         }
         // set session timeout
-        if (pluginConfig.hasPath(OpenMldbConfig.SESSION_TIMEOUT.key())) {
+        if (pluginConfig.hasPath(OpenMldbSourceOptions.SESSION_TIMEOUT.key())) 
{
             openMldbParameters.sessionTimeout =
-                    pluginConfig.getInt(OpenMldbConfig.SESSION_TIMEOUT.key());
+                    
pluginConfig.getInt(OpenMldbSourceOptions.SESSION_TIMEOUT.key());
         }
         // set request timeout
-        if (pluginConfig.hasPath(OpenMldbConfig.REQUEST_TIMEOUT.key())) {
+        if (pluginConfig.hasPath(OpenMldbSourceOptions.REQUEST_TIMEOUT.key())) 
{
             openMldbParameters.requestTimeout =
-                    pluginConfig.getInt(OpenMldbConfig.REQUEST_TIMEOUT.key());
+                    
pluginConfig.getInt(OpenMldbSourceOptions.REQUEST_TIMEOUT.key());
         }
         return openMldbParameters;
     }
diff --git 
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java
 
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSourceOptions.java
similarity index 98%
rename from 
seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java
rename to 
seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSourceOptions.java
index 892f4592e0..e8f71cead5 100644
--- 
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java
+++ 
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSourceOptions.java
@@ -20,7 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.openmldb.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class OpenMldbConfig {
+public class OpenMldbSourceOptions {
     private static final int DEFAULT_SESSION_TIMEOUT = 10000;
     private static final int DEFAULT_REQUEST_TIMEOUT = 60000;
     public static final Option<String> ZK_HOST =
diff --git 
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
 
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
index e5fb0a0dd6..7b038544eb 100644
--- 
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
+++ 
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
@@ -17,28 +17,22 @@
 
 package org.apache.seatunnel.connectors.seatunnel.openmldb.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import 
org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSqlExecutor;
 import 
org.apache.seatunnel.connectors.seatunnel.openmldb.exception.OpenMldbConnectorException;
@@ -47,60 +41,20 @@ import com._4paradigm.openmldb.sdk.Column;
 import com._4paradigm.openmldb.sdk.Schema;
 import com._4paradigm.openmldb.sdk.SqlException;
 import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor;
-import com.google.auto.service.AutoService;
 
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.Collections;
 import java.util.List;
 
-@AutoService(SeaTunnelSource.class)
 public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
         implements SupportColumnProjection {
-    private OpenMldbParameters openMldbParameters;
+    private final OpenMldbParameters openMldbParameters;
+    private final CatalogTable catalogTable;
     private JobContext jobContext;
-    private SeaTunnelRowType seaTunnelRowType;
 
-    @Override
-    public String getPluginName() {
-        return "OpenMldb";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        OpenMldbConfig.CLUSTER_MODE.key(),
-                        OpenMldbConfig.SQL.key(),
-                        OpenMldbConfig.DATABASE.key());
-        if (!result.isSuccess()) {
-            throw new OpenMldbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        if (pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key())) {
-            // cluster mode
-            result =
-                    CheckConfigUtil.checkAllExists(
-                            pluginConfig,
-                            OpenMldbConfig.ZK_HOST.key(),
-                            OpenMldbConfig.ZK_PATH.key());
-        } else {
-            // single mode
-            result =
-                    CheckConfigUtil.checkAllExists(
-                            pluginConfig, OpenMldbConfig.HOST.key(), 
OpenMldbConfig.PORT.key());
-        }
-        if (!result.isSuccess()) {
-            throw new OpenMldbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.openMldbParameters = 
OpenMldbParameters.buildWithConfig(pluginConfig);
+    public OpenMldbSource(OpenMldbParameters openMldbParameters) {
+        this.openMldbParameters = openMldbParameters;
         OpenMldbSqlExecutor.initSdkOption(openMldbParameters);
         try {
             SqlClusterExecutor sqlExecutor = 
OpenMldbSqlExecutor.getSqlExecutor();
@@ -108,7 +62,7 @@ public class OpenMldbSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
                     sqlExecutor.getInputSchema(
                             openMldbParameters.getDatabase(), 
openMldbParameters.getSql());
             List<Column> columnList = inputSchema.getColumnList();
-            this.seaTunnelRowType = convert(columnList);
+            this.catalogTable = convert(columnList);
         } catch (SQLException | SqlException e) {
             throw new OpenMldbConnectorException(
                     CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
@@ -116,6 +70,11 @@ public class OpenMldbSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
         }
     }
 
+    @Override
+    public String getPluginName() {
+        return "OpenMldb";
+    }
+
     @Override
     public Boundedness getBoundedness() {
         return JobMode.BATCH.equals(jobContext.getJobMode())
@@ -124,14 +83,15 @@ public class OpenMldbSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return seaTunnelRowType;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> createReader(
             SingleSplitReaderContext readerContext) throws Exception {
-        return new OpenMldbSourceReader(openMldbParameters, seaTunnelRowType, 
readerContext);
+        return new OpenMldbSourceReader(
+                openMldbParameters, catalogTable.getSeaTunnelRowType(), 
readerContext);
     }
 
     @Override
@@ -166,14 +126,24 @@ public class OpenMldbSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
         }
     }
 
-    private SeaTunnelRowType convert(List<Column> columnList) {
-        String[] fieldsName = new String[columnList.size()];
-        SeaTunnelDataType<?>[] fieldsType = new 
SeaTunnelDataType<?>[columnList.size()];
+    private CatalogTable convert(List<Column> columnList) {
+        TableSchema.Builder builder = TableSchema.builder();
         for (int i = 0; i < columnList.size(); i++) {
             Column column = columnList.get(i);
-            fieldsName[i] = column.getColumnName();
-            fieldsType[i] = convertSeaTunnelDataType(column.getSqlType());
+            builder.column(
+                    PhysicalColumn.of(
+                            column.getColumnName(),
+                            convertSeaTunnelDataType(column.getSqlType()),
+                            (Long) null,
+                            column.isNotNull(),
+                            null,
+                            null));
         }
-        return new SeaTunnelRowType(fieldsName, fieldsType);
+        return CatalogTable.of(
+                TableIdentifier.of("OpenMldb", 
openMldbParameters.getDatabase(), "default"),
+                builder.build(),
+                null,
+                null,
+                null);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java
 
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java
index f31aab5f82..def0c84181 100644
--- 
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java
@@ -19,12 +19,18 @@ package 
org.apache.seatunnel.connectors.seatunnel.openmldb.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSourceOptions;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class OpenMldbSourceFactory implements TableSourceFactory {
     @Override
@@ -35,21 +41,21 @@ public class OpenMldbSourceFactory implements 
TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(OpenMldbConfig.CLUSTER_MODE)
-                .required(OpenMldbConfig.SQL)
-                .required(OpenMldbConfig.DATABASE)
-                .optional(OpenMldbConfig.SESSION_TIMEOUT)
-                .optional(OpenMldbConfig.REQUEST_TIMEOUT)
+                .required(OpenMldbSourceOptions.CLUSTER_MODE)
+                .required(OpenMldbSourceOptions.SQL)
+                .required(OpenMldbSourceOptions.DATABASE)
+                .optional(OpenMldbSourceOptions.SESSION_TIMEOUT)
+                .optional(OpenMldbSourceOptions.REQUEST_TIMEOUT)
                 .conditional(
-                        OpenMldbConfig.CLUSTER_MODE,
+                        OpenMldbSourceOptions.CLUSTER_MODE,
                         false,
-                        OpenMldbConfig.HOST,
-                        OpenMldbConfig.PORT)
+                        OpenMldbSourceOptions.HOST,
+                        OpenMldbSourceOptions.PORT)
                 .conditional(
-                        OpenMldbConfig.CLUSTER_MODE,
+                        OpenMldbSourceOptions.CLUSTER_MODE,
                         true,
-                        OpenMldbConfig.ZK_HOST,
-                        OpenMldbConfig.ZK_PATH)
+                        OpenMldbSourceOptions.ZK_HOST,
+                        OpenMldbSourceOptions.ZK_PATH)
                 .build();
     }
 
@@ -57,4 +63,12 @@ public class OpenMldbSourceFactory implements 
TableSourceFactory {
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return OpenMldbSource.class;
     }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        OpenMldbParameters openMldbParameters =
+                
OpenMldbParameters.buildWithConfig(context.getOptions().toConfig());
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
OpenMldbSource(openMldbParameters);
+    }
 }

Reply via email to