hailin0 commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980688377


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java:
##########
@@ -60,12 +60,14 @@ public void close() throws IOException {
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         JdbcSourceSplit split = splits.poll();

Review Comment:
   Move the `splits.poll()` call into the synchronized code block (see line 64).



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java:
##########
@@ -28,47 +28,81 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class JdbcSourceSplitEnumerator implements 
SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext;
-    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+
+    private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+
     private JdbcSourceOptions jdbcSourceOptions;
     private final PartitionParameter partitionParameter;
-    private final int parallelism;
 
     public 
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter 
partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
-        this.parallelism = enumeratorContext.currentParallelism();
+        this.pendingSplits = new HashMap<>();
     }
 
     @Override
     public void open() {
+        // No connection needs to be opened
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    private void discoverySplits() {
+        List<JdbcSourceSplit> allSplit = new ArrayList<>();
         LOG.info("Starting to calculate splits.");
         if (null != partitionParameter) {
             JdbcNumericBetweenParametersProvider 
jdbcNumericBetweenParametersProvider =
-                    new 
JdbcNumericBetweenParametersProvider(partitionParameter.minValue, 
partitionParameter.maxValue).ofBatchNum(parallelism);
+                new 
JdbcNumericBetweenParametersProvider(partitionParameter.minValue, 
partitionParameter.maxValue).ofBatchNum(enumeratorContext.currentParallelism());
             Serializable[][] parameterValues = 
jdbcNumericBetweenParametersProvider.getParameterValues();
             for (int i = 0; i < parameterValues.length; i++) {
                 allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
             }
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
         }
+        int numReaders = enumeratorContext.currentParallelism();
+        for (JdbcSourceSplit split : allSplit) {
+            int ownerReader = split.splitId % numReaders;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
         LOG.info("Calculated splits successfully, the size of splits is {}.", 
allSplit.size());
     }
 
-    @Override
-    public void run() throws Exception {
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<JdbcSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);

Review Comment:
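   How do we roll back if an exception is thrown? `pendingSplits.remove(pendingReader)` removes this reader's pending splits before they are assigned. If the subsequent assignment fails, those splits would be lost rather than retried.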



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to