This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0e4e2b123 [Feature][Connector-V1 & V2] Support unauthorized ClickHouse
(#2393)
0e4e2b123 is described below
commit 0e4e2b1230c019c6a11ec927b6cd47af4b1cfa4e
Author: qianmoQ <[email protected]>
AuthorDate: Thu Aug 11 21:24:09 2022 +0800
[Feature][Connector-V1 & V2] Support unauthorized ClickHouse (#2393)
* [Feature][Connector-V1] Support unauthorized ClickHouse
* [Feature][Connector-V1] Add license
* [Feature][Connector-V2] Support unauthorized ClickHouse
---
.../seatunnel/clickhouse/shard/ShardMetadata.java | 14 ++++++
.../clickhouse/sink/client/ClickhouseSink.java | 53 +++++++++++++++++-----
.../seatunnel/clickhouse/util/ClickhouseUtil.java | 8 +++-
.../flink/clickhouse/sink/ClickhouseBatchSink.java | 6 ++-
.../clickhouse/sink/client/ClickhouseClient.java | 6 ++-
.../spark/clickhouse/sink/Clickhouse.scala | 8 ++--
.../seatunnel-spark-connector-v2-example/pom.xml | 5 ++
...{SeaTunnelApiExample.java => ExampleUtils.java} | 8 ++--
.../example/spark/v2/SeaTunnelApiExample.java | 25 +---------
.../spark/v2/SeaTunnelApiToClickHouseExample.java | 30 ++++++++++++
.../resources/examples/spark.batch.clickhouse.conf | 52 +++++++++++++++++++++
.../seatunnel-spark-examples/pom.xml | 10 ++++
.../spark/LocalSparkToClickHouseExample.java} | 37 ++++++++-------
.../resources/examples/spark.batch.clickhouse.conf | 47 +++++++++++++++++++
14 files changed, 242 insertions(+), 67 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
index 3c01922f1..c40344b2a 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
@@ -51,6 +51,20 @@ public class ShardMetadata implements Serializable {
this.password = password;
}
+ public ShardMetadata(String shardKey,
+ String shardKeyType,
+ String database,
+ String table,
+ boolean splitMode,
+ Shard defaultShard) {
+ this.shardKey = shardKey;
+ this.shardKeyType = shardKeyType;
+ this.database = database;
+ this.table = table;
+ this.splitMode = splitMode;
+ this.defaultShard = defaultShard;
+ }
+
public String getShardKey() {
return shardKey;
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 295547c74..c296ca03a 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -78,7 +78,14 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
@SuppressWarnings("checkstyle:MagicNumber")
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config, HOST,
DATABASE, TABLE, USERNAME, PASSWORD);
+ CheckResult result = CheckConfigUtil.checkAllExists(config, HOST,
DATABASE, TABLE);
+
+ boolean isCredential = config.hasPath(USERNAME) ||
config.hasPath(PASSWORD);
+
+ if (isCredential) {
+ result = CheckConfigUtil.checkAllExists(config, USERNAME,
PASSWORD);
+ }
+
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
}
@@ -89,8 +96,14 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
- List<ClickHouseNode> nodes =
ClickhouseUtil.createNodes(config.getString(HOST),
- config.getString(DATABASE), config.getString(USERNAME),
config.getString(PASSWORD));
+ List<ClickHouseNode> nodes;
+ if (!isCredential) {
+ nodes = ClickhouseUtil.createNodes(config.getString(HOST),
config.getString(DATABASE),
+ null, null);
+ } else {
+ nodes = ClickhouseUtil.createNodes(config.getString(HOST),
+ config.getString(DATABASE), config.getString(USERNAME),
config.getString(PASSWORD));
+ }
Properties clickhouseProperties = new Properties();
if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
@@ -98,8 +111,11 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
clickhouseProperties.put(e.getKey(),
String.valueOf(e.getValue().unwrapped()));
});
}
- clickhouseProperties.put("user", config.getString(USERNAME));
- clickhouseProperties.put("password", config.getString(PASSWORD));
+
+ if (isCredential) {
+ clickhouseProperties.put("user", config.getString(USERNAME));
+ clickhouseProperties.put("password", config.getString(PASSWORD));
+ }
ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
Map<String, String> tableSchema =
proxy.getClickhouseTableSchema(config.getString(TABLE));
@@ -117,13 +133,26 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
shardKeyType = tableSchema.get(shardKey);
}
}
- ShardMetadata metadata = new ShardMetadata(
- shardKey,
- shardKeyType,
- config.getString(DATABASE),
- config.getString(TABLE),
- config.getBoolean(SPLIT_MODE),
- new Shard(1, 1, nodes.get(0)), config.getString(USERNAME),
config.getString(PASSWORD));
+ ShardMetadata metadata;
+
+ if (isCredential) {
+ metadata = new ShardMetadata(
+ shardKey,
+ shardKeyType,
+ config.getString(DATABASE),
+ config.getString(TABLE),
+ config.getBoolean(SPLIT_MODE),
+ new Shard(1, 1, nodes.get(0)), config.getString(USERNAME),
config.getString(PASSWORD));
+ } else {
+ metadata = new ShardMetadata(
+ shardKey,
+ shardKeyType,
+ config.getString(DATABASE),
+ config.getString(TABLE),
+ config.getBoolean(SPLIT_MODE),
+ new Shard(1, 1, nodes.get(0)));
+ }
+
List<String> fields = new ArrayList<>();
if (config.hasPath(FIELDS)) {
fields.addAll(config.getStringList(FIELDS));
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
index 38c835831..1e5e0ef51 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
+import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.List;
@@ -28,13 +29,16 @@ import java.util.stream.Collectors;
public class ClickhouseUtil {
public static List<ClickHouseNode> createNodes(String nodeAddress, String
database, String username,
- String password) {
+ String password) {
return Arrays.stream(nodeAddress.split(",")).map(address -> {
String[] nodeAndPort = address.split(":", 2);
+ if (StringUtils.isEmpty(username) &&
StringUtils.isEmpty(password)) {
+ return
ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP,
+
Integer.parseInt(nodeAndPort[1])).database(database).build();
+ }
return
ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP,
Integer.parseInt(nodeAndPort[1])).database(database)
.credentials(ClickHouseCredentials.fromUserAndPassword(username,
password)).build();
}).collect(Collectors.toList());
}
-
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
index 369de91f4..5c9ed0d4d 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
@@ -82,7 +82,11 @@ public class ClickhouseBatchSink implements FlinkBatchSink {
@Override
public CheckResult checkConfig() {
- return CheckConfigUtil.checkAllExists(config, HOST, TABLE, DATABASE,
USERNAME, PASSWORD);
+ if (config.hasPath(USERNAME) && config.hasPath(PASSWORD)) {
+ return CheckConfigUtil.checkAllExists(config, HOST, TABLE,
DATABASE, USERNAME, PASSWORD);
+ } else {
+ return CheckConfigUtil.checkAllExists(config, HOST, TABLE,
DATABASE);
+ }
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
index ab91a6dde..53925858e 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -63,8 +63,10 @@ public class ClickhouseClient {
clickhouseProperties.put(e.getKey(),
String.valueOf(e.getValue().unwrapped()));
});
}
- clickhouseProperties.put("user", config.getString(USERNAME));
- clickhouseProperties.put("password", config.getString(PASSWORD));
+ if (config.hasPath(USERNAME) && config.hasPath(PASSWORD)) {
+ clickhouseProperties.put("user", config.getString(USERNAME));
+ clickhouseProperties.put("password", config.getString(PASSWORD));
+ }
String jdbcUrl = "jdbc:clickhouse://" + config.getString(HOST) + "/" +
config.getString(DATABASE);
this.balancedClickhouseDataSource = new
BalancedClickhouseDataSource(jdbcUrl, clickhouseProperties);
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 99c0ae51b..bed3b6e01 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -116,7 +116,7 @@ class Clickhouse extends SparkBatchSink {
}
override def checkConfig(): CheckResult = {
- var checkResult = checkAllExists(config, HOST, TABLE, DATABASE, USERNAME,
PASSWORD)
+ var checkResult = checkAllExists(config, HOST, TABLE, DATABASE)
if (checkResult.isSuccess) {
if (hasSubConfig(config, clickhousePrefix)) {
extractSubConfig(config, clickhousePrefix, false).entrySet().foreach(e
=> {
@@ -124,8 +124,10 @@ class Clickhouse extends SparkBatchSink {
})
}
- properties.put("user", config.getString(USERNAME))
- properties.put("password", config.getString(PASSWORD))
+ if (config.hasPath(USERNAME) && config.hasPath(PASSWORD)) {
+ properties.put("user", config.getString(USERNAME))
+ properties.put("password", config.getString(PASSWORD))
+ }
if (config.hasPath(SPLIT_MODE)) {
splitMode = config.getBoolean(SPLIT_MODE)
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
index 8c3978e53..b0d88aba4 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
@@ -50,6 +50,11 @@
<artifactId>connector-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-clickhouse</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--spark-->
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/ExampleUtils.java
similarity index 85%
copy from
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
copy to
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/ExampleUtils.java
index a4a84986c..a3937dedf 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
+++
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/ExampleUtils.java
@@ -29,10 +29,10 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
-public class SeaTunnelApiExample {
+public class ExampleUtils {
- public static void main(String[] args) throws FileNotFoundException,
URISyntaxException, CommandException {
- String configFile = getTestConfigFile("/examples/spark.batch.conf");
+ public static void builder(String configurePath) throws
FileNotFoundException, URISyntaxException, CommandException {
+ String configFile = getTestConfigFile(configurePath);
SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
sparkCommandArgs.setConfigFile(configFile);
sparkCommandArgs.setCheckConfig(false);
@@ -43,7 +43,7 @@ public class SeaTunnelApiExample {
Seatunnel.run(sparkCommand);
}
- public static String getTestConfigFile(String configFile) throws
FileNotFoundException, URISyntaxException {
+ private static String getTestConfigFile(String configFile) throws
FileNotFoundException, URISyntaxException {
URL resource = SeaTunnelApiExample.class.getResource(configFile);
if (resource == null) {
throw new FileNotFoundException("Can't find config file: " +
configFile);
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
index a4a84986c..be4a3c923 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
+++
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
@@ -17,37 +17,14 @@
package org.apache.seatunnel.example.spark.v2;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.Seatunnel;
-import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
import java.io.FileNotFoundException;
import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
public class SeaTunnelApiExample {
public static void main(String[] args) throws FileNotFoundException,
URISyntaxException, CommandException {
- String configFile = getTestConfigFile("/examples/spark.batch.conf");
- SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
- sparkCommandArgs.setConfigFile(configFile);
- sparkCommandArgs.setCheckConfig(false);
- sparkCommandArgs.setVariables(null);
- sparkCommandArgs.setDeployMode(DeployMode.CLIENT);
- Command<SparkCommandArgs> sparkCommand =
- new SparkCommandBuilder().buildCommand(sparkCommandArgs);
- Seatunnel.run(sparkCommand);
- }
-
- public static String getTestConfigFile(String configFile) throws
FileNotFoundException, URISyntaxException {
- URL resource = SeaTunnelApiExample.class.getResource(configFile);
- if (resource == null) {
- throw new FileNotFoundException("Can't find config file: " +
configFile);
- }
- return Paths.get(resource.toURI()).toString();
+ ExampleUtils.builder("/examples/spark.batch.conf");
}
}
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToClickHouseExample.java
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToClickHouseExample.java
new file mode 100644
index 000000000..b078c30aa
--- /dev/null
+++
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToClickHouseExample.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.spark.v2;
+
+import org.apache.seatunnel.core.starter.exception.CommandException;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+
+public class SeaTunnelApiToClickHouseExample {
+
+ public static void main(String[] args) throws FileNotFoundException,
URISyntaxException, CommandException {
+ ExampleUtils.builder("/examples/spark.batch.clickhouse.conf");
+ }
+}
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
new file mode 100644
index 000000000..2e27410ee
--- /dev/null
+++
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ spark.app.name = "SeaTunnelToClickHouseV2"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+}
+
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ result_table_name = "sql"
+ }
+}
+
+sink {
+ ClickHouse {
+ host = "139.198.158.103:8123"
+ database = "default"
+ table = "test_clickhouse_table_v2"
+ fields = ["name", "age"]
+ username = 'default'
+ bulk_size = 20000
+ retry_codes = [209, 210]
+ retry = 3
+ }
+}
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml
b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 757aed3dc..49c634f0b 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -49,6 +49,16 @@
<artifactId>seatunnel-connector-spark-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-spark-clickhouse</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-clickhouse</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--spark-->
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkToClickHouseExample.java
similarity index 52%
copy from
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
copy to
seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkToClickHouseExample.java
index a4a84986c..eeff4baa7 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
+++
b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkToClickHouseExample.java
@@ -15,38 +15,37 @@
* limitations under the License.
*/
-package org.apache.seatunnel.example.spark.v2;
+package org.apache.seatunnel.example.spark;
import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.Seatunnel;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
+import org.apache.seatunnel.core.base.Seatunnel;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
-public class SeaTunnelApiExample {
+public class LocalSparkToClickHouseExample {
- public static void main(String[] args) throws FileNotFoundException,
URISyntaxException, CommandException {
- String configFile = getTestConfigFile("/examples/spark.batch.conf");
- SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
- sparkCommandArgs.setConfigFile(configFile);
- sparkCommandArgs.setCheckConfig(false);
- sparkCommandArgs.setVariables(null);
- sparkCommandArgs.setDeployMode(DeployMode.CLIENT);
- Command<SparkCommandArgs> sparkCommand =
- new SparkCommandBuilder().buildCommand(sparkCommandArgs);
+ public static void main(String[] args) throws URISyntaxException,
FileNotFoundException, CommandException {
+ String configFile =
getTestConfigFile("/examples/spark.batch.clickhouse.conf");
+ SparkCommandArgs sparkArgs = new SparkCommandArgs();
+ sparkArgs.setConfigFile(configFile);
+ sparkArgs.setCheckConfig(false);
+ sparkArgs.setVariables(null);
+ sparkArgs.setDeployMode(DeployMode.CLIENT);
+ Command<SparkCommandArgs> sparkCommand = new
SparkCommandBuilder().buildCommand(sparkArgs);
Seatunnel.run(sparkCommand);
}
- public static String getTestConfigFile(String configFile) throws
FileNotFoundException, URISyntaxException {
- URL resource = SeaTunnelApiExample.class.getResource(configFile);
+ public static String getTestConfigFile(String configFile) throws
URISyntaxException, FileNotFoundException {
+ URL resource =
LocalSparkToClickHouseExample.class.getResource(configFile);
if (resource == null) {
- throw new FileNotFoundException("Can't find config file: " +
configFile);
+ throw new FileNotFoundException("Could not find config file: " +
configFile);
}
return Paths.get(resource.toURI()).toString();
}
diff --git
a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.clickhouse.conf
b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.clickhouse.conf
new file mode 100644
index 000000000..caaed1199
--- /dev/null
+++
b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.clickhouse.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ spark.app.name = "SeaTunnelToClickHouse"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Fake {
+ result_table_name = "my_dataset"
+ }
+}
+
+
+transform {}
+
+sink {
+ ClickHouse {
+ host = "139.198.158.103:8123"
+ database = "default"
+ table = "test_clickhouse_table"
+ fields = ["name", "age"]
+ username = 'default'
+ bulk_size = 20000
+ retry_codes = [209, 210]
+ retry = 3
+ split_mode = true
+ }
+}