This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d324fc59a4 [improve] openmldb options (#9166)
d324fc59a4 is described below
commit d324fc59a4adfbac9fe1e61de9ef0672ab3bb2fe
Author: Jarvis <[email protected]>
AuthorDate: Fri Apr 25 10:35:46 2025 +0800
[improve] openmldb options (#9166)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
.../openmldb/config/OpenMldbParameters.java | 35 ++++----
...nMldbConfig.java => OpenMldbSourceOptions.java} | 2 +-
.../seatunnel/openmldb/source/OpenMldbSource.java | 100 ++++++++-------------
.../openmldb/source/OpenMldbSourceFactory.java | 38 +++++---
5 files changed, 80 insertions(+), 96 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 9da7933f7a..5712d27918 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -209,7 +209,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("Neo4jSourceOptions");
whiteList.add("QdrantSourceOptions");
whiteList.add("SocketSourceOptions");
- whiteList.add("OpenMldbSourceOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
whiteList.add("OracleIncrementalSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java
index 09d1a6b075..c897755690 100644
--- a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java
+++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java
@@ -29,8 +29,8 @@ public class OpenMldbParameters implements Serializable {
private String zkPath;
private String host;
private int port;
- private int sessionTimeout = OpenMldbConfig.SESSION_TIMEOUT.defaultValue();
- private int requestTimeout = OpenMldbConfig.REQUEST_TIMEOUT.defaultValue();
+ private int sessionTimeout = OpenMldbSourceOptions.SESSION_TIMEOUT.defaultValue();
+ private int requestTimeout = OpenMldbSourceOptions.REQUEST_TIMEOUT.defaultValue();
private Boolean clusterMode;
private String database;
private String sql;
@@ -41,34 +41,35 @@ public class OpenMldbParameters implements Serializable {
public static OpenMldbParameters buildWithConfig(Config pluginConfig) {
OpenMldbParameters openMldbParameters = new OpenMldbParameters();
- openMldbParameters.clusterMode = pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key());
- openMldbParameters.database = pluginConfig.getString(OpenMldbConfig.DATABASE.key());
- openMldbParameters.sql = pluginConfig.getString(OpenMldbConfig.SQL.key());
+ openMldbParameters.clusterMode =
+ pluginConfig.getBoolean(OpenMldbSourceOptions.CLUSTER_MODE.key());
+ openMldbParameters.database = pluginConfig.getString(OpenMldbSourceOptions.DATABASE.key());
+ openMldbParameters.sql = pluginConfig.getString(OpenMldbSourceOptions.SQL.key());
// set zkHost
- if (pluginConfig.hasPath(OpenMldbConfig.ZK_HOST.key())) {
- openMldbParameters.zkHost = pluginConfig.getString(OpenMldbConfig.ZK_HOST.key());
+ if (pluginConfig.hasPath(OpenMldbSourceOptions.ZK_HOST.key())) {
+ openMldbParameters.zkHost = pluginConfig.getString(OpenMldbSourceOptions.ZK_HOST.key());
}
// set zkPath
- if (pluginConfig.hasPath(OpenMldbConfig.ZK_PATH.key())) {
- openMldbParameters.zkPath = pluginConfig.getString(OpenMldbConfig.ZK_PATH.key());
+ if (pluginConfig.hasPath(OpenMldbSourceOptions.ZK_PATH.key())) {
+ openMldbParameters.zkPath = pluginConfig.getString(OpenMldbSourceOptions.ZK_PATH.key());
}
// set host
- if (pluginConfig.hasPath(OpenMldbConfig.HOST.key())) {
- openMldbParameters.host = pluginConfig.getString(OpenMldbConfig.HOST.key());
+ if (pluginConfig.hasPath(OpenMldbSourceOptions.HOST.key())) {
+ openMldbParameters.host = pluginConfig.getString(OpenMldbSourceOptions.HOST.key());
}
// set port
- if (pluginConfig.hasPath(OpenMldbConfig.PORT.key())) {
- openMldbParameters.port = pluginConfig.getInt(OpenMldbConfig.PORT.key());
+ if (pluginConfig.hasPath(OpenMldbSourceOptions.PORT.key())) {
+ openMldbParameters.port = pluginConfig.getInt(OpenMldbSourceOptions.PORT.key());
}
// set session timeout
- if (pluginConfig.hasPath(OpenMldbConfig.SESSION_TIMEOUT.key())) {
+ if (pluginConfig.hasPath(OpenMldbSourceOptions.SESSION_TIMEOUT.key())) {
openMldbParameters.sessionTimeout =
- pluginConfig.getInt(OpenMldbConfig.SESSION_TIMEOUT.key());
+ pluginConfig.getInt(OpenMldbSourceOptions.SESSION_TIMEOUT.key());
}
// set request timeout
- if (pluginConfig.hasPath(OpenMldbConfig.REQUEST_TIMEOUT.key())) {
+ if (pluginConfig.hasPath(OpenMldbSourceOptions.REQUEST_TIMEOUT.key())) {
openMldbParameters.requestTimeout =
- pluginConfig.getInt(OpenMldbConfig.REQUEST_TIMEOUT.key());
+ pluginConfig.getInt(OpenMldbSourceOptions.REQUEST_TIMEOUT.key());
}
return openMldbParameters;
}
diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSourceOptions.java
similarity index 98%
rename from seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java
rename to seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSourceOptions.java
index 892f4592e0..e8f71cead5 100644
--- a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java
+++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSourceOptions.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.openmldb.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class OpenMldbConfig {
+public class OpenMldbSourceOptions {
private static final int DEFAULT_SESSION_TIMEOUT = 10000;
private static final int DEFAULT_REQUEST_TIMEOUT = 60000;
public static final Option<String> ZK_HOST =
diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
index e5fb0a0dd6..7b038544eb 100644
--- a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
+++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
@@ -17,28 +17,22 @@
package org.apache.seatunnel.connectors.seatunnel.openmldb.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig;
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters;
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSqlExecutor;
import org.apache.seatunnel.connectors.seatunnel.openmldb.exception.OpenMldbConnectorException;
@@ -47,60 +41,20 @@ import com._4paradigm.openmldb.sdk.Column;
import com._4paradigm.openmldb.sdk.Schema;
import com._4paradigm.openmldb.sdk.SqlException;
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor;
-import com.google.auto.service.AutoService;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.Collections;
import java.util.List;
-@AutoService(SeaTunnelSource.class)
public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
- private OpenMldbParameters openMldbParameters;
+ private final OpenMldbParameters openMldbParameters;
+ private final CatalogTable catalogTable;
private JobContext jobContext;
- private SeaTunnelRowType seaTunnelRowType;
- @Override
- public String getPluginName() {
- return "OpenMldb";
- }
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- OpenMldbConfig.CLUSTER_MODE.key(),
- OpenMldbConfig.SQL.key(),
- OpenMldbConfig.DATABASE.key());
- if (!result.isSuccess()) {
- throw new OpenMldbConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE, result.getMsg()));
- }
- if (pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key())) {
- // cluster mode
- result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- OpenMldbConfig.ZK_HOST.key(),
- OpenMldbConfig.ZK_PATH.key());
- } else {
- // single mode
- result =
- CheckConfigUtil.checkAllExists(
- pluginConfig, OpenMldbConfig.HOST.key(), OpenMldbConfig.PORT.key());
- }
- if (!result.isSuccess()) {
- throw new OpenMldbConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE, result.getMsg()));
- }
- this.openMldbParameters = OpenMldbParameters.buildWithConfig(pluginConfig);
+ public OpenMldbSource(OpenMldbParameters openMldbParameters) {
+ this.openMldbParameters = openMldbParameters;
OpenMldbSqlExecutor.initSdkOption(openMldbParameters);
try {
SqlClusterExecutor sqlExecutor = OpenMldbSqlExecutor.getSqlExecutor();
@@ -108,7 +62,7 @@ public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
sqlExecutor.getInputSchema(
openMldbParameters.getDatabase(),
openMldbParameters.getSql());
List<Column> columnList = inputSchema.getColumnList();
- this.seaTunnelRowType = convert(columnList);
+ this.catalogTable = convert(columnList);
} catch (SQLException | SqlException e) {
throw new OpenMldbConnectorException(
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
@@ -116,6 +70,11 @@ public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
}
}
+ @Override
+ public String getPluginName() {
+ return "OpenMldb";
+ }
+
@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(jobContext.getJobMode())
@@ -124,14 +83,15 @@ public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return seaTunnelRowType;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
SingleSplitReaderContext readerContext) throws Exception {
- return new OpenMldbSourceReader(openMldbParameters, seaTunnelRowType, readerContext);
+ return new OpenMldbSourceReader(
+ openMldbParameters, catalogTable.getSeaTunnelRowType(), readerContext);
}
@Override
@@ -166,14 +126,24 @@ public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
}
}
- private SeaTunnelRowType convert(List<Column> columnList) {
- String[] fieldsName = new String[columnList.size()];
- SeaTunnelDataType<?>[] fieldsType = new SeaTunnelDataType<?>[columnList.size()];
+ private CatalogTable convert(List<Column> columnList) {
+ TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < columnList.size(); i++) {
Column column = columnList.get(i);
- fieldsName[i] = column.getColumnName();
- fieldsType[i] = convertSeaTunnelDataType(column.getSqlType());
+ builder.column(
+ PhysicalColumn.of(
+ column.getColumnName(),
+ convertSeaTunnelDataType(column.getSqlType()),
+ (Long) null,
+ column.isNotNull(),
+ null,
+ null));
}
- return new SeaTunnelRowType(fieldsName, fieldsType);
+ return CatalogTable.of(
+ TableIdentifier.of("OpenMldb", openMldbParameters.getDatabase(), "default"),
+ builder.build(),
+ null,
+ null,
+ null);
}
}
diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java
index f31aab5f82..def0c84181 100644
--- a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java
@@ -19,12 +19,18 @@ package org.apache.seatunnel.connectors.seatunnel.openmldb.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters;
+import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSourceOptions;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+
@AutoService(Factory.class)
public class OpenMldbSourceFactory implements TableSourceFactory {
@Override
@@ -35,21 +41,21 @@ public class OpenMldbSourceFactory implements TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(OpenMldbConfig.CLUSTER_MODE)
- .required(OpenMldbConfig.SQL)
- .required(OpenMldbConfig.DATABASE)
- .optional(OpenMldbConfig.SESSION_TIMEOUT)
- .optional(OpenMldbConfig.REQUEST_TIMEOUT)
+ .required(OpenMldbSourceOptions.CLUSTER_MODE)
+ .required(OpenMldbSourceOptions.SQL)
+ .required(OpenMldbSourceOptions.DATABASE)
+ .optional(OpenMldbSourceOptions.SESSION_TIMEOUT)
+ .optional(OpenMldbSourceOptions.REQUEST_TIMEOUT)
.conditional(
- OpenMldbConfig.CLUSTER_MODE,
+ OpenMldbSourceOptions.CLUSTER_MODE,
false,
- OpenMldbConfig.HOST,
- OpenMldbConfig.PORT)
+ OpenMldbSourceOptions.HOST,
+ OpenMldbSourceOptions.PORT)
.conditional(
- OpenMldbConfig.CLUSTER_MODE,
+ OpenMldbSourceOptions.CLUSTER_MODE,
true,
- OpenMldbConfig.ZK_HOST,
- OpenMldbConfig.ZK_PATH)
+ OpenMldbSourceOptions.ZK_HOST,
+ OpenMldbSourceOptions.ZK_PATH)
.build();
}
@@ -57,4 +63,12 @@ public class OpenMldbSourceFactory implements TableSourceFactory {
public Class<? extends SeaTunnelSource> getSourceClass() {
return OpenMldbSource.class;
}
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+ OpenMldbParameters openMldbParameters =
+ OpenMldbParameters.buildWithConfig(context.getOptions().toConfig());
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new OpenMldbSource(openMldbParameters);
+ }
}