This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new aded56299c [Improve][Connector-V2] Support TableSourceFactory on
StarRocks (#6498)
aded56299c is described below
commit aded56299c7f348fdf30047bbac0ba8a7f923a14
Author: xiaochen <[email protected]>
AuthorDate: Thu Mar 14 10:04:24 2024 +0800
[Improve][Connector-V2] Support TableSourceFactory on StarRocks (#6498)
---
docs/en/connector-v2/source/StarRocks.md | 7 ++
.../common/config/TypesafeConfigUtils.java | 33 --------
.../common/config/TypesafeConfigUtilsTest.java | 17 -----
.../starrocks/catalog/StarRocksCatalogFactory.java | 3 +-
.../seatunnel/starrocks/config/CommonConfig.java | 12 +++
.../seatunnel/starrocks/config/SourceConfig.java | 87 ++++++----------------
.../starrocks/sink/StarRocksSinkFactory.java | 3 +-
.../starrocks/source/StarRocksSource.java | 68 +++++------------
.../starrocks/source/StarRocksSourceFactory.java | 22 +++++-
.../seatunnel/starrocks/StarRocksFactoryTest.java | 2 +
.../starrocks-thrift-to-starrocks-streamload.conf | 1 +
11 files changed, 88 insertions(+), 167 deletions(-)
diff --git a/docs/en/connector-v2/source/StarRocks.md
b/docs/en/connector-v2/source/StarRocks.md
index ef00d4d7d5..df814105aa 100644
--- a/docs/en/connector-v2/source/StarRocks.md
+++ b/docs/en/connector-v2/source/StarRocks.md
@@ -35,6 +35,7 @@ delivers the query plan as a parameter to BE nodes, and then
obtains data result
| scan_batch_rows | int | no | 1024 |
| scan_mem_limit | long | no | 2147483648 |
| max_retries | int | no | 3 |
+| scan.params.* | string | no | - |
### node_urls [list]
@@ -136,6 +137,10 @@ The maximum memory space allowed for a single query in the
BE node, in bytes. Th
number of retry requests sent to StarRocks
+### scan.params.* [string]
+
+Extra parameters used when scanning data from the BE nodes, given as `scan.params.<name>` keys.
+
## Example
```
@@ -164,6 +169,8 @@ source {
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
+ scan.params.scanner_thread_pool_thread_num = "3"
+
}
}
```
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index c931535e18..d80273ece0 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -18,13 +18,11 @@
package org.apache.seatunnel.common.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import lombok.NonNull;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -32,37 +30,6 @@ public final class TypesafeConfigUtils {
private TypesafeConfigUtils() {}
- /**
- * Extract sub config with fixed prefix
- *
- * @param source config source
- * @param prefix config prefix
- * @param keepPrefix true if keep prefix
- * @deprecated use org.apache.seatunnel.api.configuration.Option interface
instead
- */
- @Deprecated
- public static Config extractSubConfig(Config source, String prefix,
boolean keepPrefix) {
-
- // use LinkedHashMap to keep insertion order
- Map<String, String> values = new LinkedHashMap<>();
-
- for (Map.Entry<String, ConfigValue> entry : source.entrySet()) {
- final String key = entry.getKey();
- final String value = String.valueOf(entry.getValue().unwrapped());
-
- if (key.startsWith(prefix)) {
-
- if (keepPrefix) {
- values.put(key, value);
- } else {
- values.put(key.substring(prefix.length()), value);
- }
- }
- }
-
- return ConfigFactory.parseMap(values);
- }
-
/**
* Check if config with specific prefix exists
*
diff --git
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
index 3b297a731a..d9f5d50599 100644
---
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
@@ -26,27 +26,10 @@ import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
-import static
org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfig;
import static
org.apache.seatunnel.common.config.TypesafeConfigUtils.hasSubConfig;
public class TypesafeConfigUtilsTest {
- @Test
- public void testExtractSubConfig() {
- Config config = getConfig();
- Config subConfig = extractSubConfig(config, "test.", true);
- Map<String, String> configMap = new HashMap<>();
- configMap.put("test.t0", "v0");
- configMap.put("test.t1", "v1");
- Assertions.assertEquals(ConfigFactory.parseMap(configMap), subConfig);
-
- subConfig = extractSubConfig(config, "test.", false);
- configMap = new HashMap<>();
- configMap.put("t0", "v0");
- configMap.put("t1", "v1");
- Assertions.assertEquals(ConfigFactory.parseMap(configMap), subConfig);
- }
-
@Test
public void testHasSubConfig() {
Config config = getConfig();
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index 94a93b3f56..124e025719 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
@@ -29,7 +30,7 @@ import com.google.auto.service.AutoService;
@AutoService(Factory.class)
public class StarRocksCatalogFactory implements CatalogFactory {
- public static final String IDENTIFIER = "StarRocks";
+ public static final String IDENTIFIER = CommonConfig.CONNECTOR_IDENTITY;
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
index ffb9b11594..c8a4775fcf 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -31,6 +32,9 @@ import java.util.List;
@ToString
@AllArgsConstructor
public class CommonConfig implements Serializable {
+
+ public static final String CONNECTOR_IDENTITY = "StarRocks";
+
public static final Option<List<String>> NODE_URLS =
Options.key("nodeUrls")
.listType()
@@ -67,4 +71,12 @@ public class CommonConfig implements Serializable {
private String password;
private String database;
private String table;
+
+ public CommonConfig(ReadonlyConfig config) {
+ this.nodeUrls = config.get(NODE_URLS);
+ this.username = config.get(USERNAME);
+ this.password = config.get(PASSWORD);
+ this.database = config.get(DATABASE);
+ this.table = config.get(TABLE);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
index 10d0358a8f..d069863843 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
@@ -17,18 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Getter;
-import lombok.NonNull;
import lombok.Setter;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
@Setter
@@ -37,13 +33,26 @@ public class SourceConfig extends CommonConfig {
private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;
- public SourceConfig(
- @NonNull List<String> nodeUrls,
- @NonNull String username,
- @NonNull String password,
- @NonNull String database,
- @NonNull String table) {
- super(nodeUrls, username, password, database, table);
+ public SourceConfig(ReadonlyConfig config) {
+ super(config);
+ this.maxRetries = config.get(MAX_RETRIES);
+ this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
+ this.scanFilter = config.get(SCAN_FILTER);
+ this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
+ this.batchRows = config.get(SCAN_BATCH_ROWS);
+ this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
+ this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
+ this.memLimit = config.get(SCAN_MEM_LIMIT);
+
+ String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
+ config.toMap()
+ .forEach(
+ (key, value) -> {
+ if (key.startsWith(prefix)) {
+ this.sourceOptionProps.put(
+
key.substring(prefix.length()).toLowerCase(), value);
+ }
+ });
}
public static final Option<Integer> MAX_RETRIES =
@@ -105,57 +114,5 @@ public class SourceConfig extends CommonConfig {
private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
private int batchRows = SCAN_BATCH_ROWS.defaultValue();
private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
- private final Map<String, String> sourceOptionProps = new HashMap<>();
-
- public static SourceConfig loadConfig(Config pluginConfig) {
- SourceConfig sourceConfig =
- new SourceConfig(
- pluginConfig.getStringList(NODE_URLS.key()),
- pluginConfig.getString(USERNAME.key()),
- pluginConfig.getString(PASSWORD.key()),
- pluginConfig.getString(DATABASE.key()),
- pluginConfig.getString(TABLE.key()));
-
- if (pluginConfig.hasPath(MAX_RETRIES.key())) {
- sourceConfig.setMaxRetries(pluginConfig.getInt(MAX_RETRIES.key()));
- }
- if (pluginConfig.hasPath(QUERY_TABLET_SIZE.key())) {
-
sourceConfig.setRequestTabletSize(pluginConfig.getInt(QUERY_TABLET_SIZE.key()));
- }
- if (pluginConfig.hasPath(SCAN_FILTER.key())) {
-
sourceConfig.setScanFilter(pluginConfig.getString(SCAN_FILTER.key()));
- }
- if (pluginConfig.hasPath(SCAN_CONNECT_TIMEOUT.key())) {
-
sourceConfig.setConnectTimeoutMs(pluginConfig.getInt(SCAN_CONNECT_TIMEOUT.key()));
- }
- if (pluginConfig.hasPath(SCAN_BATCH_ROWS.key())) {
-
sourceConfig.setBatchRows(pluginConfig.getInt(SCAN_BATCH_ROWS.key()));
- }
- if (pluginConfig.hasPath(SCAN_KEEP_ALIVE_MIN.key())) {
-
sourceConfig.setKeepAliveMin(pluginConfig.getInt(SCAN_KEEP_ALIVE_MIN.key()));
- }
- if (pluginConfig.hasPath(SCAN_QUERY_TIMEOUT_SEC.key())) {
-
sourceConfig.setQueryTimeoutSec(pluginConfig.getInt(SCAN_QUERY_TIMEOUT_SEC.key()));
- }
- if (pluginConfig.hasPath(SCAN_MEM_LIMIT.key())) {
-
sourceConfig.setMemLimit(pluginConfig.getLong(SCAN_MEM_LIMIT.key()));
- }
- parseSourceOptionProperties(pluginConfig, sourceConfig);
- return sourceConfig;
- }
-
- private static void parseSourceOptionProperties(
- Config pluginConfig, SourceConfig sourceConfig) {
- Config sourceOptionConfig =
- TypesafeConfigUtils.extractSubConfig(
- pluginConfig, STARROCKS_SCAN_CONFIG_PREFIX.key(),
false);
- sourceOptionConfig
- .entrySet()
- .forEach(
- entry -> {
- final String configKey =
entry.getKey().toLowerCase();
- sourceConfig.sourceOptionProps.put(
- configKey, (String)
entry.getValue().unwrapped());
- });
- }
+ private Map<String, String> sourceOptionProps = new HashMap<>();
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 081645270f..08fc690698 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
@@ -42,7 +43,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRoc
public class StarRocksSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return "StarRocks";
+ return CommonConfig.CONNECTOR_IDENTITY;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
index 738400436a..9bde1b22a3 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
@@ -17,70 +17,35 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.NODE_URLS;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.USERNAME;
+import java.util.Collections;
+import java.util.List;
@AutoService(SeaTunnelSource.class)
public class StarRocksSource
implements SeaTunnelSource<SeaTunnelRow, StarRocksSourceSplit,
StarRocksSourceState> {
- private SeaTunnelRowType typeInfo;
+ private CatalogTable catalogTable;
private SourceConfig sourceConfig;
@Override
public String getPluginName() {
- return "StarRocks";
+ return CommonConfig.CONNECTOR_IDENTITY;
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- NODE_URLS.key(),
- DATABASE.key(),
- TABLE.key(),
- USERNAME.key(),
- PASSWORD.key());
-
- CheckResult schemaCheckResult =
- CheckConfigUtil.checkAllExists(pluginConfig,
TableSchemaOptions.SCHEMA.key());
- CheckResult mergedConfigCheck =
- CheckConfigUtil.mergeCheckResults(checkResult,
schemaCheckResult);
- if (!mergedConfigCheck.isSuccess()) {
- throw new StarRocksConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
mergedConfigCheck.getMsg()));
- }
-
- this.typeInfo =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- this.sourceConfig = SourceConfig.loadConfig(pluginConfig);
+ public StarRocksSource(SourceConfig sourceConfig, CatalogTable
catalogTable) {
+ this.sourceConfig = sourceConfig;
+ this.catalogTable = catalogTable;
}
@Override
@@ -89,13 +54,14 @@ public class StarRocksSource
}
@Override
- public SeaTunnelDataType getProducedType() {
- return typeInfo;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
public SourceReader createReader(SourceReader.Context readerContext) {
- return new StarRocksSourceReader(readerContext, typeInfo,
sourceConfig);
+ return new StarRocksSourceReader(
+ readerContext, catalogTable.getSeaTunnelRowType(),
sourceConfig);
}
@Override
@@ -104,11 +70,15 @@ public class StarRocksSource
StarRocksSourceState checkpointState)
throws Exception {
return new StartRocksSourceSplitEnumerator(
- enumeratorContext, sourceConfig, typeInfo, checkpointState);
+ enumeratorContext,
+ sourceConfig,
+ catalogTable.getSeaTunnelRowType(),
+ checkpointState);
}
@Override
public SourceSplitEnumerator
createEnumerator(SourceSplitEnumerator.Context enumeratorContext) {
- return new StartRocksSourceSplitEnumerator(enumeratorContext,
sourceConfig, typeInfo);
+ return new StartRocksSourceSplitEnumerator(
+ enumeratorContext, sourceConfig,
catalogTable.getSeaTunnelRowType());
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
index fffb5a435c..1f5e3c1690 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
@@ -17,20 +17,29 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.source;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+
@AutoService(Factory.class)
public class StarRocksSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return "StarRocks";
+ return CommonConfig.CONNECTOR_IDENTITY;
}
@Override
@@ -59,4 +68,15 @@ public class StarRocksSourceFactory implements
TableSourceFactory {
public Class<? extends SeaTunnelSource> getSourceClass() {
return StarRocksSource.class;
}
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ ReadonlyConfig config = context.getOptions();
+ SourceConfig starRocksSourceConfig = new SourceConfig(config);
+ CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config);
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new StarRocksSource(starRocksSourceConfig,
catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksFactoryTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksFactoryTest.java
index 9bc934aac9..0054f16fc9 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSinkFactory;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.source.StarRocksSourceFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -27,5 +28,6 @@ class StarRocksFactoryTest {
@Test
void optionRule() {
Assertions.assertNotNull((new StarRocksSinkFactory()).optionRule());
+ Assertions.assertNotNull((new StarRocksSourceFactory()).optionRule());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
index 7b4c25af73..91f7b0402d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
@@ -46,6 +46,7 @@ source {
DATE_COL = DATE
}
}
+ scan.params.scanner_thread_pool_thread_num = "3"
}
}