This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5328e9d84 [Improve][Connector-V2][JDBC] Add exactly-once for JDBC
source connector (#3750)
5328e9d84 is described below
commit 5328e9d8475468f385d7f40e4baefbee1046a66e
Author: TaoZex <[email protected]>
AuthorDate: Mon Dec 26 15:08:55 2022 +0800
[Improve][Connector-V2][JDBC] Add exactly-once for JDBC source connector
(#3750)
* [Improve][Connector-V2][JDBC] Add exactly-once for JDBC source connector
---
docs/en/connector-v2/source/InfluxDB.md | 3 +
docs/en/connector-v2/source/Jdbc.md | 2 +-
.../seatunnel/jdbc/source/JdbcSource.java | 2 +-
.../jdbc/source/JdbcSourceSplitEnumerator.java | 118 +++++++++++++++------
.../seatunnel/jdbc/state/JdbcSourceState.java | 11 ++
5 files changed, 101 insertions(+), 35 deletions(-)
diff --git a/docs/en/connector-v2/source/InfluxDB.md
b/docs/en/connector-v2/source/InfluxDB.md
index 501b932a5..3e711080e 100644
--- a/docs/en/connector-v2/source/InfluxDB.md
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -117,14 +117,17 @@ the `partition_num` of the InfluxDB when you select
> Tips: Ensure that `upper_bound` minus `lower_bound` is divisible by
> `partition_num`, otherwise the query results will overlap
### epoch [string]
+
returned time precision
- Optional values: H, m, s, MS, u, n
- default value: n
### query_timeout_sec [int]
+
the `query_timeout` of the InfluxDB when you select, in seconds
### connect_timeout_ms [long]
+
the timeout for connecting to InfluxDB, in milliseconds
### common options
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 53a590274..7aacdb8bf 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -18,7 +18,7 @@ e.g. If you use MySQL, should download and copy
`mysql-connector-java-xxx.jar` t
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
-- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema projection](../../concept/connector-v2-features.md)
supports query SQL and can achieve projection effect.
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index ca0cd95c6..076f8a17a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -122,7 +122,7 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
@Override
public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext, JdbcSourceState checkpointState) throws Exception {
- return new JdbcSourceSplitEnumerator(enumeratorContext,
jdbcSourceOptions, partitionParameter);
+ return new JdbcSourceSplitEnumerator(enumeratorContext,
jdbcSourceOptions, partitionParameter, checkpointState);
}
private SeaTunnelRowType initTableField(Connection conn) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index 0a10fe125..b9c421952 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
@@ -28,6 +30,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -38,16 +42,28 @@ public class JdbcSourceSplitEnumerator implements
SourceSplitEnumerator<JdbcSour
private static final Logger LOG =
LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
private final SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext;
- private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+ private final Map<Integer, List<JdbcSourceSplit>> pendingSplits;
+
+ private final Object stateLock = new Object();
+ private volatile boolean shouldEnumerate;
private JdbcSourceOptions jdbcSourceOptions;
private final PartitionParameter partitionParameter;
public
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter
partitionParameter) {
+ this(enumeratorContext, jdbcSourceOptions, partitionParameter, null);
+ }
+
+ public
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter
partitionParameter, JdbcSourceState sourceState) {
this.enumeratorContext = enumeratorContext;
this.jdbcSourceOptions = jdbcSourceOptions;
this.partitionParameter = partitionParameter;
this.pendingSplits = new HashMap<>();
+ this.shouldEnumerate = sourceState == null;
+ if (sourceState != null) {
+ this.shouldEnumerate = sourceState.isShouldEnumerate();
+ this.pendingSplits.putAll(sourceState.getPendingSplits());
+ }
}
@Override
@@ -57,12 +73,25 @@ public class JdbcSourceSplitEnumerator implements
SourceSplitEnumerator<JdbcSour
@Override
public void run() throws Exception {
- discoverySplits();
- assignPendingSplits();
+ Set<Integer> readers = enumeratorContext.registeredReaders();
+ if (shouldEnumerate) {
+ Set<JdbcSourceSplit> newSplits = discoverySplits();
+
+ synchronized (stateLock) {
+ addPendingSplit(newSplits);
+ shouldEnumerate = false;
+ }
+
+ assignSplit(readers);
+ }
+
+ LOG.debug("No more splits to assign." +
+ " Sending NoMoreSplitsEvent to reader {}.", readers);
+ readers.forEach(enumeratorContext::signalNoMoreSplits);
}
- private void discoverySplits() {
- List<JdbcSourceSplit> allSplit = new ArrayList<>();
+ private Set<JdbcSourceSplit> discoverySplits() {
+ Set<JdbcSourceSplit> allSplit = new HashSet<>();
LOG.info("Starting to calculate splits.");
if (null != partitionParameter) {
int partitionNumber = partitionParameter.getPartitionNumber() !=
null ?
@@ -77,30 +106,7 @@ public class JdbcSourceSplitEnumerator implements
SourceSplitEnumerator<JdbcSour
} else {
allSplit.add(new JdbcSourceSplit(null, 0));
}
- int numReaders = enumeratorContext.currentParallelism();
- for (JdbcSourceSplit split : allSplit) {
- int ownerReader = split.splitId % numReaders;
- pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
- .add(split);
- }
- LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
- LOG.info("Calculated splits successfully, the size of splits is {}.",
allSplit.size());
- }
-
- private void assignPendingSplits() {
- // Check if there's any pending splits for given readers
- for (int pendingReader : enumeratorContext.registeredReaders()) {
- // Remove pending assignment for the reader
- final Set<JdbcSourceSplit> pendingAssignmentForReader =
- pendingSplits.remove(pendingReader);
-
- if (pendingAssignmentForReader != null &&
!pendingAssignmentForReader.isEmpty()) {
- // Assign pending splits to reader
- LOG.info("Assigning splits to readers {}",
pendingAssignmentForReader);
- enumeratorContext.assignSplit(pendingReader, new
ArrayList<>(pendingAssignmentForReader));
- }
- enumeratorContext.signalNoMoreSplits(pendingReader);
- }
+ return allSplit;
}
@Override
@@ -110,26 +116,72 @@ public class JdbcSourceSplitEnumerator implements
SourceSplitEnumerator<JdbcSour
@Override
public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
+ LOG.debug("Add back splits {} to JdbcSourceSplitEnumerator.",
+ splits);
+ if (!splits.isEmpty()) {
+ addPendingSplit(splits);
+ assignSplit(Collections.singletonList(subtaskId));
+ }
+ }
+
+ private void addPendingSplit(Collection<JdbcSourceSplit> splits) {
+ int readerCount = enumeratorContext.currentParallelism();
+ for (JdbcSourceSplit split : splits) {
+ int ownerReader = getSplitOwner(split.splitId(), readerCount);
+ LOG.info("Assigning {} to {} reader.", split, ownerReader);
+ pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>())
+ .add(split);
+ }
+ }
+
+ private static int getSplitOwner(String tp, int numReaders) {
+ return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+ }
+
+ private void assignSplit(Collection<Integer> readers) {
+ LOG.debug("Assign pendingSplits to readers {}", readers);
+
+ for (int reader : readers) {
+ List<JdbcSourceSplit> assignmentForReader =
pendingSplits.remove(reader);
+ if (assignmentForReader != null && !assignmentForReader.isEmpty())
{
+ LOG.info("Assign splits {} to reader {}",
+ assignmentForReader, reader);
+ try {
+ enumeratorContext.assignSplit(reader, assignmentForReader);
+ } catch (Exception e) {
+ LOG.error("Failed to assign splits {} to reader {}",
+ assignmentForReader, reader, e);
+ pendingSplits.put(reader, assignmentForReader);
+ }
+ }
+ }
}
@Override
public int currentUnassignedSplitSize() {
- return 0;
+ return pendingSplits.size();
}
@Override
public void handleSplitRequest(int subtaskId) {
-
+ throw new JdbcConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ String.format("Unsupported handleSplitRequest: %d",
subtaskId));
}
@Override
public void registerReader(int subtaskId) {
- // nothing
+ LOG.debug("Register reader {} to JdbcSourceSplitEnumerator.",
+ subtaskId);
+ if (!pendingSplits.isEmpty()) {
+ assignSplit(Collections.singletonList(subtaskId));
+ }
}
@Override
public JdbcSourceState snapshotState(long checkpointId) throws Exception {
- return null;
+ synchronized (stateLock) {
+ return new JdbcSourceState(shouldEnumerate, pendingSplits);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
index be9e088ec..711edb7e3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
@@ -17,7 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+@AllArgsConstructor
+@Getter
public class JdbcSourceState implements Serializable {
+ private boolean shouldEnumerate;
+ private Map<Integer, List<JdbcSourceSplit>> pendingSplits;
}