This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8d9c6a3714 [Fix][Connector-V2] Fixed clickhouse connectors cannot stop 
under multiple parallelism (#7921)
8d9c6a3714 is described below

commit 8d9c6a371499329a8f3a7cc1046ef3a7841fdad4
Author: YOMO LEE <[email protected]>
AuthorDate: Tue Oct 29 20:48:38 2024 +0800

    [Fix][Connector-V2] Fixed clickhouse connectors cannot stop under multiple 
parallelism (#7921)
---
 .../clickhouse/source/ClickhouseSourceReader.java  | 60 ++++++++++++++--------
 .../source/ClickhouseSourceSplitEnumerator.java    |  1 +
 .../seatunnel/clickhouse/ClickhouseIT.java         |  6 +++
 .../src/test/resources/clickhouse_to_console.conf  | 45 ++++++++++++++++
 .../src/main/resources/seatunnel.yaml              |  1 +
 5 files changed, 92 insertions(+), 21 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
index 591334d972..3ad0ec041e 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
 
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -28,6 +29,7 @@ import com.clickhouse.client.ClickHouseFormat;
 import com.clickhouse.client.ClickHouseNode;
 import com.clickhouse.client.ClickHouseRequest;
 import com.clickhouse.client.ClickHouseResponse;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,6 +37,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+@Slf4j
 public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, 
ClickhouseSourceSplit> {
 
     private final List<ClickHouseNode> servers;
@@ -43,6 +46,7 @@ public class ClickhouseSourceReader implements 
SourceReader<SeaTunnelRow, Clickh
     private final SourceReader.Context readerContext;
     private ClickHouseRequest<?> request;
     private final String sql;
+    private volatile boolean noMoreSplit;
 
     private final List<ClickhouseSourceSplit> splits;
 
@@ -75,31 +79,43 @@ public class ClickhouseSourceReader implements 
SourceReader<SeaTunnelRow, Clickh
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        if (!splits.isEmpty()) {
-            try (ClickHouseResponse response = 
this.request.query(sql).executeAndWait()) {
-                response.stream()
-                        .forEach(
-                                record -> {
-                                    Object[] values =
-                                            new 
Object[this.rowTypeInfo.getFieldNames().length];
-                                    for (int i = 0; i < record.size(); i++) {
-                                        if 
(record.getValue(i).isNullOrEmpty()) {
-                                            values[i] = null;
-                                        } else {
-                                            values[i] =
-                                                    
TypeConvertUtil.valueUnwrap(
-                                                            
this.rowTypeInfo.getFieldType(i),
-                                                            
record.getValue(i));
+        synchronized (output.getCheckpointLock()) {
+            if (!splits.isEmpty()) {
+                try (ClickHouseResponse response = 
this.request.query(sql).executeAndWait()) {
+                    response.stream()
+                            .forEach(
+                                    record -> {
+                                        Object[] values =
+                                                new 
Object[this.rowTypeInfo.getFieldNames().length];
+                                        for (int i = 0; i < record.size(); 
i++) {
+                                            if 
(record.getValue(i).isNullOrEmpty()) {
+                                                values[i] = null;
+                                            } else {
+                                                values[i] =
+                                                        
TypeConvertUtil.valueUnwrap(
+                                                                
this.rowTypeInfo.getFieldType(i),
+                                                                
record.getValue(i));
+                                            }
                                         }
-                                    }
-                                    output.collect(new SeaTunnelRow(values));
-                                });
+                                        output.collect(new 
SeaTunnelRow(values));
+                                    });
+                }
+                signalNoMoreElement();
+            }
+            if (noMoreSplit
+                    && splits.isEmpty()
+                    && 
Boundedness.BOUNDED.equals(readerContext.getBoundedness())) {
+                signalNoMoreElement();
             }
-            this.readerContext.signalNoMoreElement();
-            this.splits.clear();
         }
     }
 
+    private void signalNoMoreElement() {
+        log.info("Closed the bounded ClickHouse source");
+        this.readerContext.signalNoMoreElement();
+        this.splits.clear();
+    }
+
     @Override
     public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws 
Exception {
         return Collections.emptyList();
@@ -111,7 +127,9 @@ public class ClickhouseSourceReader implements 
SourceReader<SeaTunnelRow, Clickh
     }
 
     @Override
-    public void handleNoMoreSplits() {}
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
index c0eb4b6c70..f3c1bd0c47 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
@@ -78,6 +78,7 @@ public class ClickhouseSourceSplitEnumerator
             assigned = subtaskId;
             context.assignSplit(subtaskId, new ClickhouseSourceSplit());
         }
+        context.signalNoMoreSplits(subtaskId);
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 66ee281740..76bdfaa281 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -101,6 +101,12 @@ public class ClickhouseIT extends TestSuiteBase implements 
TestResource {
         clearSinkTable();
     }
 
+    @TestTemplate
+    public void testSourceParallelism(TestContainer container) throws 
Exception {
+        Container.ExecResult execResult = 
container.executeJob("/clickhouse_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
     @BeforeAll
     @Override
     public void startUp() throws Exception {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf
new file mode 100644
index 0000000000..e996be8e4a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config (ClickHouse source with parallelism > 1)
+######
+
+env {
+  parallelism = 3
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    sql = "select * from source_table"
+    username = "default"
+    password = ""
+    result_table_name = "source_table"
+  }
+  # If you would like more information about how to configure SeaTunnel and see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource
+}
+
+sink {
+  console {
+  }
+  # If you would like more information about how to configure SeaTunnel and see the full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml 
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
index 33acf185f1..9177a9afca 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
@@ -20,6 +20,7 @@ seatunnel:
         backup-count: 1
         queue-type: blockingqueue
         print-execution-info-interval: 60
+        print-job-metrics-info-interval: 60
         slot-service:
             dynamic-slot: true
         checkpoint:

Reply via email to