This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new ebf9489f2 [Api-draft] Add clickhouse source support (#2013)
ebf9489f2 is described below
commit ebf9489f2bd510c4d8d1de53550421a6ab33f1b8
Author: Hisoka <[email protected]>
AuthorDate: Wed Jun 15 11:52:58 2022 +0800
[Api-draft] Add clickhouse source support (#2013)
* fix maven dependency duplicate
* add clickhouse source templete code
* add clickhouse source support
* add clickhouse source support
* add local time support
* fix merge error
* remove useless code
* remove useless code
* add license
---
pom.xml | 2 +-
.../apache/seatunnel/api/table/type/BasicType.java | 11 +-
.../seatunnel-connectors-seatunnel-dist/pom.xml | 5 +
.../seatunnel-connectors-seatunnel/pom.xml | 1 +
.../pom.xml | 47 +++++++
.../seatunnel/clickhouse/config/Config.java | 35 +++++
.../clickhouse/source/ClickhouseSource.java | 151 +++++++++++++++++++++
.../clickhouse/source/ClickhouseSourceReader.java | 108 +++++++++++++++
.../clickhouse/source/ClickhouseSourceSplit.java | 27 ++++
.../source/ClickhouseSourceSplitEnumerator.java | 99 ++++++++++++++
.../clickhouse/state/ClickhouseSourceState.java | 23 ++++
.../seatunnel/clickhouse/util/TypeConvertUtil.java | 110 +++++++++++++++
.../seatunnel-connector-seatunnel-kafka/pom.xml | 1 +
.../spark/execution/SinkExecuteProcessor.java | 1 +
seatunnel-dist/release-docs/LICENSE | 2 +
.../seatunnel-flink-examples/pom.xml | 5 -
.../seatunnel-flink-new-connector-example/pom.xml | 5 -
.../seatunnel-flink-sql-examples/pom.xml | 7 +-
.../seatunnel-spark-examples/pom.xml | 5 -
.../seatunnel-spark-new-connector-example/pom.xml | 5 -
.../translation/source/CoordinatedSource.java | 1 +
.../translation/source/ParallelSource.java | 3 +
.../flink/utils/TypeConverterUtils.java | 1 -
.../batch/CoordinatedBatchPartitionReader.java | 1 +
.../CoordinatedMicroBatchPartitionReader.java | 1 +
tools/dependencies/known-dependencies.txt | 2 +
26 files changed, 626 insertions(+), 33 deletions(-)
diff --git a/pom.xml b/pom.xml
index 0e479d784..d8df74e2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,7 @@
<licenses>
<license>
<name>The Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index 2c61b4e32..74498c7b9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.util.Date;
+import java.util.Objects;
public class BasicType<T> implements SeaTunnelDataType<T> {
private static final long serialVersionUID = 1L;
@@ -64,15 +65,15 @@ public class BasicType<T> implements SeaTunnelDataType<T> {
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
}
- if (obj == null) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
- BasicType<?> other = (BasicType<?>) obj;
- return this.physicalTypeClass.equals(other.physicalTypeClass);
+ BasicType<?> basicType = (BasicType<?>) o;
+ return Objects.equals(physicalTypeClass, basicType.physicalTypeClass);
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index d00131697..ba8c8be33 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -60,6 +60,11 @@
<artifactId>seatunnel-connector-seatunnel-socket</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-seatunnel-clickhouse</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 86d0b5271..89ebf68d0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -37,5 +37,6 @@
<module>seatunnel-connector-seatunnel-kafka</module>
<module>seatunnel-connector-seatunnel-jdbc</module>
<module>seatunnel-connector-seatunnel-socket</module>
+ <module>seatunnel-connector-seatunnel-clickhouse</module>
</modules>
</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
new file mode 100644
index 000000000..e3fb148aa
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-connector-seatunnel-clickhouse</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- TODO add to dependency management after version unify -->
+ <dependency>
+ <groupId>com.clickhouse</groupId>
+ <artifactId>clickhouse-http-client</artifactId>
+ <version>0.3.2-patch9</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
new file mode 100644
index 000000000..65b7af7c6
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+/**
+ * The config of clickhouse
+ */
+public class Config {
+
+ public static final String NODE_ADDRESS = "node_address";
+
+ public static final String DATABASE = "database";
+
+ public static final String SQL = "sql";
+
+ public static final String USERNAME = "username";
+
+ public static final String PASSWORD = "password";
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
new file mode 100644
index 000000000..59e6b5cdb
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SQL;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseProtocol;
+import com.clickhouse.client.ClickHouseResponse;
+import com.google.auto.service.AutoService;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@AutoService(SeaTunnelSource.class)
+public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow,
ClickhouseSourceSplit, ClickhouseSourceState> {
+
+ private SeaTunnelContext seaTunnelContext;
+ private List<ClickHouseNode> servers;
+ private SeaTunnelRowType rowTypeInfo;
+ private String sql;
+
+ @Override
+ public String getPluginName() {
+ return "Clickhouse";
+ }
+
+ @Override
+ public void prepare(Config config) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(config,
NODE_ADDRESS, DATABASE, SQL, USERNAME, PASSWORD);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ }
+ servers =
Arrays.stream(config.getString(NODE_ADDRESS).split(",")).map(address -> {
+ String[] nodeAndPort = address.split(":", 2);
+ return
ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP,
+
Integer.parseInt(nodeAndPort[1])).database(config.getString(DATABASE))
+
.credentials(ClickHouseCredentials.fromUserAndPassword(config.getString(USERNAME),
+ config.getString(PASSWORD))).build();
+ }).collect(Collectors.toList());
+
+ sql = config.getString(SQL);
+ try (ClickHouseClient client =
ClickHouseClient.newInstance(servers.get(0).getProtocol());
+ ClickHouseResponse response =
+
client.connect(servers.get(0)).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+
.query(modifySQLToLimit1(config.getString(SQL))).executeAndWait()) {
+
+ int columnSize = response.getColumns().size();
+ String[] fieldNames = new String[columnSize];
+ SeaTunnelDataType<?>[] seaTunnelDataTypes = new
SeaTunnelDataType[columnSize];
+
+ for (int i = 0; i < columnSize; i++) {
+ fieldNames[i] = response.getColumns().get(i).getColumnName();
+ seaTunnelDataTypes[i] =
TypeConvertUtil.convert(response.getColumns().get(i).getDataType());
+ }
+
+ this.rowTypeInfo = new SeaTunnelRowType(fieldNames,
seaTunnelDataTypes);
+
+ } catch (ClickHouseException e) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
e.getMessage());
+ }
+
+ }
+
+ private String modifySQLToLimit1(String sql) {
+ return String.format("SELECT * FROM (%s) s LIMIT 1", sql);
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SeaTunnelRowType getProducedType() {
+ return this.rowTypeInfo;
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, ClickhouseSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
+ return new ClickhouseSourceReader(servers, readerContext,
this.rowTypeInfo, sql);
+ }
+
+ @Override
+ public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState>
createEnumerator(SourceSplitEnumerator.Context<ClickhouseSourceSplit>
enumeratorContext) throws Exception {
+ return new ClickhouseSourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<ClickhouseSourceSplit>
enumeratorContext, ClickhouseSourceState checkpointState) throws Exception {
+ return new ClickhouseSourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public Serializer<ClickhouseSourceState> getEnumeratorStateSerializer() {
+ return new DefaultSerializer<>();
+ }
+
+ @Override
+ public SeaTunnelContext getSeaTunnelContext() {
+ return seaTunnelContext;
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
new file mode 100644
index 000000000..72a461061
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
+
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseResponse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow,
ClickhouseSourceSplit> {
+
+ private final List<ClickHouseNode> servers;
+ private ClickHouseClient client;
+ private final SeaTunnelRowType rowTypeInfo;
+ private final SourceReader.Context readerContext;
+ private ClickHouseRequest<?> request;
+ private final String sql;
+
+ private final List<ClickhouseSourceSplit> splits;
+
+ ClickhouseSourceReader(List<ClickHouseNode> servers, SourceReader.Context
readerContext,
+ SeaTunnelRowType rowTypeInfo, String sql) {
+ this.servers = servers;
+ this.readerContext = readerContext;
+ this.rowTypeInfo = rowTypeInfo;
+ this.sql = sql;
+ this.splits = new ArrayList<>();
+ }
+
+ @Override
+ public void open() {
+ Random random = new Random();
+ ClickHouseNode server = servers.get(random.nextInt(servers.size()));
+ client = ClickHouseClient.newInstance(server.getProtocol());
+ request =
client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ if (!splits.isEmpty()) {
+ try (ClickHouseResponse response =
this.request.query(sql).executeAndWait()) {
+ response.stream().forEach(record -> {
+ Object[] values = new
Object[this.rowTypeInfo.getFieldNames().length];
+ for (int i = 0; i < record.size(); i++) {
+ values[i] = TypeConvertUtil.valueUnwrap(
+ this.rowTypeInfo.getFieldType(i),
record.getValue(i));
+ }
+ output.collect(new SeaTunnelRow(values));
+ });
+ }
+ this.readerContext.signalNoMoreElement();
+ }
+ }
+
+ @Override
+ public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public void addSplits(List<ClickhouseSourceSplit> splits) {
+ this.splits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java
new file mode 100644
index 000000000..e70d2cd44
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class ClickhouseSourceSplit implements SourceSplit {
+ @Override
+ public String splitId() {
+ return null;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
new file mode 100644
index 000000000..ce07b3412
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public class ClickhouseSourceSplitEnumerator implements
+ SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> {
+
+ private final Context<ClickhouseSourceSplit> context;
+ private final Set<Integer> readers;
+ private volatile int assigned = -1;
+
+ ClickhouseSourceSplitEnumerator(Context<ClickhouseSourceSplit>
enumeratorContext) {
+ this.context = enumeratorContext;
+ this.readers = new HashSet<>();
+ }
+
+ @Override
+ public void open() {
+
+ }
+
+ @Override
+ public void run() throws Exception {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void addSplitsBack(List<ClickhouseSourceSplit> splits, int
subtaskId) {
+ if (splits.isEmpty()) {
+ return;
+ }
+ if (subtaskId == assigned) {
+ Optional<Integer> otherReader = readers.stream().filter(r -> r !=
subtaskId).findAny();
+ if (otherReader.isPresent()) {
+ context.assignSplit(otherReader.get(), splits);
+ } else {
+ assigned = -1;
+ }
+ }
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return assigned < 0 ? 0 : 1;
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ readers.add(subtaskId);
+ if (assigned < 0) {
+ assigned = subtaskId;
+ context.assignSplit(subtaskId, new ClickhouseSourceSplit());
+ }
+ }
+
+ @Override
+ public ClickhouseSourceState snapshotState(long checkpointId) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java
new file mode 100644
index 000000000..85b1c703b
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
+
+import java.io.Serializable;
+
+public class ClickhouseSourceState implements Serializable {
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
new file mode 100644
index 000000000..1f98a7d33
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.ListType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import com.clickhouse.client.ClickHouseDataType;
+import com.clickhouse.client.ClickHouseValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+
+public class TypeConvertUtil {
+
+ public static SeaTunnelDataType<?> convert(ClickHouseDataType dataType) {
+ Class<?> type = dataType.getObjectClass();
+ if (Integer.class.equals(type)) {
+ return BasicType.INT_TYPE;
+ } else if (Long.class.equals(type)) {
+ return BasicType.LONG_TYPE;
+ } else if (Short.class.equals(type)) {
+ return BasicType.SHORT_TYPE;
+ } else if (BigInteger.class.equals(type)) {
+ return BasicType.BIG_INT_TYPE;
+ } else if (Byte.class.equals(type)) {
+ return BasicType.BYTE_TYPE;
+ } else if (Boolean.class.equals(type)) {
+ return BasicType.BOOLEAN_TYPE;
+ } else if (LocalDate.class.equals(type)) {
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ } else if (LocalDateTime.class.equals(type)) {
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ } else if (BigDecimal.class.equals(type)) {
+ return BasicType.BIG_DECIMAL_TYPE;
+ } else if (String.class.equals(type)) {
+ return BasicType.STRING_TYPE;
+ } else if (Float.class.equals(type)) {
+ return BasicType.FLOAT_TYPE;
+ } else if (Double.class.equals(type)) {
+ return BasicType.DOUBLE_TYPE;
+ } else if (Map.class.equals(type)) {
+ return new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
+ } else if (List.class.equals(type)) {
+ return new ListType<>(BasicType.STRING_TYPE);
+ } else {
+ // TODO support pojo
+ throw new IllegalArgumentException("not supported data type: " +
dataType);
+ }
+ }
+
+ public static Object valueUnwrap(SeaTunnelDataType<?> dataType,
ClickHouseValue record) {
+
+ if (dataType.equals(BasicType.BIG_DECIMAL_TYPE)) {
+ return record.asBigDecimal();
+ } else if (dataType.equals(BasicType.BIG_INT_TYPE)) {
+ return record.asBigInteger();
+ } else if (dataType.equals(BasicType.BOOLEAN_TYPE)) {
+ return record.asBoolean();
+ } else if (dataType.equals(BasicType.INT_TYPE)) {
+ return record.asInteger();
+ } else if (dataType.equals(BasicType.LONG_TYPE)) {
+ return record.asLong();
+ } else if (dataType.equals(BasicType.SHORT_TYPE)) {
+ return record.asShort();
+ } else if (dataType.equals(BasicType.BYTE_TYPE)) {
+ return record.asByte();
+ } else if (dataType.equals(LocalTimeType.LOCAL_DATE_TYPE)) {
+ return record.asDate();
+ } else if (dataType.equals(LocalTimeType.LOCAL_DATE_TIME_TYPE)) {
+ return record.asDateTime();
+ } else if (dataType.equals(BasicType.STRING_TYPE)) {
+ return record.asString();
+ } else if (dataType.equals(BasicType.FLOAT_TYPE)) {
+ return record.asFloat();
+ } else if (dataType.equals(BasicType.DOUBLE_TYPE)) {
+ return record.asDouble();
+ } else if (dataType instanceof MapType) {
+ return record.asMap();
+ } else if (dataType instanceof ListType) {
+ return record.asTuple();
+ } else {
+ // TODO support pojo
+ throw new IllegalArgumentException("not supported data type: " +
dataType);
+ }
+ }
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/pom.xml
index 67428381c..bff843e06 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/pom.xml
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/pom.xml
@@ -37,6 +37,7 @@
<version>${project.version}</version>
</dependency>
+ <!-- TODO add to dependency management after version unify-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 112f4fbc0..a3f534f44 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -71,6 +71,7 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset = fromSourceTable(sinkConfig,
sparkEnvironment).orElse(input);
// TODO modify checkpoint location
+ // TODO add set type info: seaTunnelSink.setTypeInfo();
SparkSinkInjector.inject(dataset.write(), seaTunnelSink, new
HashMap<>(Common.COLLECTION_SIZE)).option(
"checkpointLocation", "/tmp").save();
}
diff --git a/seatunnel-dist/release-docs/LICENSE
b/seatunnel-dist/release-docs/LICENSE
index f8a69980b..57a4e7544 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -827,6 +827,8 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) aggs-matrix-stats
(org.elasticsearch.plugin:aggs-matrix-stats-client:6.3.1 -
https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) aggs-matrix-stats
(org.elasticsearch.plugin:aggs-matrix-stats-client:7.5.1 -
https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) cli
(org.elasticsearch:elasticsearch-cli:6.3.1 -
https://github.com/elastic/elasticsearch)
+ (The Apache Software License, Version 2.0) clickhouse-client
(com.clickhouse:clickhouse-client:0.3.2-patch9 -
https://github.com/ClickHouse/clickhouse-jdbc)
+ (The Apache Software License, Version 2.0) clickhouse-http-client
(com.clickhouse:clickhouse-http-client:0.3.2-patch9 -
https://github.com/ClickHouse/clickhouse-jdbc)
(The Apache Software License, Version 2.0) clickhouse-jdbc
(ru.yandex.clickhouse:clickhouse-jdbc:0.2 -
https://github.com/yandex/clickhouse-jdbc)
(The Apache Software License, Version 2.0) elasticsearch-cli
(org.elasticsearch:elasticsearch-cli:7.5.1 -
https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) elasticsearch-core
(org.elasticsearch:elasticsearch-core:6.3.1 -
https://github.com/elastic/elasticsearch)
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml
b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index df7572b8b..6c8eb0f09 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -96,11 +96,6 @@
<scope>${flink.scope}</scope>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
b/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
index a14668275..445ad61e0 100644
--- a/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
@@ -101,11 +101,6 @@
<scope>${flink.scope}</scope>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-sql-examples/pom.xml
b/seatunnel-examples/seatunnel-flink-sql-examples/pom.xml
index 63a714fd4..813e04980 100644
--- a/seatunnel-examples/seatunnel-flink-sql-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-sql-examples/pom.xml
@@ -62,7 +62,7 @@
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
-
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -91,10 +91,5 @@
<scope>${flink.scope}</scope>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
</dependencies>
</project>
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml
b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 6b9d2c798..3eaff1d70 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -85,10 +85,5 @@
<version>1.3.0</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-spark-new-connector-example/pom.xml
b/seatunnel-examples/seatunnel-spark-new-connector-example/pom.xml
index 5a8eb4a10..2d6d74aae 100644
--- a/seatunnel-examples/seatunnel-spark-new-connector-example/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-new-connector-example/pom.xml
@@ -86,11 +86,6 @@
<version>1.3.0</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 9806a6d08..7c6f6043d 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -139,6 +139,7 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT> implements
while (flag.get()) {
try {
reader.pollNext(collector);
+ Thread.sleep(SLEEP_TIME_INTERVAL);
} catch (Exception e) {
running = false;
flag.set(false);
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 12d8c35eb..ab0940d6e 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.translation.source;
+import static
org.apache.seatunnel.translation.source.CoordinatedSource.SLEEP_TIME_INTERVAL;
+
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -116,6 +118,7 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Ba
future.get();
}
reader.pollNext(collector);
+ Thread.sleep(SLEEP_TIME_INTERVAL);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index 9dbd5413b..a7e25e9b3 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -95,7 +95,6 @@ public class TypeConverterUtils {
throw new UnsupportedOperationException("TypeConverterUtils is a
utility class and cannot be instantiated");
}
- @SuppressWarnings("unchecked")
public static SeaTunnelDataType<?> convert(TypeInformation<?> dataType) {
BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
if (bridgedType != null) {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
index 00c2cd718..2c00aafe6 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
@@ -75,6 +75,7 @@ public class CoordinatedBatchPartitionReader extends
ParallelBatchPartitionReade
while (flag.get()) {
try {
reader.pollNext(rowCollector);
+ Thread.sleep(SLEEP_TIME_INTERVAL);
} catch (Exception e) {
this.running = false;
flag.set(false);
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
index c24825883..4df4d585b 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -109,6 +109,7 @@ public class CoordinatedMicroBatchPartitionReader extends
ParallelMicroBatchPart
while (flag.get()) {
try {
reader.pollNext(rowCollector);
+ Thread.sleep(SLEEP_TIME_INTERVAL);
} catch (Exception e) {
this.running = false;
flag.set(false);
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 956f1df1b..62d79b2d8 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -60,6 +60,8 @@ checker-qual-3.4.0.jar
chill-java-0.9.3.jar
chill_2.11-0.9.3.jar
classmate-1.1.0.jar
+clickhouse-client-0.3.2-patch9.jar
+clickhouse-http-client-0.3.2-patch9.jar
clickhouse-jdbc-0.2.jar
commons-beanutils-1.7.0.jar
commons-beanutils-1.9.3.jar