This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b69fceceee [Improve] update clickhouse connector, use factory to 
create source/sink (#7946)
b69fceceee is described below

commit b69fceceeeed9fdd5772646047fc6675232c85be
Author: Jarvis <[email protected]>
AuthorDate: Wed Oct 30 17:34:12 2024 +0800

    [Improve] update clickhouse connector, use factory to create source/sink 
(#7946)
---
 .../clickhouse/sink/ClickhouseSinkFactory.java     |  61 -------
 .../sink/client/ClickhouseBatchStatement.java      |   2 +-
 .../clickhouse/sink/client/ClickhouseSink.java     | 193 +--------------------
 .../sink/client/ClickhouseSinkFactory.java         | 162 +++++++++++++++++
 .../sink/client/ClickhouseSinkWriter.java          |   3 +-
 .../clickhouse/sink/client/ShardRouter.java        |   3 +-
 .../clickhouse/sink/file/ClickhouseFileSink.java   |   2 +-
 .../sink/file/ClickhouseFileSinkAggCommitter.java  |   2 +-
 .../sink/file/ClickhouseFileSinkWriter.java        |   2 +-
 .../clickhouse/sink/file/ClickhouseTable.java      |   2 +-
 .../clickhouse/source/ClickhouseSource.java        | 128 ++------------
 .../clickhouse/source/ClickhouseSourceFactory.java |  79 +++++++++
 .../{sink/client => util}/ClickhouseProxy.java     |   3 +-
 .../seatunnel/clickhouse/util/ClickhouseUtil.java  |  13 ++
 .../{sink => util}/DistributedEngine.java          |   2 +-
 .../clickhouse/{tool => util}/IntHolder.java       |   2 +-
 .../clickhouse/ClickhouseFactoryTest.java          |   2 +-
 17 files changed, 289 insertions(+), 372 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
deleted file mode 100644
index 17ffdd2d40..0000000000
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-
-import com.google.auto.service.AutoService;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
-
-@AutoService(Factory.class)
-public class ClickhouseSinkFactory implements TableSinkFactory {
-    @Override
-    public String factoryIdentifier() {
-        return "Clickhouse";
-    }
-
-    @Override
-    public OptionRule optionRule() {
-        return OptionRule.builder()
-                .required(HOST, DATABASE, TABLE)
-                .optional(
-                        CLICKHOUSE_CONFIG,
-                        BULK_SIZE,
-                        SPLIT_MODE,
-                        SHARDING_KEY,
-                        PRIMARY_KEY,
-                        SUPPORT_UPSERT,
-                        ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE)
-                .bundled(USERNAME, PASSWORD)
-                .build();
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
index 52397229db..04ee5755e5 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.IntHolder;
 
 import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index d2de6fd182..22f18694e2 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -17,210 +17,35 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
-
-import com.clickhouse.client.ClickHouseNode;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
-@AutoService(SeaTunnelSink.class)
 public class ClickhouseSink
         implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, 
CKCommitInfo, CKAggCommitInfo> {
 
     private ReaderOption option;
+    private CatalogTable catalogTable;
 
-    @Override
-    public String getPluginName() {
-        return "Clickhouse";
+    public ClickhouseSink(ReaderOption option, CatalogTable catalogTable) {
+        this.option = option;
+        this.catalogTable = catalogTable;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, HOST.key(), 
DATABASE.key(), TABLE.key());
-
-        boolean isCredential = config.hasPath(USERNAME.key()) || 
config.hasPath(PASSWORD.key());
-
-        if (isCredential) {
-            result = CheckConfigUtil.checkAllExists(config, USERNAME.key(), 
PASSWORD.key());
-        }
-
-        if (!result.isSuccess()) {
-            throw new ClickhouseConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        Map<String, Object> defaultConfig =
-                ImmutableMap.<String, Object>builder()
-                        .put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
-                        .put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
-                        .put(SERVER_TIME_ZONE.key(), 
SERVER_TIME_ZONE.defaultValue())
-                        .build();
-
-        config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
-
-        List<ClickHouseNode> nodes;
-        if (!isCredential) {
-            nodes =
-                    ClickhouseUtil.createNodes(
-                            config.getString(HOST.key()),
-                            config.getString(DATABASE.key()),
-                            config.getString(SERVER_TIME_ZONE.key()),
-                            null,
-                            null,
-                            null);
-        } else {
-            nodes =
-                    ClickhouseUtil.createNodes(
-                            config.getString(HOST.key()),
-                            config.getString(DATABASE.key()),
-                            config.getString(SERVER_TIME_ZONE.key()),
-                            config.getString(USERNAME.key()),
-                            config.getString(PASSWORD.key()),
-                            null);
-        }
-
-        Properties clickhouseProperties = new Properties();
-        if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
-            config.getObject(CLICKHOUSE_CONFIG.key())
-                    .forEach(
-                            (key, value) ->
-                                    clickhouseProperties.put(
-                                            key, 
String.valueOf(value.unwrapped())));
-        }
-
-        if (isCredential) {
-            clickhouseProperties.put("user", config.getString(USERNAME.key()));
-            clickhouseProperties.put("password", 
config.getString(PASSWORD.key()));
-        }
-
-        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
-        Map<String, String> tableSchema =
-                proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
-        String shardKey = null;
-        String shardKeyType = null;
-        ClickhouseTable table =
-                proxy.getClickhouseTable(
-                        config.getString(DATABASE.key()), 
config.getString(TABLE.key()));
-        if (config.getBoolean(SPLIT_MODE.key())) {
-            if (!"Distributed".equals(table.getEngine())) {
-                throw new ClickhouseConnectorException(
-                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                        "split mode only support table which engine is "
-                                + "'Distributed' engine at now");
-            }
-            if (config.hasPath(SHARDING_KEY.key())) {
-                shardKey = config.getString(SHARDING_KEY.key());
-                shardKeyType = tableSchema.get(shardKey);
-            }
-        }
-        ShardMetadata metadata;
-
-        if (isCredential) {
-            metadata =
-                    new ShardMetadata(
-                            shardKey,
-                            shardKeyType,
-                            table.getSortingKey(),
-                            config.getString(DATABASE.key()),
-                            config.getString(TABLE.key()),
-                            table.getEngine(),
-                            config.getBoolean(SPLIT_MODE.key()),
-                            new Shard(1, 1, nodes.get(0)),
-                            config.getString(USERNAME.key()),
-                            config.getString(PASSWORD.key()));
-        } else {
-            metadata =
-                    new ShardMetadata(
-                            shardKey,
-                            shardKeyType,
-                            table.getSortingKey(),
-                            config.getString(DATABASE.key()),
-                            config.getString(TABLE.key()),
-                            table.getEngine(),
-                            config.getBoolean(SPLIT_MODE.key()),
-                            new Shard(1, 1, nodes.get(0)));
-        }
-
-        proxy.close();
-
-        String[] primaryKeys = null;
-        if (config.hasPath(PRIMARY_KEY.key())) {
-            String primaryKey = config.getString(PRIMARY_KEY.key());
-            if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
-                throw new ClickhouseConnectorException(
-                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                        "sharding_key and primary_key must be consistent to 
ensure correct processing of cdc events");
-            }
-            primaryKeys = new String[] {primaryKey};
-        }
-        boolean supportUpsert = SUPPORT_UPSERT.defaultValue();
-        if (config.hasPath(SUPPORT_UPSERT.key())) {
-            supportUpsert = config.getBoolean(SUPPORT_UPSERT.key());
-        }
-        boolean allowExperimentalLightweightDelete =
-                ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.defaultValue();
-        if (config.hasPath(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key())) {
-            allowExperimentalLightweightDelete =
-                    
config.getBoolean(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key());
-        }
-        this.option =
-                ReaderOption.builder()
-                        .shardMetadata(metadata)
-                        .properties(clickhouseProperties)
-                        .tableEngine(table.getEngine())
-                        .tableSchema(tableSchema)
-                        .bulkSize(config.getInt(BULK_SIZE.key()))
-                        .primaryKeys(primaryKeys)
-                        .supportUpsert(supportUpsert)
-                        
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
-                        .build();
+    public String getPluginName() {
+        return "Clickhouse";
     }
 
     @Override
@@ -241,7 +66,7 @@ public class ClickhouseSink
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.option.setSeaTunnelRowType(seaTunnelRowType);
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
new file mode 100644
index 0000000000..720efacc32
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import com.clickhouse.client.ClickHouseNode;
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+
+@AutoService(Factory.class)
+public class ClickhouseSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Clickhouse";
+    }
+
+    @Override
+    public TableSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, 
CKAggCommitInfo> createSink(
+            TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+        List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(readonlyConfig);
+        Properties clickhouseProperties = new Properties();
+        readonlyConfig
+                .get(CLICKHOUSE_CONFIG)
+                .forEach((key, value) -> clickhouseProperties.put(key, 
String.valueOf(value)));
+
+        clickhouseProperties.put("user", readonlyConfig.get(USERNAME));
+        clickhouseProperties.put("password", readonlyConfig.get(PASSWORD));
+        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+        try {
+            Map<String, String> tableSchema =
+                    proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE));
+            String shardKey = null;
+            String shardKeyType = null;
+            ClickhouseTable table =
+                    proxy.getClickhouseTable(
+                            readonlyConfig.get(DATABASE), 
readonlyConfig.get(TABLE));
+            if (readonlyConfig.get(SPLIT_MODE)) {
+                if (!"Distributed".equals(table.getEngine())) {
+                    throw new ClickhouseConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            "split mode only support table which engine is "
+                                    + "'Distributed' engine at now");
+                }
+                if (readonlyConfig.getOptional(SHARDING_KEY).isPresent()) {
+                    shardKey = readonlyConfig.get(SHARDING_KEY);
+                    shardKeyType = tableSchema.get(shardKey);
+                }
+            }
+            ShardMetadata metadata =
+                    new ShardMetadata(
+                            shardKey,
+                            shardKeyType,
+                            table.getSortingKey(),
+                            readonlyConfig.get(DATABASE),
+                            readonlyConfig.get(TABLE),
+                            table.getEngine(),
+                            readonlyConfig.get(SPLIT_MODE),
+                            new Shard(1, 1, nodes.get(0)),
+                            readonlyConfig.get(USERNAME),
+                            readonlyConfig.get(PASSWORD));
+            proxy.close();
+            String[] primaryKeys = null;
+            if (readonlyConfig.getOptional(PRIMARY_KEY).isPresent()) {
+                String primaryKey = readonlyConfig.get(PRIMARY_KEY);
+                if (shardKey != null && !Objects.equals(primaryKey, shardKey)) 
{
+                    throw new ClickhouseConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            "sharding_key and primary_key must be consistent 
to ensure correct processing of cdc events");
+                }
+                primaryKeys = new String[] {primaryKey};
+            }
+            boolean supportUpsert = readonlyConfig.get(SUPPORT_UPSERT);
+            boolean allowExperimentalLightweightDelete =
+                    readonlyConfig.get(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE);
+
+            ReaderOption option =
+                    ReaderOption.builder()
+                            .shardMetadata(metadata)
+                            .properties(clickhouseProperties)
+                            
.seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+                            .tableEngine(table.getEngine())
+                            .tableSchema(tableSchema)
+                            .bulkSize(readonlyConfig.get(BULK_SIZE))
+                            .primaryKeys(primaryKeys)
+                            .supportUpsert(supportUpsert)
+                            
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
+                            .build();
+            return () -> new ClickhouseSink(option, catalogTable);
+        } finally {
+            proxy.close();
+        }
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(HOST, DATABASE, TABLE)
+                .optional(
+                        CLICKHOUSE_CONFIG,
+                        BULK_SIZE,
+                        SPLIT_MODE,
+                        SHARDING_KEY,
+                        PRIMARY_KEY,
+                        SUPPORT_UPSERT,
+                        ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE)
+                .bundled(USERNAME, PASSWORD)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index b5f1505d11..6b7f652aba 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -28,7 +28,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.IntHolder;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index 140e40b3b1..03f6efec31 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -21,7 +21,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.Clickhouse
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index bb445d4282..4a0c80e02c 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -38,10 +38,10 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOpt
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
 
 import com.clickhouse.client.ClickHouseNode;
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
index 53e5fcb5ab..5d69191cac 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
 
 import com.clickhouse.client.ClickHouseException;
 import com.clickhouse.client.ClickHouseRequest;
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 2abeb04647..e705acc768 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -26,10 +26,10 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOpt
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
 
 import org.apache.commons.io.FileUtils;
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
index 546f1f7466..2525caeb3d 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index 2cc401dce2..d7c6b43856 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -17,142 +17,39 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
 
-import com.clickhouse.client.ClickHouseClient;
-import com.clickhouse.client.ClickHouseException;
-import com.clickhouse.client.ClickHouseFormat;
 import com.clickhouse.client.ClickHouseNode;
-import com.clickhouse.client.ClickHouseResponse;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
 
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
-@AutoService(SeaTunnelSource.class)
 public class ClickhouseSource
         implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, 
ClickhouseSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
 
     private List<ClickHouseNode> servers;
-    private SeaTunnelRowType rowTypeInfo;
+    private CatalogTable catalogTable;
     private String sql;
 
-    @Override
-    public String getPluginName() {
-        return "Clickhouse";
+    public ClickhouseSource(List<ClickHouseNode> servers, CatalogTable 
catalogTable, String sql) {
+        this.servers = servers;
+        this.catalogTable = catalogTable;
+        this.sql = sql;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        config,
-                        HOST.key(),
-                        DATABASE.key(),
-                        SQL.key(),
-                        USERNAME.key(),
-                        PASSWORD.key());
-        if (!result.isSuccess()) {
-            throw new ClickhouseConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        Map<String, Object> defaultConfig =
-                ImmutableMap.<String, Object>builder()
-                        .put(SERVER_TIME_ZONE.key(), 
SERVER_TIME_ZONE.defaultValue())
-                        .build();
-
-        config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
-
-        Map<String, String> customConfig = null;
-
-        if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
-            customConfig =
-                    
config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            Map.Entry::getKey,
-                                            entrySet ->
-                                                    
entrySet.getValue().unwrapped().toString()));
-        }
-
-        servers =
-                ClickhouseUtil.createNodes(
-                        config.getString(HOST.key()),
-                        config.getString(DATABASE.key()),
-                        config.getString(SERVER_TIME_ZONE.key()),
-                        config.getString(USERNAME.key()),
-                        config.getString(PASSWORD.key()),
-                        customConfig);
-
-        sql = config.getString(SQL.key());
-        ClickHouseNode currentServer =
-                
servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
-        try (ClickHouseClient client = 
ClickHouseClient.newInstance(currentServer.getProtocol());
-                ClickHouseResponse response =
-                        client.connect(currentServer)
-                                
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
-                                
.query(modifySQLToLimit1(config.getString(SQL.key())))
-                                .executeAndWait()) {
-
-            int columnSize = response.getColumns().size();
-            String[] fieldNames = new String[columnSize];
-            SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType[columnSize];
-
-            for (int i = 0; i < columnSize; i++) {
-                fieldNames[i] = response.getColumns().get(i).getColumnName();
-                seaTunnelDataTypes[i] = 
TypeConvertUtil.convert(response.getColumns().get(i));
-            }
-
-            this.rowTypeInfo = new SeaTunnelRowType(fieldNames, 
seaTunnelDataTypes);
-
-        } catch (ClickHouseException e) {
-            throw new ClickhouseConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
ExceptionUtils.getMessage(e)));
-        }
-    }
-
-    private String modifySQLToLimit1(String sql) {
-        return String.format("SELECT * FROM (%s) s LIMIT 1", sql);
+    public String getPluginName() {
+        return "Clickhouse";
     }
 
     @Override
@@ -161,14 +58,15 @@ public class ClickhouseSource
     }
 
     @Override
-    public SeaTunnelRowType getProducedType() {
-        return this.rowTypeInfo;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
     public SourceReader<SeaTunnelRow, ClickhouseSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        return new ClickhouseSourceReader(servers, readerContext, 
this.rowTypeInfo, sql);
+        return new ClickhouseSourceReader(
+                servers, readerContext, 
this.catalogTable.getSeaTunnelRowType(), sql);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
index 4adea4b80c..bb91d3c05e 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
@@ -17,13 +17,37 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
 
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseColumn;
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseResponse;
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -38,6 +62,61 @@ public class ClickhouseSourceFactory implements 
TableSourceFactory {
         return "Clickhouse";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(readonlyConfig);
+
+        String sql = readonlyConfig.get(SQL);
+        ClickHouseNode currentServer = 
nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+        try (ClickHouseClient client = 
ClickHouseClient.newInstance(currentServer.getProtocol());
+                ClickHouseResponse response =
+                        client.connect(currentServer)
+                                
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+                                .query(modifySQLToLimit1(sql))
+                                .executeAndWait()) {
+            TableSchema.Builder builder = TableSchema.builder();
+            List<ClickHouseColumn> columns = response.getColumns();
+            columns.forEach(
+                    column -> {
+                        PhysicalColumn physicalColumn =
+                                PhysicalColumn.of(
+                                        column.getColumnName(),
+                                        TypeConvertUtil.convert(column),
+                                        (long) column.getEstimatedLength(),
+                                        column.getScale(),
+                                        column.isNullable(),
+                                        null,
+                                        null);
+                        builder.column(physicalColumn);
+                    });
+            String catalogName = "clickhouse_catalog";
+            CatalogTable catalogTable =
+                    CatalogTable.of(
+                            TableIdentifier.of(
+                                    catalogName, readonlyConfig.get(DATABASE), 
"default"),
+                            builder.build(),
+                            Collections.emptyMap(),
+                            Collections.emptyList(),
+                            "",
+                            catalogName);
+            return () ->
+                    (SeaTunnelSource<T, SplitT, StateT>)
+                            new ClickhouseSource(nodes, catalogTable, sql);
+        } catch (ClickHouseException e) {
+            throw new ClickhouseConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            factoryIdentifier(), PluginType.SOURCE, 
e.getMessage()));
+        }
+    }
+
+    private String modifySQLToLimit1(String sql) {
+        return String.format("SELECT * FROM (%s) s LIMIT 1", sql);
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
similarity index 98%
rename from 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
rename to 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
index bf0f9a5552..c417818257 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
 
 import com.clickhouse.client.ClickHouseClient;
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
index f787cf5c8f..13667d0e40 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
+
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 
@@ -31,6 +34,16 @@ import java.util.stream.Collectors;
 
 public class ClickhouseUtil {
 
+    public static List<ClickHouseNode> createNodes(ReadonlyConfig config) {
+        return createNodes(
+                config.get(ClickhouseConfig.HOST),
+                config.get(ClickhouseConfig.DATABASE),
+                config.get(ClickhouseConfig.SERVER_TIME_ZONE),
+                config.get(ClickhouseConfig.USERNAME),
+                config.get(ClickhouseConfig.PASSWORD),
+                null);
+    }
+
     public static List<ClickHouseNode> createNodes(
             String nodeAddress,
             String database,
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java
similarity index 94%
rename from 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
rename to 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java
index 067f09fdbc..8974b7cd0c 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java
similarity index 94%
rename from 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
rename to 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java
index 02e7be5966..9913d7a408 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.tool;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
 import java.io.Serializable;
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
index e6c50b0611..d193b53ea7 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse;
 
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;
 

Reply via email to