This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7622f2899 [Improve][connector-jdbc] Calculate splits only once in JdbcSourceSplitEnumerator (#2900)
7622f2899 is described below

commit 7622f28999c18ff1090c5bec08ef96d7c6c7eb65
Author: Xiao Zhao <[email protected]>
AuthorDate: Mon Sep 26 19:21:59 2022 +0800

    [Improve][connector-jdbc] Calculate splits only once in JdbcSourceSplitEnumerator (#2900)
---
 .../jdbc/source/JdbcSourceSplitEnumerator.java     | 43 +++++++++++++---------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index f3b5368e9..71de5d176 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -29,20 +32,34 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 public class JdbcSourceSplitEnumerator implements 
SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
-
-    SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    List<JdbcSourceSplit> allSplit = new ArrayList<>();
-    JdbcSourceOptions jdbcSourceOptions;
-    PartitionParameter partitionParameter;
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
+    private final SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext;
+    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+    private JdbcSourceOptions jdbcSourceOptions;
+    private final PartitionParameter partitionParameter;
+    private final int parallelism;
 
     public 
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter 
partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
+        this.parallelism = enumeratorContext.currentParallelism();
     }
 
     @Override
     public void open() {
+        LOG.info("Starting to calculate splits.");
+        if (null != partitionParameter) {
+            JdbcNumericBetweenParametersProvider 
jdbcNumericBetweenParametersProvider =
+                    new 
JdbcNumericBetweenParametersProvider(partitionParameter.minValue, 
partitionParameter.maxValue).ofBatchNum(parallelism);
+            Serializable[][] parameterValues = 
jdbcNumericBetweenParametersProvider.getParameterValues();
+            for (int i = 0; i < parameterValues.length; i++) {
+                allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+            }
+        } else {
+            allSplit.add(new JdbcSourceSplit(null, 0));
+        }
+        LOG.info("Calculated splits successfully, the size of splits is {}.", 
allSplit.size());
     }
 
     @Override
@@ -70,20 +87,10 @@ public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSour
 
     @Override
     public void registerReader(int subtaskId) {
-        int parallelism = enumeratorContext.currentParallelism();
-        if (allSplit.isEmpty()) {
-            if (null != partitionParameter) {
-                JdbcNumericBetweenParametersProvider 
jdbcNumericBetweenParametersProvider = new 
JdbcNumericBetweenParametersProvider(partitionParameter.minValue, 
partitionParameter.maxValue).ofBatchNum(parallelism);
-                Serializable[][] parameterValues = 
jdbcNumericBetweenParametersProvider.getParameterValues();
-                for (int i = 0; i < parameterValues.length; i++) {
-                    allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
-                }
-            } else {
-                allSplit.add(new JdbcSourceSplit(null, 0));
-            }
-        }
         // Filter the split that the current task needs to run
-        List<JdbcSourceSplit> splits = allSplit.stream().filter(p -> p.splitId 
% parallelism == subtaskId).collect(Collectors.toList());
+        List<JdbcSourceSplit> splits = allSplit.stream()
+                .filter(p -> p.splitId % parallelism == subtaskId)
+                .collect(Collectors.toList());
         enumeratorContext.assignSplit(subtaskId, splits);
         enumeratorContext.signalNoMoreSplits(subtaskId);
     }

Reply via email to