This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b69fceceee [Improve] update clickhouse connector, use factory to create source/sink (#7946)
b69fceceee is described below
commit b69fceceeeed9fdd5772646047fc6675232c85be
Author: Jarvis <[email protected]>
AuthorDate: Wed Oct 30 17:34:12 2024 +0800
[Improve] update clickhouse connector, use factory to create source/sink (#7946)
---
.../clickhouse/sink/ClickhouseSinkFactory.java | 61 -------
.../sink/client/ClickhouseBatchStatement.java | 2 +-
.../clickhouse/sink/client/ClickhouseSink.java | 193 +--------------------
.../sink/client/ClickhouseSinkFactory.java | 162 +++++++++++++++++
.../sink/client/ClickhouseSinkWriter.java | 3 +-
.../clickhouse/sink/client/ShardRouter.java | 3 +-
.../clickhouse/sink/file/ClickhouseFileSink.java | 2 +-
.../sink/file/ClickhouseFileSinkAggCommitter.java | 2 +-
.../sink/file/ClickhouseFileSinkWriter.java | 2 +-
.../clickhouse/sink/file/ClickhouseTable.java | 2 +-
.../clickhouse/source/ClickhouseSource.java | 128 ++------------
.../clickhouse/source/ClickhouseSourceFactory.java | 79 +++++++++
.../{sink/client => util}/ClickhouseProxy.java | 3 +-
.../seatunnel/clickhouse/util/ClickhouseUtil.java | 13 ++
.../{sink => util}/DistributedEngine.java | 2 +-
.../clickhouse/{tool => util}/IntHolder.java | 2 +-
.../clickhouse/ClickhouseFactoryTest.java | 2 +-
17 files changed, 289 insertions(+), 372 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
deleted file mode 100644
index 17ffdd2d40..0000000000
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-
-import com.google.auto.service.AutoService;
-
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
-
-@AutoService(Factory.class)
-public class ClickhouseSinkFactory implements TableSinkFactory {
- @Override
- public String factoryIdentifier() {
- return "Clickhouse";
- }
-
- @Override
- public OptionRule optionRule() {
- return OptionRule.builder()
- .required(HOST, DATABASE, TABLE)
- .optional(
- CLICKHOUSE_CONFIG,
- BULK_SIZE,
- SPLIT_MODE,
- SHARDING_KEY,
- PRIMARY_KEY,
- SUPPORT_UPSERT,
- ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE)
- .bundled(USERNAME, PASSWORD)
- .build();
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
index 52397229db..04ee5755e5 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.IntHolder;
import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index d2de6fd182..22f18694e2 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -17,210 +17,35 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
-
-import com.clickhouse.client.ClickHouseNode;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-import java.util.Properties;
-
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
-@AutoService(SeaTunnelSink.class)
public class ClickhouseSink
implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState,
CKCommitInfo, CKAggCommitInfo> {
private ReaderOption option;
+ private CatalogTable catalogTable;
- @Override
- public String getPluginName() {
- return "Clickhouse";
+ public ClickhouseSink(ReaderOption option, CatalogTable catalogTable) {
+ this.option = option;
+ this.catalogTable = catalogTable;
}
@Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(config, HOST.key(),
DATABASE.key(), TABLE.key());
-
- boolean isCredential = config.hasPath(USERNAME.key()) ||
config.hasPath(PASSWORD.key());
-
- if (isCredential) {
- result = CheckConfigUtil.checkAllExists(config, USERNAME.key(),
PASSWORD.key());
- }
-
- if (!result.isSuccess()) {
- throw new ClickhouseConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- Map<String, Object> defaultConfig =
- ImmutableMap.<String, Object>builder()
- .put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
- .put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
- .put(SERVER_TIME_ZONE.key(),
SERVER_TIME_ZONE.defaultValue())
- .build();
-
- config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
-
- List<ClickHouseNode> nodes;
- if (!isCredential) {
- nodes =
- ClickhouseUtil.createNodes(
- config.getString(HOST.key()),
- config.getString(DATABASE.key()),
- config.getString(SERVER_TIME_ZONE.key()),
- null,
- null,
- null);
- } else {
- nodes =
- ClickhouseUtil.createNodes(
- config.getString(HOST.key()),
- config.getString(DATABASE.key()),
- config.getString(SERVER_TIME_ZONE.key()),
- config.getString(USERNAME.key()),
- config.getString(PASSWORD.key()),
- null);
- }
-
- Properties clickhouseProperties = new Properties();
- if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
- config.getObject(CLICKHOUSE_CONFIG.key())
- .forEach(
- (key, value) ->
- clickhouseProperties.put(
- key,
String.valueOf(value.unwrapped())));
- }
-
- if (isCredential) {
- clickhouseProperties.put("user", config.getString(USERNAME.key()));
- clickhouseProperties.put("password",
config.getString(PASSWORD.key()));
- }
-
- ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
- Map<String, String> tableSchema =
- proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
- String shardKey = null;
- String shardKeyType = null;
- ClickhouseTable table =
- proxy.getClickhouseTable(
- config.getString(DATABASE.key()),
config.getString(TABLE.key()));
- if (config.getBoolean(SPLIT_MODE.key())) {
- if (!"Distributed".equals(table.getEngine())) {
- throw new ClickhouseConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "split mode only support table which engine is "
- + "'Distributed' engine at now");
- }
- if (config.hasPath(SHARDING_KEY.key())) {
- shardKey = config.getString(SHARDING_KEY.key());
- shardKeyType = tableSchema.get(shardKey);
- }
- }
- ShardMetadata metadata;
-
- if (isCredential) {
- metadata =
- new ShardMetadata(
- shardKey,
- shardKeyType,
- table.getSortingKey(),
- config.getString(DATABASE.key()),
- config.getString(TABLE.key()),
- table.getEngine(),
- config.getBoolean(SPLIT_MODE.key()),
- new Shard(1, 1, nodes.get(0)),
- config.getString(USERNAME.key()),
- config.getString(PASSWORD.key()));
- } else {
- metadata =
- new ShardMetadata(
- shardKey,
- shardKeyType,
- table.getSortingKey(),
- config.getString(DATABASE.key()),
- config.getString(TABLE.key()),
- table.getEngine(),
- config.getBoolean(SPLIT_MODE.key()),
- new Shard(1, 1, nodes.get(0)));
- }
-
- proxy.close();
-
- String[] primaryKeys = null;
- if (config.hasPath(PRIMARY_KEY.key())) {
- String primaryKey = config.getString(PRIMARY_KEY.key());
- if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
- throw new ClickhouseConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "sharding_key and primary_key must be consistent to
ensure correct processing of cdc events");
- }
- primaryKeys = new String[] {primaryKey};
- }
- boolean supportUpsert = SUPPORT_UPSERT.defaultValue();
- if (config.hasPath(SUPPORT_UPSERT.key())) {
- supportUpsert = config.getBoolean(SUPPORT_UPSERT.key());
- }
- boolean allowExperimentalLightweightDelete =
- ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.defaultValue();
- if (config.hasPath(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key())) {
- allowExperimentalLightweightDelete =
-
config.getBoolean(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key());
- }
- this.option =
- ReaderOption.builder()
- .shardMetadata(metadata)
- .properties(clickhouseProperties)
- .tableEngine(table.getEngine())
- .tableSchema(tableSchema)
- .bulkSize(config.getInt(BULK_SIZE.key()))
- .primaryKeys(primaryKeys)
- .supportUpsert(supportUpsert)
-
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
- .build();
+ public String getPluginName() {
+ return "Clickhouse";
}
@Override
@@ -241,7 +66,7 @@ public class ClickhouseSink
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.option.setSeaTunnelRowType(seaTunnelRowType);
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
new file mode 100644
index 0000000000..720efacc32
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import com.clickhouse.client.ClickHouseNode;
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+
+@AutoService(Factory.class)
+public class ClickhouseSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Clickhouse";
+ }
+
+ @Override
+ public TableSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo,
CKAggCommitInfo> createSink(
+ TableSinkFactoryContext context) {
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ CatalogTable catalogTable = context.getCatalogTable();
+ List<ClickHouseNode> nodes =
ClickhouseUtil.createNodes(readonlyConfig);
+ Properties clickhouseProperties = new Properties();
+ readonlyConfig
+ .get(CLICKHOUSE_CONFIG)
+ .forEach((key, value) -> clickhouseProperties.put(key,
String.valueOf(value)));
+
+ clickhouseProperties.put("user", readonlyConfig.get(USERNAME));
+ clickhouseProperties.put("password", readonlyConfig.get(PASSWORD));
+ ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+ try {
+ Map<String, String> tableSchema =
+ proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE));
+ String shardKey = null;
+ String shardKeyType = null;
+ ClickhouseTable table =
+ proxy.getClickhouseTable(
+ readonlyConfig.get(DATABASE),
readonlyConfig.get(TABLE));
+ if (readonlyConfig.get(SPLIT_MODE)) {
+ if (!"Distributed".equals(table.getEngine())) {
+ throw new ClickhouseConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "split mode only support table which engine is "
+ + "'Distributed' engine at now");
+ }
+ if (readonlyConfig.getOptional(SHARDING_KEY).isPresent()) {
+ shardKey = readonlyConfig.get(SHARDING_KEY);
+ shardKeyType = tableSchema.get(shardKey);
+ }
+ }
+ ShardMetadata metadata =
+ new ShardMetadata(
+ shardKey,
+ shardKeyType,
+ table.getSortingKey(),
+ readonlyConfig.get(DATABASE),
+ readonlyConfig.get(TABLE),
+ table.getEngine(),
+ readonlyConfig.get(SPLIT_MODE),
+ new Shard(1, 1, nodes.get(0)),
+ readonlyConfig.get(USERNAME),
+ readonlyConfig.get(PASSWORD));
+ proxy.close();
+ String[] primaryKeys = null;
+ if (readonlyConfig.getOptional(PRIMARY_KEY).isPresent()) {
+ String primaryKey = readonlyConfig.get(PRIMARY_KEY);
+ if (shardKey != null && !Objects.equals(primaryKey, shardKey))
{
+ throw new ClickhouseConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "sharding_key and primary_key must be consistent
to ensure correct processing of cdc events");
+ }
+ primaryKeys = new String[] {primaryKey};
+ }
+ boolean supportUpsert = readonlyConfig.get(SUPPORT_UPSERT);
+ boolean allowExperimentalLightweightDelete =
+ readonlyConfig.get(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE);
+
+ ReaderOption option =
+ ReaderOption.builder()
+ .shardMetadata(metadata)
+ .properties(clickhouseProperties)
+
.seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+ .tableEngine(table.getEngine())
+ .tableSchema(tableSchema)
+ .bulkSize(readonlyConfig.get(BULK_SIZE))
+ .primaryKeys(primaryKeys)
+ .supportUpsert(supportUpsert)
+
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
+ .build();
+ return () -> new ClickhouseSink(option, catalogTable);
+ } finally {
+ proxy.close();
+ }
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(HOST, DATABASE, TABLE)
+ .optional(
+ CLICKHOUSE_CONFIG,
+ BULK_SIZE,
+ SPLIT_MODE,
+ SHARDING_KEY,
+ PRIMARY_KEY,
+ SUPPORT_UPSERT,
+ ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE)
+ .bundled(USERNAME, PASSWORD)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index b5f1505d11..6b7f652aba 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -28,7 +28,8 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.IntHolder;
import org.apache.commons.lang3.StringUtils;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index 140e40b3b1..03f6efec31 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -21,7 +21,8 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.Clickhouse
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine;
import org.apache.commons.lang3.StringUtils;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index bb445d4282..4a0c80e02c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -38,10 +38,10 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOpt
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import com.clickhouse.client.ClickHouseNode;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
index 53e5fcb5ab..5d69191cac 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseRequest;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 2abeb04647..e705acc768 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -26,10 +26,10 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOpt
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
import org.apache.commons.io.FileUtils;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
index 546f1f7466..2525caeb3d 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine;
import lombok.Getter;
import lombok.Setter;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index 2cc401dce2..d7c6b43856 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -17,142 +17,39 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
-import com.clickhouse.client.ClickHouseClient;
-import com.clickhouse.client.ClickHouseException;
-import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
-import com.clickhouse.client.ClickHouseResponse;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
-@AutoService(SeaTunnelSource.class)
public class ClickhouseSource
implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit,
ClickhouseSourceState>,
SupportParallelism,
SupportColumnProjection {
private List<ClickHouseNode> servers;
- private SeaTunnelRowType rowTypeInfo;
+ private CatalogTable catalogTable;
private String sql;
- @Override
- public String getPluginName() {
- return "Clickhouse";
+ public ClickhouseSource(List<ClickHouseNode> servers, CatalogTable catalogTable, String sql) {
+ this.servers = servers;
+ this.catalogTable = catalogTable;
+ this.sql = sql;
}
@Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- config,
- HOST.key(),
- DATABASE.key(),
- SQL.key(),
- USERNAME.key(),
- PASSWORD.key());
- if (!result.isSuccess()) {
- throw new ClickhouseConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- Map<String, Object> defaultConfig =
- ImmutableMap.<String, Object>builder()
- .put(SERVER_TIME_ZONE.key(),
SERVER_TIME_ZONE.defaultValue())
- .build();
-
- config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
-
- Map<String, String> customConfig = null;
-
- if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
- customConfig =
-
config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey,
- entrySet ->
-
entrySet.getValue().unwrapped().toString()));
- }
-
- servers =
- ClickhouseUtil.createNodes(
- config.getString(HOST.key()),
- config.getString(DATABASE.key()),
- config.getString(SERVER_TIME_ZONE.key()),
- config.getString(USERNAME.key()),
- config.getString(PASSWORD.key()),
- customConfig);
-
- sql = config.getString(SQL.key());
- ClickHouseNode currentServer =
-
servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
- try (ClickHouseClient client =
ClickHouseClient.newInstance(currentServer.getProtocol());
- ClickHouseResponse response =
- client.connect(currentServer)
-
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
-
.query(modifySQLToLimit1(config.getString(SQL.key())))
- .executeAndWait()) {
-
- int columnSize = response.getColumns().size();
- String[] fieldNames = new String[columnSize];
- SeaTunnelDataType<?>[] seaTunnelDataTypes = new
SeaTunnelDataType[columnSize];
-
- for (int i = 0; i < columnSize; i++) {
- fieldNames[i] = response.getColumns().get(i).getColumnName();
- seaTunnelDataTypes[i] =
TypeConvertUtil.convert(response.getColumns().get(i));
- }
-
- this.rowTypeInfo = new SeaTunnelRowType(fieldNames,
seaTunnelDataTypes);
-
- } catch (ClickHouseException e) {
- throw new ClickhouseConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
ExceptionUtils.getMessage(e)));
- }
- }
-
- private String modifySQLToLimit1(String sql) {
- return String.format("SELECT * FROM (%s) s LIMIT 1", sql);
+ public String getPluginName() {
+ return "Clickhouse";
}
@Override
@@ -161,14 +58,15 @@ public class ClickhouseSource
}
@Override
- public SeaTunnelRowType getProducedType() {
- return this.rowTypeInfo;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
public SourceReader<SeaTunnelRow, ClickhouseSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
- return new ClickhouseSourceReader(servers, readerContext,
this.rowTypeInfo, sql);
+ return new ClickhouseSourceReader(
+ servers, readerContext, this.catalogTable.getSeaTunnelRowType(), sql);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
index 4adea4b80c..bb91d3c05e 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
@@ -17,13 +17,37 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseColumn;
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseResponse;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -38,6 +62,61 @@ public class ClickhouseSourceFactory implements
TableSourceFactory {
return "Clickhouse";
}
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ List<ClickHouseNode> nodes = ClickhouseUtil.createNodes(readonlyConfig);
+
+ String sql = readonlyConfig.get(SQL);
+ ClickHouseNode currentServer = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+ try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol());
+ ClickHouseResponse response =
+ client.connect(currentServer)
+ .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+ .query(modifySQLToLimit1(sql))
+ .executeAndWait()) {
+ TableSchema.Builder builder = TableSchema.builder();
+ List<ClickHouseColumn> columns = response.getColumns();
+ columns.forEach(
+ column -> {
+ PhysicalColumn physicalColumn =
+ PhysicalColumn.of(
+ column.getColumnName(),
+ TypeConvertUtil.convert(column),
+ (long) column.getEstimatedLength(),
+ column.getScale(),
+ column.isNullable(),
+ null,
+ null);
+ builder.column(physicalColumn);
+ });
+ String catalogName = "clickhouse_catalog";
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of(
+ catalogName, readonlyConfig.get(DATABASE), "default"),
+ builder.build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "",
+ catalogName);
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new ClickhouseSource(nodes, catalogTable, sql);
+ } catch (ClickHouseException e) {
+ throw new ClickhouseConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ factoryIdentifier(), PluginType.SOURCE, e.getMessage()));
+ }
+ }
+
+ private String modifySQLToLimit1(String sql) {
+ return String.format("SELECT * FROM (%s) s LIMIT 1", sql);
+ }
+
@Override
public OptionRule optionRule() {
return OptionRule.builder()
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
similarity index 98%
rename from
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
rename to
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
index bf0f9a5552..c417818257 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import com.clickhouse.client.ClickHouseClient;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
index f787cf5c8f..13667d0e40 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
+
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -31,6 +34,16 @@ import java.util.stream.Collectors;
public class ClickhouseUtil {
+ public static List<ClickHouseNode> createNodes(ReadonlyConfig config) {
+ return createNodes(
+ config.get(ClickhouseConfig.HOST),
+ config.get(ClickhouseConfig.DATABASE),
+ config.get(ClickhouseConfig.SERVER_TIME_ZONE),
+ config.get(ClickhouseConfig.USERNAME),
+ config.get(ClickhouseConfig.PASSWORD),
+ null);
+ }
+
public static List<ClickHouseNode> createNodes(
String nodeAddress,
String database,
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java
similarity index 94%
rename from
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
rename to
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java
index 067f09fdbc..8974b7cd0c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/DistributedEngine.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
import lombok.AllArgsConstructor;
import lombok.Getter;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java
similarity index 94%
rename from
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
rename to
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java
index 02e7be5966..9913d7a408 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/IntHolder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.tool;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
import java.io.Serializable;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
index e6c50b0611..d193b53ea7 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;