This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new aded56299c [Improve][Connector-V2] Support TableSourceFactory on 
StarRocks (#6498)
aded56299c is described below

commit aded56299c7f348fdf30047bbac0ba8a7f923a14
Author: xiaochen <[email protected]>
AuthorDate: Thu Mar 14 10:04:24 2024 +0800

    [Improve][Connector-V2] Support TableSourceFactory on StarRocks (#6498)
---
 docs/en/connector-v2/source/StarRocks.md           |  7 ++
 .../common/config/TypesafeConfigUtils.java         | 33 --------
 .../common/config/TypesafeConfigUtilsTest.java     | 17 -----
 .../starrocks/catalog/StarRocksCatalogFactory.java |  3 +-
 .../seatunnel/starrocks/config/CommonConfig.java   | 12 +++
 .../seatunnel/starrocks/config/SourceConfig.java   | 87 ++++++----------------
 .../starrocks/sink/StarRocksSinkFactory.java       |  3 +-
 .../starrocks/source/StarRocksSource.java          | 68 +++++------------
 .../starrocks/source/StarRocksSourceFactory.java   | 22 +++++-
 .../seatunnel/starrocks/StarRocksFactoryTest.java  |  2 +
 .../starrocks-thrift-to-starrocks-streamload.conf  |  1 +
 11 files changed, 88 insertions(+), 167 deletions(-)

diff --git a/docs/en/connector-v2/source/StarRocks.md 
b/docs/en/connector-v2/source/StarRocks.md
index ef00d4d7d5..df814105aa 100644
--- a/docs/en/connector-v2/source/StarRocks.md
+++ b/docs/en/connector-v2/source/StarRocks.md
@@ -35,6 +35,7 @@ delivers the query plan as a parameter to BE nodes, and then 
obtains data result
 | scan_batch_rows         | int    | no       | 1024              |
 | scan_mem_limit          | long   | no       | 2147483648        |
 | max_retries             | int    | no       | 3                 |
+| scan.params.*           | string | no       | -                 |
 
 ### node_urls [list]
 
@@ -136,6 +137,10 @@ The maximum memory space allowed for a single query in the 
BE node, in bytes. Th
 
 number of retry requests sent to StarRocks
 
+### scan.params.* [string]
+
+Additional parameters applied when scanning data from the BE nodes, for example `scan.params.scanner_thread_pool_thread_num`.
+
 ## Example
 
 ```
@@ -164,6 +169,8 @@ source {
        DATETIME_COL = TIMESTAMP
        DATE_COL = DATE
     }
+    scan.params.scanner_thread_pool_thread_num = "3"
+    
   }
 }
 ```
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index c931535e18..d80273ece0 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -18,13 +18,11 @@
 package org.apache.seatunnel.common.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
 import lombok.NonNull;
 
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -32,37 +30,6 @@ public final class TypesafeConfigUtils {
 
     private TypesafeConfigUtils() {}
 
-    /**
-     * Extract sub config with fixed prefix
-     *
-     * @param source config source
-     * @param prefix config prefix
-     * @param keepPrefix true if keep prefix
-     * @deprecated use org.apache.seatunnel.api.configuration.Option interface 
instead
-     */
-    @Deprecated
-    public static Config extractSubConfig(Config source, String prefix, 
boolean keepPrefix) {
-
-        // use LinkedHashMap to keep insertion order
-        Map<String, String> values = new LinkedHashMap<>();
-
-        for (Map.Entry<String, ConfigValue> entry : source.entrySet()) {
-            final String key = entry.getKey();
-            final String value = String.valueOf(entry.getValue().unwrapped());
-
-            if (key.startsWith(prefix)) {
-
-                if (keepPrefix) {
-                    values.put(key, value);
-                } else {
-                    values.put(key.substring(prefix.length()), value);
-                }
-            }
-        }
-
-        return ConfigFactory.parseMap(values);
-    }
-
     /**
      * Check if config with specific prefix exists
      *
diff --git 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
index 3b297a731a..d9f5d50599 100644
--- 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
+++ 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
@@ -26,27 +26,10 @@ import org.junit.jupiter.api.Test;
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfig;
 import static 
org.apache.seatunnel.common.config.TypesafeConfigUtils.hasSubConfig;
 
 public class TypesafeConfigUtilsTest {
 
-    @Test
-    public void testExtractSubConfig() {
-        Config config = getConfig();
-        Config subConfig = extractSubConfig(config, "test.", true);
-        Map<String, String> configMap = new HashMap<>();
-        configMap.put("test.t0", "v0");
-        configMap.put("test.t1", "v1");
-        Assertions.assertEquals(ConfigFactory.parseMap(configMap), subConfig);
-
-        subConfig = extractSubConfig(config, "test.", false);
-        configMap = new HashMap<>();
-        configMap.put("t0", "v0");
-        configMap.put("t1", "v1");
-        Assertions.assertEquals(ConfigFactory.parseMap(configMap), subConfig);
-    }
-
     @Test
     public void testHasSubConfig() {
         Config config = getConfig();
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index 94a93b3f56..124e025719 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
 
@@ -29,7 +30,7 @@ import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
 public class StarRocksCatalogFactory implements CatalogFactory {
-    public static final String IDENTIFIER = "StarRocks";
+    public static final String IDENTIFIER = CommonConfig.CONNECTOR_IDENTITY;
 
     @Override
     public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
index ffb9b11594..c8a4775fcf 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -31,6 +32,9 @@ import java.util.List;
 @ToString
 @AllArgsConstructor
 public class CommonConfig implements Serializable {
+
+    public static final String CONNECTOR_IDENTITY = "StarRocks";
+
     public static final Option<List<String>> NODE_URLS =
             Options.key("nodeUrls")
                     .listType()
@@ -67,4 +71,12 @@ public class CommonConfig implements Serializable {
     private String password;
     private String database;
     private String table;
+
+    public CommonConfig(ReadonlyConfig config) {
+        this.nodeUrls = config.get(NODE_URLS);
+        this.username = config.get(USERNAME);
+        this.password = config.get(PASSWORD);
+        this.database = config.get(DATABASE);
+        this.table = config.get(TABLE);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
index 10d0358a8f..d069863843 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
@@ -17,18 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Getter;
-import lombok.NonNull;
 import lombok.Setter;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 @Setter
@@ -37,13 +33,26 @@ public class SourceConfig extends CommonConfig {
 
     private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;
 
-    public SourceConfig(
-            @NonNull List<String> nodeUrls,
-            @NonNull String username,
-            @NonNull String password,
-            @NonNull String database,
-            @NonNull String table) {
-        super(nodeUrls, username, password, database, table);
+    public SourceConfig(ReadonlyConfig config) {
+        super(config);
+        this.maxRetries = config.get(MAX_RETRIES);
+        this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
+        this.scanFilter = config.get(SCAN_FILTER);
+        this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
+        this.batchRows = config.get(SCAN_BATCH_ROWS);
+        this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
+        this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
+        this.memLimit = config.get(SCAN_MEM_LIMIT);
+
+        String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
+        config.toMap()
+                .forEach(
+                        (key, value) -> {
+                            if (key.startsWith(prefix)) {
+                                this.sourceOptionProps.put(
+                                        
key.substring(prefix.length()).toLowerCase(), value);
+                            }
+                        });
     }
 
     public static final Option<Integer> MAX_RETRIES =
@@ -105,57 +114,5 @@ public class SourceConfig extends CommonConfig {
     private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
     private int batchRows = SCAN_BATCH_ROWS.defaultValue();
     private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
-    private final Map<String, String> sourceOptionProps = new HashMap<>();
-
-    public static SourceConfig loadConfig(Config pluginConfig) {
-        SourceConfig sourceConfig =
-                new SourceConfig(
-                        pluginConfig.getStringList(NODE_URLS.key()),
-                        pluginConfig.getString(USERNAME.key()),
-                        pluginConfig.getString(PASSWORD.key()),
-                        pluginConfig.getString(DATABASE.key()),
-                        pluginConfig.getString(TABLE.key()));
-
-        if (pluginConfig.hasPath(MAX_RETRIES.key())) {
-            sourceConfig.setMaxRetries(pluginConfig.getInt(MAX_RETRIES.key()));
-        }
-        if (pluginConfig.hasPath(QUERY_TABLET_SIZE.key())) {
-            
sourceConfig.setRequestTabletSize(pluginConfig.getInt(QUERY_TABLET_SIZE.key()));
-        }
-        if (pluginConfig.hasPath(SCAN_FILTER.key())) {
-            
sourceConfig.setScanFilter(pluginConfig.getString(SCAN_FILTER.key()));
-        }
-        if (pluginConfig.hasPath(SCAN_CONNECT_TIMEOUT.key())) {
-            
sourceConfig.setConnectTimeoutMs(pluginConfig.getInt(SCAN_CONNECT_TIMEOUT.key()));
-        }
-        if (pluginConfig.hasPath(SCAN_BATCH_ROWS.key())) {
-            
sourceConfig.setBatchRows(pluginConfig.getInt(SCAN_BATCH_ROWS.key()));
-        }
-        if (pluginConfig.hasPath(SCAN_KEEP_ALIVE_MIN.key())) {
-            
sourceConfig.setKeepAliveMin(pluginConfig.getInt(SCAN_KEEP_ALIVE_MIN.key()));
-        }
-        if (pluginConfig.hasPath(SCAN_QUERY_TIMEOUT_SEC.key())) {
-            
sourceConfig.setQueryTimeoutSec(pluginConfig.getInt(SCAN_QUERY_TIMEOUT_SEC.key()));
-        }
-        if (pluginConfig.hasPath(SCAN_MEM_LIMIT.key())) {
-            
sourceConfig.setMemLimit(pluginConfig.getLong(SCAN_MEM_LIMIT.key()));
-        }
-        parseSourceOptionProperties(pluginConfig, sourceConfig);
-        return sourceConfig;
-    }
-
-    private static void parseSourceOptionProperties(
-            Config pluginConfig, SourceConfig sourceConfig) {
-        Config sourceOptionConfig =
-                TypesafeConfigUtils.extractSubConfig(
-                        pluginConfig, STARROCKS_SCAN_CONFIG_PREFIX.key(), 
false);
-        sourceOptionConfig
-                .entrySet()
-                .forEach(
-                        entry -> {
-                            final String configKey = 
entry.getKey().toLowerCase();
-                            sourceConfig.sourceOptionProps.put(
-                                    configKey, (String) 
entry.getValue().unwrapped());
-                        });
-    }
+    private Map<String, String> sourceOptionProps = new HashMap<>();
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 081645270f..08fc690698 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
@@ -42,7 +43,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRoc
 public class StarRocksSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "StarRocks";
+        return CommonConfig.CONNECTOR_IDENTITY;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
index 738400436a..9bde1b22a3 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
@@ -17,70 +17,35 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.NODE_URLS;
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.USERNAME;
+import java.util.Collections;
+import java.util.List;
 
 @AutoService(SeaTunnelSource.class)
 public class StarRocksSource
         implements SeaTunnelSource<SeaTunnelRow, StarRocksSourceSplit, 
StarRocksSourceState> {
 
-    private SeaTunnelRowType typeInfo;
+    private CatalogTable catalogTable;
     private SourceConfig sourceConfig;
 
     @Override
     public String getPluginName() {
-        return "StarRocks";
+        return CommonConfig.CONNECTOR_IDENTITY;
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        NODE_URLS.key(),
-                        DATABASE.key(),
-                        TABLE.key(),
-                        USERNAME.key(),
-                        PASSWORD.key());
-
-        CheckResult schemaCheckResult =
-                CheckConfigUtil.checkAllExists(pluginConfig, 
TableSchemaOptions.SCHEMA.key());
-        CheckResult mergedConfigCheck =
-                CheckConfigUtil.mergeCheckResults(checkResult, 
schemaCheckResult);
-        if (!mergedConfigCheck.isSuccess()) {
-            throw new StarRocksConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
mergedConfigCheck.getMsg()));
-        }
-
-        this.typeInfo = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-        this.sourceConfig = SourceConfig.loadConfig(pluginConfig);
+    public StarRocksSource(SourceConfig sourceConfig, CatalogTable 
catalogTable) {
+        this.sourceConfig = sourceConfig;
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -89,13 +54,14 @@ public class StarRocksSource
     }
 
     @Override
-    public SeaTunnelDataType getProducedType() {
-        return typeInfo;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
     public SourceReader createReader(SourceReader.Context readerContext) {
-        return new StarRocksSourceReader(readerContext, typeInfo, 
sourceConfig);
+        return new StarRocksSourceReader(
+                readerContext, catalogTable.getSeaTunnelRowType(), 
sourceConfig);
     }
 
     @Override
@@ -104,11 +70,15 @@ public class StarRocksSource
             StarRocksSourceState checkpointState)
             throws Exception {
         return new StartRocksSourceSplitEnumerator(
-                enumeratorContext, sourceConfig, typeInfo, checkpointState);
+                enumeratorContext,
+                sourceConfig,
+                catalogTable.getSeaTunnelRowType(),
+                checkpointState);
     }
 
     @Override
     public SourceSplitEnumerator 
createEnumerator(SourceSplitEnumerator.Context enumeratorContext) {
-        return new StartRocksSourceSplitEnumerator(enumeratorContext, 
sourceConfig, typeInfo);
+        return new StartRocksSourceSplitEnumerator(
+                enumeratorContext, sourceConfig, 
catalogTable.getSeaTunnelRowType());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
index fffb5a435c..1f5e3c1690 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
@@ -17,20 +17,29 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.source;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class StarRocksSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return "StarRocks";
+        return CommonConfig.CONNECTOR_IDENTITY;
     }
 
     @Override
@@ -59,4 +68,15 @@ public class StarRocksSourceFactory implements 
TableSourceFactory {
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return StarRocksSource.class;
     }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        SourceConfig starRocksSourceConfig = new SourceConfig(config);
+        CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config);
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new StarRocksSource(starRocksSourceConfig, 
catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksFactoryTest.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksFactoryTest.java
index 9bc934aac9..0054f16fc9 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksFactoryTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.starrocks;
 
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSinkFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.source.StarRocksSourceFactory;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -27,5 +28,6 @@ class StarRocksFactoryTest {
     @Test
     void optionRule() {
         Assertions.assertNotNull((new StarRocksSinkFactory()).optionRule());
+        Assertions.assertNotNull((new StarRocksSourceFactory()).optionRule());
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
index 7b4c25af73..91f7b0402d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
@@ -46,6 +46,7 @@ source {
         DATE_COL = DATE
       }
     }
+    scan.params.scanner_thread_pool_thread_num = "3"
   }
 }
 

Reply via email to