This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5328e9d84 [Improve][Connector-V2][JDBC] Add exactly-once for JDBC source connector (#3750)
5328e9d84 is described below

commit 5328e9d8475468f385d7f40e4baefbee1046a66e
Author: TaoZex <[email protected]>
AuthorDate: Mon Dec 26 15:08:55 2022 +0800

    [Improve][Connector-V2][JDBC] Add exactly-once for JDBC source connector (#3750)
    
    * [Improve][Connector-V2][JDBC] Add exactly-once for JDBC source connector
---
 docs/en/connector-v2/source/InfluxDB.md            |   3 +
 docs/en/connector-v2/source/Jdbc.md                |   2 +-
 .../seatunnel/jdbc/source/JdbcSource.java          |   2 +-
 .../jdbc/source/JdbcSourceSplitEnumerator.java     | 118 +++++++++++++++------
 .../seatunnel/jdbc/state/JdbcSourceState.java      |  11 ++
 5 files changed, 101 insertions(+), 35 deletions(-)

diff --git a/docs/en/connector-v2/source/InfluxDB.md b/docs/en/connector-v2/source/InfluxDB.md
index 501b932a5..3e711080e 100644
--- a/docs/en/connector-v2/source/InfluxDB.md
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -117,14 +117,17 @@ the `partition_num` of the InfluxDB when you select
 > Tips: Ensure that `upper_bound` minus `lower_bound` is divided 
 > `bypartition_num`, otherwise the query results will overlap
 
 ### epoch [string]
+
 returned time precision
 - Optional values: H, m, s, MS, u, n
 - default value: n
 
 ### query_timeout_sec [int]
+
 the `query_timeout` of the InfluxDB when you select, in seconds
 
 ### connect_timeout_ms [long]
+
 the timeout for connecting to InfluxDB, in milliseconds
 
 ### common options
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 53a590274..7aacdb8bf 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -18,7 +18,7 @@ e.g. If you use MySQL, should download and copy `mysql-connector-java-xxx.jar` t
 
 - [x] [batch](../../concept/connector-v2-features.md)
 - [ ] [stream](../../concept/connector-v2-features.md)
-- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
 - [x] [schema projection](../../concept/connector-v2-features.md)
 
 supports query SQL and can achieve projection effect.
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index ca0cd95c6..076f8a17a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -122,7 +122,7 @@ public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
 
     @Override
     public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext, JdbcSourceState checkpointState) throws Exception {
-        return new JdbcSourceSplitEnumerator(enumeratorContext, 
jdbcSourceOptions, partitionParameter);
+        return new JdbcSourceSplitEnumerator(enumeratorContext, 
jdbcSourceOptions, partitionParameter, checkpointState);
     }
 
     private SeaTunnelRowType initTableField(Connection conn) {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index 0a10fe125..b9c421952 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
 
@@ -28,6 +30,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -38,16 +42,28 @@ public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSour
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext;
 
-    private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+    private final Map<Integer, List<JdbcSourceSplit>> pendingSplits;
+
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
 
     private JdbcSourceOptions jdbcSourceOptions;
     private final PartitionParameter partitionParameter;
 
     public 
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter 
partitionParameter) {
+        this(enumeratorContext, jdbcSourceOptions, partitionParameter, null);
+    }
+
+    public 
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter 
partitionParameter, JdbcSourceState sourceState) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
         this.pendingSplits = new HashMap<>();
+        this.shouldEnumerate = sourceState == null;
+        if (sourceState != null) {
+            this.shouldEnumerate = sourceState.isShouldEnumerate();
+            this.pendingSplits.putAll(sourceState.getPendingSplits());
+        }
     }
 
     @Override
@@ -57,12 +73,25 @@ public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSour
 
     @Override
     public void run() throws Exception {
-        discoverySplits();
-        assignPendingSplits();
+        Set<Integer> readers = enumeratorContext.registeredReaders();
+        if (shouldEnumerate) {
+            Set<JdbcSourceSplit> newSplits = discoverySplits();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
+
+            assignSplit(readers);
+        }
+
+        LOG.debug("No more splits to assign." +
+                " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(enumeratorContext::signalNoMoreSplits);
     }
 
-    private void discoverySplits() {
-        List<JdbcSourceSplit> allSplit = new ArrayList<>();
+    private Set<JdbcSourceSplit> discoverySplits() {
+        Set<JdbcSourceSplit> allSplit = new HashSet<>();
         LOG.info("Starting to calculate splits.");
         if (null != partitionParameter) {
             int partitionNumber = partitionParameter.getPartitionNumber() != 
null ?
@@ -77,30 +106,7 @@ public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSour
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
         }
-        int numReaders = enumeratorContext.currentParallelism();
-        for (JdbcSourceSplit split : allSplit) {
-            int ownerReader = split.splitId % numReaders;
-            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
-                .add(split);
-        }
-        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
-        LOG.info("Calculated splits successfully, the size of splits is {}.", 
allSplit.size());
-    }
-
-    private void assignPendingSplits() {
-        // Check if there's any pending splits for given readers
-        for (int pendingReader : enumeratorContext.registeredReaders()) {
-            // Remove pending assignment for the reader
-            final Set<JdbcSourceSplit> pendingAssignmentForReader =
-                pendingSplits.remove(pendingReader);
-
-            if (pendingAssignmentForReader != null && 
!pendingAssignmentForReader.isEmpty()) {
-                // Assign pending splits to reader
-                LOG.info("Assigning splits to readers {}", 
pendingAssignmentForReader);
-                enumeratorContext.assignSplit(pendingReader, new 
ArrayList<>(pendingAssignmentForReader));
-            }
-            enumeratorContext.signalNoMoreSplits(pendingReader);
-        }
+        return allSplit;
     }
 
     @Override
@@ -110,26 +116,72 @@ public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSour
 
     @Override
     public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
+        LOG.debug("Add back splits {} to JdbcSourceSplitEnumerator.",
+                splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void addPendingSplit(Collection<JdbcSourceSplit> splits) {
+        int readerCount = enumeratorContext.currentParallelism();
+        for (JdbcSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            LOG.info("Assigning {} to {} reader.", split, ownerReader);
+            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>())
+                    .add(split);
+        }
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    private void assignSplit(Collection<Integer> readers) {
+        LOG.debug("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<JdbcSourceSplit> assignmentForReader = 
pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) 
{
+                LOG.info("Assign splits {} to reader {}",
+                        assignmentForReader, reader);
+                try {
+                    enumeratorContext.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    LOG.error("Failed to assign splits {} to reader {}",
+                            assignmentForReader, reader, e);
+                    pendingSplits.put(reader, assignmentForReader);
+                }
+            }
+        }
     }
 
     @Override
     public int currentUnassignedSplitSize() {
-        return 0;
+        return pendingSplits.size();
     }
 
     @Override
     public void handleSplitRequest(int subtaskId) {
-
+        throw new JdbcConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                String.format("Unsupported handleSplitRequest: %d", 
subtaskId));
     }
 
     @Override
     public void registerReader(int subtaskId) {
-        // nothing
+        LOG.debug("Register reader {} to JdbcSourceSplitEnumerator.",
+                subtaskId);
+        if (!pendingSplits.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
     }
 
     @Override
     public JdbcSourceState snapshotState(long checkpointId) throws Exception {
-        return null;
+        synchronized (stateLock) {
+            return new JdbcSourceState(shouldEnumerate, pendingSplits);
+        }
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
index be9e088ec..711edb7e3 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
@@ -17,7 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
 
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
 
+@AllArgsConstructor
+@Getter
 public class JdbcSourceState implements Serializable {
+    private boolean shouldEnumerate;
+    private Map<Integer, List<JdbcSourceSplit>> pendingSplits;
 }

Reply via email to