This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8d9c6a3714 [Fix][Connector-V2] Fixed clickhouse connectors cannot stop
under multiple parallelism (#7921)
8d9c6a3714 is described below
commit 8d9c6a371499329a8f3a7cc1046ef3a7841fdad4
Author: YOMO LEE <[email protected]>
AuthorDate: Tue Oct 29 20:48:38 2024 +0800
[Fix][Connector-V2] Fixed clickhouse connectors cannot stop under multiple
parallelism (#7921)
---
.../clickhouse/source/ClickhouseSourceReader.java | 60 ++++++++++++++--------
.../source/ClickhouseSourceSplitEnumerator.java | 1 +
.../seatunnel/clickhouse/ClickhouseIT.java | 6 +++
.../src/test/resources/clickhouse_to_console.conf | 45 ++++++++++++++++
.../src/main/resources/seatunnel.yaml | 1 +
5 files changed, 92 insertions(+), 21 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
index 591334d972..3ad0ec041e 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -28,6 +29,7 @@ import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,6 +37,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
+@Slf4j
public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow,
ClickhouseSourceSplit> {
private final List<ClickHouseNode> servers;
@@ -43,6 +46,7 @@ public class ClickhouseSourceReader implements
SourceReader<SeaTunnelRow, Clickh
private final SourceReader.Context readerContext;
private ClickHouseRequest<?> request;
private final String sql;
+ private volatile boolean noMoreSplit;
private final List<ClickhouseSourceSplit> splits;
@@ -75,31 +79,43 @@ public class ClickhouseSourceReader implements
SourceReader<SeaTunnelRow, Clickh
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- if (!splits.isEmpty()) {
- try (ClickHouseResponse response =
this.request.query(sql).executeAndWait()) {
- response.stream()
- .forEach(
- record -> {
- Object[] values =
- new
Object[this.rowTypeInfo.getFieldNames().length];
- for (int i = 0; i < record.size(); i++) {
- if
(record.getValue(i).isNullOrEmpty()) {
- values[i] = null;
- } else {
- values[i] =
-
TypeConvertUtil.valueUnwrap(
-
this.rowTypeInfo.getFieldType(i),
-
record.getValue(i));
+ synchronized (output.getCheckpointLock()) {
+ if (!splits.isEmpty()) {
+ try (ClickHouseResponse response =
this.request.query(sql).executeAndWait()) {
+ response.stream()
+ .forEach(
+ record -> {
+ Object[] values =
+ new
Object[this.rowTypeInfo.getFieldNames().length];
+ for (int i = 0; i < record.size();
i++) {
+ if
(record.getValue(i).isNullOrEmpty()) {
+ values[i] = null;
+ } else {
+ values[i] =
+
TypeConvertUtil.valueUnwrap(
+
this.rowTypeInfo.getFieldType(i),
+
record.getValue(i));
+ }
}
- }
- output.collect(new SeaTunnelRow(values));
- });
+ output.collect(new
SeaTunnelRow(values));
+ });
+ }
+ signalNoMoreElement();
+ }
+ if (noMoreSplit
+ && splits.isEmpty()
+ &&
Boundedness.BOUNDED.equals(readerContext.getBoundedness())) {
+ signalNoMoreElement();
}
- this.readerContext.signalNoMoreElement();
- this.splits.clear();
}
}
+ private void signalNoMoreElement() {
+ log.info("Closed the bounded ClickHouse source");
+ this.readerContext.signalNoMoreElement();
+ this.splits.clear();
+ }
+
@Override
public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws
Exception {
return Collections.emptyList();
@@ -111,7 +127,9 @@ public class ClickhouseSourceReader implements
SourceReader<SeaTunnelRow, Clickh
}
@Override
- public void handleNoMoreSplits() {}
+ public void handleNoMoreSplits() {
+ noMoreSplit = true;
+ }
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
index c0eb4b6c70..f3c1bd0c47 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
@@ -78,6 +78,7 @@ public class ClickhouseSourceSplitEnumerator
assigned = subtaskId;
context.assignSplit(subtaskId, new ClickhouseSourceSplit());
}
+ context.signalNoMoreSplits(subtaskId);
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 66ee281740..76bdfaa281 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -101,6 +101,12 @@ public class ClickhouseIT extends TestSuiteBase implements
TestResource {
clearSinkTable();
}
+ @TestTemplate
+ public void testSourceParallelism(TestContainer container) throws
Exception {
+ Container.ExecResult execResult =
container.executeJob("/clickhouse_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
@BeforeAll
@Override
public void startUp() throws Exception {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf
new file mode 100644
index 0000000000..e996be8e4a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ parallelism = 3
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ sql = "select * from source_table"
+ username = "default"
+ password = ""
+ result_table_name = "source_table"
+ }
+ # If you would like to get more information about how to configure seatunnel
and see the full list of source plugins,
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource
+}
+
+sink {
+ console {
+ }
+ # If you would like to get more information about how to configure seatunnel
and see the full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
index 33acf185f1..9177a9afca 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
@@ -20,6 +20,7 @@ seatunnel:
backup-count: 1
queue-type: blockingqueue
print-execution-info-interval: 60
+ print-job-metrics-info-interval: 60
slot-service:
dynamic-slot: true
checkpoint: