This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new ebf9489f2 [Api-draft] Add clickhouse source support (#2013)
ebf9489f2 is described below

commit ebf9489f2bd510c4d8d1de53550421a6ab33f1b8
Author: Hisoka <[email protected]>
AuthorDate: Wed Jun 15 11:52:58 2022 +0800

    [Api-draft] Add clickhouse source support (#2013)
    
    * fix maven dependency duplicate
    
    * add clickhouse source templete code
    
    * add clickhouse source support
    
    * add clickhouse source support
    
    * add local time support
    
    * fix merge error
    
    * remove useless code
    
    * remove useless code
    
    * add license
---
 pom.xml                                            |   2 +-
 .../apache/seatunnel/api/table/type/BasicType.java |  11 +-
 .../seatunnel-connectors-seatunnel-dist/pom.xml    |   5 +
 .../seatunnel-connectors-seatunnel/pom.xml         |   1 +
 .../pom.xml                                        |  47 +++++++
 .../seatunnel/clickhouse/config/Config.java        |  35 +++++
 .../clickhouse/source/ClickhouseSource.java        | 151 +++++++++++++++++++++
 .../clickhouse/source/ClickhouseSourceReader.java  | 108 +++++++++++++++
 .../clickhouse/source/ClickhouseSourceSplit.java   |  27 ++++
 .../source/ClickhouseSourceSplitEnumerator.java    |  99 ++++++++++++++
 .../clickhouse/state/ClickhouseSourceState.java    |  23 ++++
 .../seatunnel/clickhouse/util/TypeConvertUtil.java | 110 +++++++++++++++
 .../seatunnel-connector-seatunnel-kafka/pom.xml    |   1 +
 .../spark/execution/SinkExecuteProcessor.java      |   1 +
 seatunnel-dist/release-docs/LICENSE                |   2 +
 .../seatunnel-flink-examples/pom.xml               |   5 -
 .../seatunnel-flink-new-connector-example/pom.xml  |   5 -
 .../seatunnel-flink-sql-examples/pom.xml           |   7 +-
 .../seatunnel-spark-examples/pom.xml               |   5 -
 .../seatunnel-spark-new-connector-example/pom.xml  |   5 -
 .../translation/source/CoordinatedSource.java      |   1 +
 .../translation/source/ParallelSource.java         |   3 +
 .../flink/utils/TypeConverterUtils.java            |   1 -
 .../batch/CoordinatedBatchPartitionReader.java     |   1 +
 .../CoordinatedMicroBatchPartitionReader.java      |   1 +
 tools/dependencies/known-dependencies.txt          |   2 +
 26 files changed, 626 insertions(+), 33 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0e479d784..d8df74e2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,7 @@
     <licenses>
         <license>
             <name>The Apache License, Version 2.0</name>
-            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
         </license>
     </licenses>
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index 2c61b4e32..74498c7b9 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.time.Instant;
 import java.util.Date;
+import java.util.Objects;
 
 public class BasicType<T> implements SeaTunnelDataType<T> {
     private static final long serialVersionUID = 1L;
@@ -64,15 +65,15 @@ public class BasicType<T> implements SeaTunnelDataType<T> {
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
+    public boolean equals(Object o) {
+        if (this == o) {
             return true;
         }
-        if (obj == null) {
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        BasicType<?> other = (BasicType<?>) obj;
-        return this.physicalTypeClass.equals(other.physicalTypeClass);
+        BasicType<?> basicType = (BasicType<?>) o;
+        return Objects.equals(physicalTypeClass, basicType.physicalTypeClass);
     }
 
     @Override
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index d00131697..ba8c8be33 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -60,6 +60,11 @@
             <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-seatunnel-clickhouse</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 86d0b5271..89ebf68d0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -37,5 +37,6 @@
         <module>seatunnel-connector-seatunnel-kafka</module>
         <module>seatunnel-connector-seatunnel-jdbc</module>
         <module>seatunnel-connector-seatunnel-socket</module>
+        <module>seatunnel-connector-seatunnel-clickhouse</module>
     </modules>
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
new file mode 100644
index 000000000..e3fb148aa
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>seatunnel-connectors-seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-connector-seatunnel-clickhouse</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- TODO add to dependency management after version unify -->
+        <dependency>
+            <groupId>com.clickhouse</groupId>
+            <artifactId>clickhouse-http-client</artifactId>
+            <version>0.3.2-patch9</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
new file mode 100644
index 000000000..65b7af7c6
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+/**
+ * The config of clickhouse
+ */
+public class Config {
+
+    public static final String NODE_ADDRESS = "node_address";
+
+    public static final String DATABASE = "database";
+
+    public static final String SQL = "sql";
+
+    public static final String USERNAME = "username";
+
+    public static final String PASSWORD = "password";
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
new file mode 100644
index 000000000..59e6b5cdb
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SQL;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseProtocol;
+import com.clickhouse.client.ClickHouseResponse;
+import com.google.auto.service.AutoService;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@AutoService(SeaTunnelSource.class)
+public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, 
ClickhouseSourceSplit, ClickhouseSourceState> {
+
+    private SeaTunnelContext seaTunnelContext;
+    private List<ClickHouseNode> servers;
+    private SeaTunnelRowType rowTypeInfo;
+    private String sql;
+
+    @Override
+    public String getPluginName() {
+        return "Clickhouse";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, 
NODE_ADDRESS, DATABASE, SQL, USERNAME, PASSWORD);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+        }
+        servers = 
Arrays.stream(config.getString(NODE_ADDRESS).split(",")).map(address -> {
+            String[] nodeAndPort = address.split(":", 2);
+            return 
ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP,
+                            
Integer.parseInt(nodeAndPort[1])).database(config.getString(DATABASE))
+                    
.credentials(ClickHouseCredentials.fromUserAndPassword(config.getString(USERNAME),
+                            config.getString(PASSWORD))).build();
+        }).collect(Collectors.toList());
+
+        sql = config.getString(SQL);
+        try (ClickHouseClient client = 
ClickHouseClient.newInstance(servers.get(0).getProtocol());
+             ClickHouseResponse response =
+                     
client.connect(servers.get(0)).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+                             
.query(modifySQLToLimit1(config.getString(SQL))).executeAndWait()) {
+
+            int columnSize = response.getColumns().size();
+            String[] fieldNames = new String[columnSize];
+            SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType[columnSize];
+
+            for (int i = 0; i < columnSize; i++) {
+                fieldNames[i] = response.getColumns().get(i).getColumnName();
+                seaTunnelDataTypes[i] = 
TypeConvertUtil.convert(response.getColumns().get(i).getDataType());
+            }
+
+            this.rowTypeInfo = new SeaTunnelRowType(fieldNames, 
seaTunnelDataTypes);
+
+        } catch (ClickHouseException e) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
e.getMessage());
+        }
+
+    }
+
+    private String modifySQLToLimit1(String sql) {
+        return String.format("SELECT * FROM (%s) s LIMIT 1", sql);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelRowType getProducedType() {
+        return this.rowTypeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, ClickhouseSourceSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
+        return new ClickhouseSourceReader(servers, readerContext, 
this.rowTypeInfo, sql);
+    }
+
+    @Override
+    public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> 
createEnumerator(SourceSplitEnumerator.Context<ClickhouseSourceSplit> 
enumeratorContext) throws Exception {
+        return new ClickhouseSourceSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<ClickhouseSourceSplit> 
enumeratorContext, ClickhouseSourceState checkpointState) throws Exception {
+        return new ClickhouseSourceSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public Serializer<ClickhouseSourceState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
new file mode 100644
index 000000000..72a461061
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
+
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseResponse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, 
ClickhouseSourceSplit> {
+
+    private final List<ClickHouseNode> servers;
+    private ClickHouseClient client;
+    private final SeaTunnelRowType rowTypeInfo;
+    private final SourceReader.Context readerContext;
+    private ClickHouseRequest<?> request;
+    private final String sql;
+
+    private final List<ClickhouseSourceSplit> splits;
+
+    ClickhouseSourceReader(List<ClickHouseNode> servers, SourceReader.Context 
readerContext,
+                           SeaTunnelRowType rowTypeInfo, String sql) {
+        this.servers = servers;
+        this.readerContext = readerContext;
+        this.rowTypeInfo = rowTypeInfo;
+        this.sql = sql;
+        this.splits = new ArrayList<>();
+    }
+
+    @Override
+    public void open() {
+        Random random = new Random();
+        ClickHouseNode server = servers.get(random.nextInt(servers.size()));
+        client = ClickHouseClient.newInstance(server.getProtocol());
+        request = 
client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (!splits.isEmpty()) {
+            try (ClickHouseResponse response = 
this.request.query(sql).executeAndWait()) {
+                response.stream().forEach(record -> {
+                    Object[] values = new 
Object[this.rowTypeInfo.getFieldNames().length];
+                    for (int i = 0; i < record.size(); i++) {
+                        values[i] = TypeConvertUtil.valueUnwrap(
+                                this.rowTypeInfo.getFieldType(i), 
record.getValue(i));
+                    }
+                    output.collect(new SeaTunnelRow(values));
+                });
+            }
+            this.readerContext.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws 
Exception {
+        return null;
+    }
+
+    @Override
+    public void addSplits(List<ClickhouseSourceSplit> splits) {
+        this.splits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java
new file mode 100644
index 000000000..e70d2cd44
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class ClickhouseSourceSplit implements SourceSplit {
+    @Override
+    public String splitId() {
+        return null;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
new file mode 100644
index 000000000..ce07b3412
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public class ClickhouseSourceSplitEnumerator implements
+        SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> {
+
+    private final Context<ClickhouseSourceSplit> context;
+    private final Set<Integer> readers;
+    private volatile int assigned = -1;
+
+    ClickhouseSourceSplitEnumerator(Context<ClickhouseSourceSplit> 
enumeratorContext) {
+        this.context = enumeratorContext;
+        this.readers = new HashSet<>();
+    }
+
+    @Override
+    public void open() {
+
+    }
+
+    @Override
+    public void run() throws Exception {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void addSplitsBack(List<ClickhouseSourceSplit> splits, int 
subtaskId) {
+        if (splits.isEmpty()) {
+            return;
+        }
+        if (subtaskId == assigned) {
+            Optional<Integer> otherReader = readers.stream().filter(r -> r != 
subtaskId).findAny();
+            if (otherReader.isPresent()) {
+                context.assignSplit(otherReader.get(), splits);
+            } else {
+                assigned = -1;
+            }
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return assigned < 0 ? 0 : 1;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        readers.add(subtaskId);
+        if (assigned < 0) {
+            assigned = subtaskId;
+            context.assignSplit(subtaskId, new ClickhouseSourceSplit());
+        }
+    }
+
+    @Override
+    public ClickhouseSourceState snapshotState(long checkpointId) throws 
Exception {
+        return null;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java
new file mode 100644
index 000000000..85b1c703b
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
+
+import java.io.Serializable;
+
+public class ClickhouseSourceState implements Serializable {
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
new file mode 100644
index 000000000..1f98a7d33
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.ListType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import com.clickhouse.client.ClickHouseDataType;
+import com.clickhouse.client.ClickHouseValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+
+public class TypeConvertUtil {
+
+    public static SeaTunnelDataType<?> convert(ClickHouseDataType dataType) {
+        Class<?> type = dataType.getObjectClass();
+        if (Integer.class.equals(type)) {
+            return BasicType.INT_TYPE;
+        } else if (Long.class.equals(type)) {
+            return BasicType.LONG_TYPE;
+        } else if (Short.class.equals(type)) {
+            return BasicType.SHORT_TYPE;
+        } else if (BigInteger.class.equals(type)) {
+            return BasicType.BIG_INT_TYPE;
+        } else if (Byte.class.equals(type)) {
+            return BasicType.BYTE_TYPE;
+        } else if (Boolean.class.equals(type)) {
+            return BasicType.BOOLEAN_TYPE;
+        } else if (LocalDate.class.equals(type)) {
+            return LocalTimeType.LOCAL_DATE_TYPE;
+        } else if (LocalDateTime.class.equals(type)) {
+            return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+        } else if (BigDecimal.class.equals(type)) {
+            return BasicType.BIG_DECIMAL_TYPE;
+        } else if (String.class.equals(type)) {
+            return BasicType.STRING_TYPE;
+        } else if (Float.class.equals(type)) {
+            return BasicType.FLOAT_TYPE;
+        } else if (Double.class.equals(type)) {
+            return BasicType.DOUBLE_TYPE;
+        } else if (Map.class.equals(type)) {
+            return new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
+        } else if (List.class.equals(type)) {
+            return new ListType<>(BasicType.STRING_TYPE);
+        } else {
+            // TODO support pojo
+            throw new IllegalArgumentException("not supported data type: " + 
dataType);
+        }
+    }
+
+    public static Object valueUnwrap(SeaTunnelDataType<?> dataType, 
ClickHouseValue record) {
+
+        if (dataType.equals(BasicType.BIG_DECIMAL_TYPE)) {
+            return record.asBigDecimal();
+        } else if (dataType.equals(BasicType.BIG_INT_TYPE)) {
+            return record.asBigInteger();
+        } else if (dataType.equals(BasicType.BOOLEAN_TYPE)) {
+            return record.asBoolean();
+        } else if (dataType.equals(BasicType.INT_TYPE)) {
+            return record.asInteger();
+        } else if (dataType.equals(BasicType.LONG_TYPE)) {
+            return record.asLong();
+        } else if (dataType.equals(BasicType.SHORT_TYPE)) {
+            return record.asShort();
+        } else if (dataType.equals(BasicType.BYTE_TYPE)) {
+            return record.asByte();
+        } else if (dataType.equals(LocalTimeType.LOCAL_DATE_TYPE)) {
+            return record.asDate();
+        } else if (dataType.equals(LocalTimeType.LOCAL_DATE_TIME_TYPE)) {
+            return record.asDateTime();
+        } else if (dataType.equals(BasicType.STRING_TYPE)) {
+            return record.asString();
+        } else if (dataType.equals(BasicType.FLOAT_TYPE)) {
+            return record.asFloat();
+        } else if (dataType.equals(BasicType.DOUBLE_TYPE)) {
+            return record.asDouble();
+        } else if (dataType instanceof MapType) {
+            return record.asMap();
+        } else if (dataType instanceof ListType) {
+            return record.asTuple();
+        } else {
+            // TODO support pojo
+            throw new IllegalArgumentException("not supported data type: " + 
dataType);
+        }
+    }
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/pom.xml
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/pom.xml
index 67428381c..bff843e06 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/pom.xml
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/pom.xml
@@ -37,6 +37,7 @@
             <version>${project.version}</version>
         </dependency>
 
+        <!-- TODO add to dependency management after version unify-->
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 112f4fbc0..a3f534f44 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -71,6 +71,7 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
             Dataset<Row> dataset = fromSourceTable(sinkConfig, 
sparkEnvironment).orElse(input);
             // TODO modify checkpoint location
+            // TODO add set type info: seaTunnelSink.setTypeInfo();
             SparkSinkInjector.inject(dataset.write(), seaTunnelSink, new 
HashMap<>(Common.COLLECTION_SIZE)).option(
                 "checkpointLocation", "/tmp").save();
         }
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index f8a69980b..57a4e7544 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -827,6 +827,8 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) aggs-matrix-stats 
(org.elasticsearch.plugin:aggs-matrix-stats-client:6.3.1 - 
https://github.com/elastic/elasticsearch)
      (The Apache Software License, Version 2.0) aggs-matrix-stats 
(org.elasticsearch.plugin:aggs-matrix-stats-client:7.5.1 - 
https://github.com/elastic/elasticsearch)
      (The Apache Software License, Version 2.0) cli 
(org.elasticsearch:elasticsearch-cli:6.3.1 - 
https://github.com/elastic/elasticsearch)
+     (The Apache Software License, Version 2.0) clickhouse-client 
(com.clickhouse:clickhouse-client:0.3.2-patch9 - 
https://github.com/ClickHouse/clickhouse-jdbc)
+     (The Apache Software License, Version 2.0) clickhouse-http-client 
(com.clickhouse:clickhouse-http-client:0.3.2-patch9 - 
https://github.com/ClickHouse/clickhouse-jdbc)
      (The Apache Software License, Version 2.0) clickhouse-jdbc 
(ru.yandex.clickhouse:clickhouse-jdbc:0.2 - 
https://github.com/yandex/clickhouse-jdbc)
      (The Apache Software License, Version 2.0) elasticsearch-cli 
(org.elasticsearch:elasticsearch-cli:7.5.1 - 
https://github.com/elastic/elasticsearch)
      (The Apache Software License, Version 2.0) elasticsearch-core 
(org.elasticsearch:elasticsearch-core:6.3.1 - 
https://github.com/elastic/elasticsearch)
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml 
b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index df7572b8b..6c8eb0f09 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -96,11 +96,6 @@
             <scope>${flink.scope}</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
-        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml 
b/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
index a14668275..445ad61e0 100644
--- a/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
@@ -101,11 +101,6 @@
             <scope>${flink.scope}</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
-        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-sql-examples/pom.xml 
b/seatunnel-examples/seatunnel-flink-sql-examples/pom.xml
index 63a714fd4..813e04980 100644
--- a/seatunnel-examples/seatunnel-flink-sql-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-sql-examples/pom.xml
@@ -62,7 +62,7 @@
             <version>${flink.version}</version>
             <scope>${flink.scope}</scope>
         </dependency>
-        
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -91,10 +91,5 @@
             <scope>${flink.scope}</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
-        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml 
b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 6b9d2c798..3eaff1d70 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -85,10 +85,5 @@
             <version>1.3.0</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-spark-new-connector-example/pom.xml 
b/seatunnel-examples/seatunnel-spark-new-connector-example/pom.xml
index 5a8eb4a10..2d6d74aae 100644
--- a/seatunnel-examples/seatunnel-spark-new-connector-example/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-new-connector-example/pom.xml
@@ -86,11 +86,6 @@
             <version>1.3.0</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
-        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 9806a6d08..7c6f6043d 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -139,6 +139,7 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT> implements
                 while (flag.get()) {
                     try {
                         reader.pollNext(collector);
+                        Thread.sleep(SLEEP_TIME_INTERVAL);
                     } catch (Exception e) {
                         running = false;
                         flag.set(false);
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 12d8c35eb..ab0940d6e 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.translation.source;
 
+import static 
org.apache.seatunnel.translation.source.CoordinatedSource.SLEEP_TIME_INTERVAL;
+
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -116,6 +118,7 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Ba
                 future.get();
             }
             reader.pollNext(collector);
+            Thread.sleep(SLEEP_TIME_INTERVAL);
         }
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index 9dbd5413b..a7e25e9b3 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -95,7 +95,6 @@ public class TypeConverterUtils {
         throw new UnsupportedOperationException("TypeConverterUtils is a 
utility class and cannot be instantiated");
     }
 
-    @SuppressWarnings("unchecked")
     public static SeaTunnelDataType<?> convert(TypeInformation<?> dataType) {
         BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
         if (bridgedType != null) {
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
index 00c2cd718..2c00aafe6 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
@@ -75,6 +75,7 @@ public class CoordinatedBatchPartitionReader extends 
ParallelBatchPartitionReade
                     while (flag.get()) {
                         try {
                             reader.pollNext(rowCollector);
+                            Thread.sleep(SLEEP_TIME_INTERVAL);
                         } catch (Exception e) {
                             this.running = false;
                             flag.set(false);
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
index c24825883..4df4d585b 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -109,6 +109,7 @@ public class CoordinatedMicroBatchPartitionReader extends 
ParallelMicroBatchPart
                     while (flag.get()) {
                         try {
                             reader.pollNext(rowCollector);
+                            Thread.sleep(SLEEP_TIME_INTERVAL);
                         } catch (Exception e) {
                             this.running = false;
                             flag.set(false);
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 956f1df1b..62d79b2d8 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -60,6 +60,8 @@ checker-qual-3.4.0.jar
 chill-java-0.9.3.jar
 chill_2.11-0.9.3.jar
 classmate-1.1.0.jar
+clickhouse-client-0.3.2-patch9.jar
+clickhouse-http-client-0.3.2-patch9.jar
 clickhouse-jdbc-0.2.jar
 commons-beanutils-1.7.0.jar
 commons-beanutils-1.9.3.jar

Reply via email to