hailin0 commented on code in PR #2904:
URL:
https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980697296
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java:
##########
@@ -28,47 +28,81 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
public class JdbcSourceSplitEnumerator implements
SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
private static final Logger LOG =
LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
private final SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext;
- private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+
+ private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+
private JdbcSourceOptions jdbcSourceOptions;
private final PartitionParameter partitionParameter;
- private final int parallelism;
public
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter
partitionParameter) {
this.enumeratorContext = enumeratorContext;
this.jdbcSourceOptions = jdbcSourceOptions;
this.partitionParameter = partitionParameter;
- this.parallelism = enumeratorContext.currentParallelism();
+ this.pendingSplits = new HashMap<>();
}
@Override
public void open() {
+ // No connection needs to be opened
+ }
+
+ @Override
+ public void run() throws Exception {
+ discoverySplits();
+ assignPendingSplits();
+ }
+
+ private void discoverySplits() {
+ List<JdbcSourceSplit> allSplit = new ArrayList<>();
LOG.info("Starting to calculate splits.");
if (null != partitionParameter) {
JdbcNumericBetweenParametersProvider
jdbcNumericBetweenParametersProvider =
- new
JdbcNumericBetweenParametersProvider(partitionParameter.minValue,
partitionParameter.maxValue).ofBatchNum(parallelism);
+ new
JdbcNumericBetweenParametersProvider(partitionParameter.minValue,
partitionParameter.maxValue).ofBatchNum(enumeratorContext.currentParallelism());
Serializable[][] parameterValues =
jdbcNumericBetweenParametersProvider.getParameterValues();
for (int i = 0; i < parameterValues.length; i++) {
allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
}
} else {
allSplit.add(new JdbcSourceSplit(null, 0));
}
+ int numReaders = enumeratorContext.currentParallelism();
+ for (JdbcSourceSplit split : allSplit) {
+ int ownerReader = split.splitId % numReaders;
+ pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+ .add(split);
+ }
+ LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
LOG.info("Calculated splits successfully, the size of splits is {}.",
allSplit.size());
}
- @Override
- public void run() throws Exception {
+ private void assignPendingSplits() {
+ // Check if there's any pending splits for given readers
+ for (int pendingReader : enumeratorContext.registeredReaders()) {
+ // Remove pending assignment for the reader
+ final Set<JdbcSourceSplit> pendingAssignmentForReader =
+ pendingSplits.remove(pendingReader);
Review Comment:
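How is `pendingSplits.remove()` rolled back? The splits for the reader are removed from `pendingSplits` before they are assigned, so if the assignment does not go through, those splits are no longer tracked anywhere and appear to be lost.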
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]