dmvk commented on code in PR #33606:
URL: https://github.com/apache/beam/pull/33606#discussion_r1918643568


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -111,10 +131,19 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
 
   @Override
   public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
-    LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
-    List<FlinkSourceSplit<T>> splitsForSubtask =
-        pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
-    splitsForSubtask.addAll(splits);
+    // reshuffle splits, needed after rescaling

Review Comment:
   👍 would it make sense to provide additional reasoning?
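
   For example, the comment could briefly spell out the rescaling scenario; the wording below is just a suggestion:
   
   ```suggestion
       // Reshuffle splits across subtasks based on splitIndex % currentParallelism.
       // This is needed after rescaling: the assignment restored from the checkpoint
       // may have been computed for a different parallelism than the current one.
   ```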



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -63,45 +63,65 @@ public FlinkSourceSplitEnumerator(
       Source<T> beamSource,
       PipelineOptions pipelineOptions,
       int numSplits) {
+
+    this(context, beamSource, pipelineOptions, numSplits, false);
+  }
+
+  public FlinkSourceSplitEnumerator(
+      SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+      Source<T> beamSource,
+      PipelineOptions pipelineOptions,
+      int numSplits,
+      boolean splitsInitialized) {
+
     this.context = context;
     this.beamSource = beamSource;
     this.pipelineOptions = pipelineOptions;
     this.numSplits = numSplits;
     this.pendingSplits = new HashMap<>(numSplits);
-    this.splitsInitialized = false;
+    this.splitsInitialized = splitsInitialized;
+
+    LOG.info(
+        "Created new enumerator with parallelism {}, source {}, numSplits {}, 
initialized {}",
+        context.currentParallelism(),
+        beamSource,
+        numSplits,
+        splitsInitialized);
   }
 
   @Override
   public void start() {
-    context.callAsync(
-        () -> {
-          try {
-            LOG.info("Starting source {}", beamSource);
-            List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
-            Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
-            int i = 0;
-            for (Source<T> beamSplitSource : beamSplitSourceList) {
-              int targetSubtask = i % context.currentParallelism();
-              List<FlinkSourceSplit<T>> splitsForTask =
-                  flinkSourceSplitsList.computeIfAbsent(
-                      targetSubtask, ignored -> new ArrayList<>());
-              splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
-              i++;
+    if (!splitsInitialized) {

Review Comment:
   WDYT about minimizing the changeset and the nesting by doing
   
   ```
     @Override
     public void start() {
       if (!splitsInitialized) {
         initializeSplits();
       }
     }
   ```
   
   ?
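
   A rough sketch of how that could look, assuming the existing `callAsync(...)` body just moves into a private `initializeSplits()` helper (name as in the sketch above, everything else unchanged):
   
   ```java
     @Override
     public void start() {
       if (!splitsInitialized) {
         initializeSplits();
       }
     }
   
     private void initializeSplits() {
       // the current callAsync(() -> { split the Beam source, assign split i to
       // subtask i % context.currentParallelism(), ... }, callback) block moves
       // here as-is
     }
   ```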



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -111,10 +131,19 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
 
   @Override
   public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
-    LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
-    List<FlinkSourceSplit<T>> splitsForSubtask =
-        pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
-    splitsForSubtask.addAll(splits);
+    // reshuffle splits, needed after rescaling
+    splits.forEach(
+        split -> {
+          int target = split.splitIndex() % context.currentParallelism();
+          List<FlinkSourceSplit<T>> splitsForSubtask =
+              pendingSplits.computeIfAbsent(target, ignored -> new ArrayList<>());
+          splitsForSubtask.add(split);
+        });
+    LOG.info(

Review Comment:
   ```suggestion
       LOG.debug(
   ```



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -63,45 +63,65 @@ public FlinkSourceSplitEnumerator(
       Source<T> beamSource,
       PipelineOptions pipelineOptions,
       int numSplits) {
+
+    this(context, beamSource, pipelineOptions, numSplits, false);
+  }
+
+  public FlinkSourceSplitEnumerator(
+      SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+      Source<T> beamSource,
+      PipelineOptions pipelineOptions,
+      int numSplits,
+      boolean splitsInitialized) {
+
     this.context = context;
     this.beamSource = beamSource;
     this.pipelineOptions = pipelineOptions;
     this.numSplits = numSplits;
     this.pendingSplits = new HashMap<>(numSplits);
-    this.splitsInitialized = false;
+    this.splitsInitialized = splitsInitialized;
+
+    LOG.info(
+        "Created new enumerator with parallelism {}, source {}, numSplits {}, 
initialized {}",
+        context.currentParallelism(),
+        beamSource,
+        numSplits,
+        splitsInitialized);
   }
 
   @Override
   public void start() {
-    context.callAsync(
-        () -> {
-          try {
-            LOG.info("Starting source {}", beamSource);
-            List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
-            Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
-            int i = 0;
-            for (Source<T> beamSplitSource : beamSplitSourceList) {
-              int targetSubtask = i % context.currentParallelism();
-              List<FlinkSourceSplit<T>> splitsForTask =
-                  flinkSourceSplitsList.computeIfAbsent(
-                      targetSubtask, ignored -> new ArrayList<>());
-              splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
-              i++;
+    if (!splitsInitialized) {

Review Comment:
   Hmm, is this correct? Are splits guaranteed to be stable over time? Is it possible that you would, for example, want to re-list Kafka partitions? How are new partitions handled in Beam?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -63,45 +63,65 @@ public FlinkSourceSplitEnumerator(
       Source<T> beamSource,
       PipelineOptions pipelineOptions,
       int numSplits) {
+
+    this(context, beamSource, pipelineOptions, numSplits, false);
+  }
+
+  public FlinkSourceSplitEnumerator(
+      SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+      Source<T> beamSource,
+      PipelineOptions pipelineOptions,
+      int numSplits,
+      boolean splitsInitialized) {
+
     this.context = context;
     this.beamSource = beamSource;
     this.pipelineOptions = pipelineOptions;
     this.numSplits = numSplits;
     this.pendingSplits = new HashMap<>(numSplits);
-    this.splitsInitialized = false;
+    this.splitsInitialized = splitsInitialized;
+
+    LOG.info(

Review Comment:
   `LOG.debug` here as well?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java:
##########
@@ -117,7 +103,8 @@ public Boundedness getBoundedness() {
 
   @Override
  public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
-      createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
+      createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) {

Review Comment:
   nit: unrelated change -> separate commit


