dmvk commented on code in PR #33606:
URL: https://github.com/apache/beam/pull/33606#discussion_r1918643568
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -111,10 +131,19 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
@Override
public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
- LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
- List<FlinkSourceSplit<T>> splitsForSubtask =
- pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
- splitsForSubtask.addAll(splits);
+ // reshuffle splits, needed after rescaling
Review Comment:
👍 would it make sense to provide additional reasoning?
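   For example, something along these lines (just a sketch of the kind of reasoning meant, exact wording up to you):
   ```java
   // Redistribute the returned splits deterministically by split index. After a
   // rescale, the checkpointed assignment may refer to subtasks that no longer
   // exist (or leave newly added subtasks empty), so the owner is re-derived as
   // splitIndex % currentParallelism instead of handing the splits back to the
   // subtask they came from.
   ```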
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -63,45 +63,65 @@ public FlinkSourceSplitEnumerator(
Source<T> beamSource,
PipelineOptions pipelineOptions,
int numSplits) {
+
+ this(context, beamSource, pipelineOptions, numSplits, false);
+ }
+
+ public FlinkSourceSplitEnumerator(
+ SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+ Source<T> beamSource,
+ PipelineOptions pipelineOptions,
+ int numSplits,
+ boolean splitsInitialized) {
+
this.context = context;
this.beamSource = beamSource;
this.pipelineOptions = pipelineOptions;
this.numSplits = numSplits;
this.pendingSplits = new HashMap<>(numSplits);
- this.splitsInitialized = false;
+ this.splitsInitialized = splitsInitialized;
+
+ LOG.info(
+ "Created new enumerator with parallelism {}, source {}, numSplits {},
initialized {}",
+ context.currentParallelism(),
+ beamSource,
+ numSplits,
+ splitsInitialized);
}
@Override
public void start() {
- context.callAsync(
- () -> {
- try {
- LOG.info("Starting source {}", beamSource);
- List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
- Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
- int i = 0;
- for (Source<T> beamSplitSource : beamSplitSourceList) {
- int targetSubtask = i % context.currentParallelism();
- List<FlinkSourceSplit<T>> splitsForTask =
- flinkSourceSplitsList.computeIfAbsent(
- targetSubtask, ignored -> new ArrayList<>());
- splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
- i++;
+ if (!splitsInitialized) {
Review Comment:
   WDYT about minimizing the changeset and reducing nesting by doing
```
   @Override
   public void start() {
     if (!splitsInitialized) {
       initializeSplits();
     }
   }
```
?
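   Something along these lines, e.g. (untested sketch; the async result handler isn't visible in this diff, so that part is assumed):
   ```java
   @Override
   public void start() {
     if (!splitsInitialized) {
       initializeSplits();
     }
   }

   private void initializeSplits() {
     context.callAsync(
         () -> {
           LOG.info("Starting source {}", beamSource);
           // same round-robin assignment as the current start() body
           List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
           Map<Integer, List<FlinkSourceSplit<T>>> splitsBySubtask = new HashMap<>();
           int i = 0;
           for (Source<T> beamSplitSource : beamSplitSourceList) {
             int targetSubtask = i % context.currentParallelism();
             splitsBySubtask
                 .computeIfAbsent(targetSubtask, ignored -> new ArrayList<>())
                 .add(new FlinkSourceSplit<>(i, beamSplitSource));
             i++;
           }
           return splitsBySubtask;
         },
         (result, error) -> {
           // assumed handler: record the assignment and mark initialization done
           if (error != null) {
             throw new RuntimeException("Failed to split Beam source", error);
           }
           pendingSplits.putAll(result);
           splitsInitialized = true;
         });
   }
   ```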
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -111,10 +131,19 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
@Override
public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
- LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
- List<FlinkSourceSplit<T>> splitsForSubtask =
- pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
- splitsForSubtask.addAll(splits);
+ // reshuffle splits, needed after rescaling
+ splits.forEach(
+ split -> {
+ int target = split.splitIndex() % context.currentParallelism();
+ List<FlinkSourceSplit<T>> splitsForSubtask =
+ pendingSplits.computeIfAbsent(target, ignored -> new ArrayList<>());
+ splitsForSubtask.add(split);
+ });
+ LOG.info(
Review Comment:
```suggestion
LOG.debug(
```
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -63,45 +63,65 @@ public FlinkSourceSplitEnumerator(
Source<T> beamSource,
PipelineOptions pipelineOptions,
int numSplits) {
+
+ this(context, beamSource, pipelineOptions, numSplits, false);
+ }
+
+ public FlinkSourceSplitEnumerator(
+ SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+ Source<T> beamSource,
+ PipelineOptions pipelineOptions,
+ int numSplits,
+ boolean splitsInitialized) {
+
this.context = context;
this.beamSource = beamSource;
this.pipelineOptions = pipelineOptions;
this.numSplits = numSplits;
this.pendingSplits = new HashMap<>(numSplits);
- this.splitsInitialized = false;
+ this.splitsInitialized = splitsInitialized;
+
+ LOG.info(
+ "Created new enumerator with parallelism {}, source {}, numSplits {},
initialized {}",
+ context.currentParallelism(),
+ beamSource,
+ numSplits,
+ splitsInitialized);
}
@Override
public void start() {
- context.callAsync(
- () -> {
- try {
- LOG.info("Starting source {}", beamSource);
- List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
- Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
- int i = 0;
- for (Source<T> beamSplitSource : beamSplitSourceList) {
- int targetSubtask = i % context.currentParallelism();
- List<FlinkSourceSplit<T>> splitsForTask =
- flinkSourceSplitsList.computeIfAbsent(
- targetSubtask, ignored -> new ArrayList<>());
- splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
- i++;
+ if (!splitsInitialized) {
Review Comment:
   hmm, is this correct? Are splits guaranteed to be stable over time? Is it possible that you, for example, want to re-list Kafka partitions? How are new partitions handled in Beam?
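   For context: Flink's KafkaSourceEnumerator handles this by re-listing partitions periodically via the scheduled `callAsync` overload. If Beam splits are not guaranteed to be stable, something in this direction (sketch only; `discoverNewSplits()` / `handleDiscoveredSplits()` are hypothetical helpers, not part of this PR) would be needed instead of a one-shot initialization guarded by a flag:
   ```java
   @Override
   public void start() {
     // sketch: periodic re-discovery instead of a single initializeSplits() call;
     // discoverNewSplits() would re-split the Beam source, handleDiscoveredSplits()
     // would diff against already-assigned splits and assign only the new ones
     context.callAsync(
         this::discoverNewSplits,
         this::handleDiscoveredSplits,
         0L,        // initial delay (ms)
         60_000L);  // re-check every 60s
   }
   ```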
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -63,45 +63,65 @@ public FlinkSourceSplitEnumerator(
Source<T> beamSource,
PipelineOptions pipelineOptions,
int numSplits) {
+
+ this(context, beamSource, pipelineOptions, numSplits, false);
+ }
+
+ public FlinkSourceSplitEnumerator(
+ SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+ Source<T> beamSource,
+ PipelineOptions pipelineOptions,
+ int numSplits,
+ boolean splitsInitialized) {
+
this.context = context;
this.beamSource = beamSource;
this.pipelineOptions = pipelineOptions;
this.numSplits = numSplits;
this.pendingSplits = new HashMap<>(numSplits);
- this.splitsInitialized = false;
+ this.splitsInitialized = splitsInitialized;
+
+ LOG.info(
Review Comment:
debug?
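   i.e. same as the other log-level suggestion:
   ```suggestion
   LOG.debug(
   ```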
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java:
##########
@@ -117,7 +103,8 @@ public Boundedness getBoundedness() {
@Override
public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
- createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
+ createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) {
Review Comment:
nit: unrelated change -> separate commit