This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 84c9125173 Flink: Backport #10548 to v1.18 and v1.17 (#10776)
84c9125173 is described below

commit 84c91251738cb86f741952bd1b23daa45c80d2aa
Author: Venkata krishnan Sowrirajan <[email protected]>
AuthorDate: Thu Aug 1 00:22:51 2024 -0700

    Flink: Backport #10548 to v1.18 and v1.17 (#10776)
---
 .../flink/source/enumerator/AbstractIcebergEnumerator.java   | 11 ++++++++++-
 .../flink/source/TestIcebergSpeculativeExecutionSupport.java | 12 ++++++------
 .../flink/source/enumerator/AbstractIcebergEnumerator.java   | 11 ++++++++++-
 .../flink/source/TestIcebergSpeculativeExecutionSupport.java | 12 ++++++------
 .../flink/source/TestIcebergSpeculativeExecutionSupport.java |  2 +-
 5 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 3aca390755..801baf77a6 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
 import org.apache.iceberg.flink.source.assigner.GetSplitResult;
 import org.apache.iceberg.flink.source.assigner.SplitAssigner;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
@@ -41,7 +42,8 @@ import org.slf4j.LoggerFactory;
  * resolved
  */
 abstract class AbstractIcebergEnumerator
-    implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
+    implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState>,
+        SupportsHandleExecutionAttemptSourceEvent {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
 
   private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
@@ -93,6 +95,13 @@ abstract class AbstractIcebergEnumerator
     }
   }
 
+  // Flink's SourceCoordinator already keeps track of subTask to splits mapping.
+  // It already takes care of re-assigning splits to speculated attempts as well.
+  @Override
+  public void handleSourceEvent(int subTaskId, int attemptNumber, SourceEvent sourceEvent) {
+    handleSourceEvent(subTaskId, sourceEvent);
+  }
+
   @Override
   public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
     LOG.info("Add {} splits back to the pool for failed subtask {}", splits.size(), subtaskId);
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
similarity index 94%
copy from flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
copy to flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index b21010a91b..95d0b90b6c 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -55,7 +55,7 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static final int NUM_TASK_SLOTS = 3;
 
   @RegisterExtension
-  public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+  public static MiniClusterExtension miniClusterResource =
       new MiniClusterExtension(
           new MiniClusterResourceConfiguration.Builder()
               .setNumberTaskManagers(NUM_TASK_MANAGERS)
@@ -103,7 +103,7 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   public void after() {
     sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, INPUT_TABLE_NAME);
     sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, OUTPUT_TABLE_NAME);
-    dropDatabase(DATABASE_NAME, true);
+    sql("DROP DATABASE %s", DATABASE_NAME);
     dropCatalog(CATALOG_NAME, true);
   }
 
@@ -144,9 +144,9 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static class TestingMap extends RichMapFunction<Row, Row> {
     @Override
     public Row map(Row row) throws Exception {
-      // Put the even subtask indices with the first attempt to sleep to trigger speculative
+      // Put the subtasks with the first attempt to sleep to trigger speculative
       // execution
-      if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
+      if (getRuntimeContext().getAttemptNumber() <= 0) {
         Thread.sleep(Integer.MAX_VALUE);
       }
 
@@ -154,8 +154,8 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
           Row.of(
               row.getField(0),
               row.getField(1),
-              getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
-              getRuntimeContext().getTaskInfo().getAttemptNumber());
+              getRuntimeContext().getIndexOfThisSubtask(),
+              getRuntimeContext().getAttemptNumber());
 
       return output;
     }
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 6c9a855bc1..280a126a46 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
 import org.apache.iceberg.flink.source.assigner.GetSplitResult;
 import org.apache.iceberg.flink.source.assigner.SplitAssigner;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
@@ -37,7 +38,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractIcebergEnumerator
-    implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
+    implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState>,
+        SupportsHandleExecutionAttemptSourceEvent {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
 
   private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
@@ -95,6 +97,13 @@ abstract class AbstractIcebergEnumerator
     }
   }
 
+  // Flink's SourceCoordinator already keeps track of subTask to splits mapping.
+  // It already takes care of re-assigning splits to speculated attempts as well.
+  @Override
+  public void handleSourceEvent(int subTaskId, int attemptNumber, SourceEvent sourceEvent) {
+    handleSourceEvent(subTaskId, sourceEvent);
+  }
+
   @Override
   public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
     LOG.info("Add {} splits back to the pool for failed subtask {}", splits.size(), subtaskId);
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
similarity index 94%
copy from flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
copy to flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index b21010a91b..95d0b90b6c 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -55,7 +55,7 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static final int NUM_TASK_SLOTS = 3;
 
   @RegisterExtension
-  public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+  public static MiniClusterExtension miniClusterResource =
       new MiniClusterExtension(
           new MiniClusterResourceConfiguration.Builder()
               .setNumberTaskManagers(NUM_TASK_MANAGERS)
@@ -103,7 +103,7 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   public void after() {
     sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, INPUT_TABLE_NAME);
     sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, OUTPUT_TABLE_NAME);
-    dropDatabase(DATABASE_NAME, true);
+    sql("DROP DATABASE %s", DATABASE_NAME);
     dropCatalog(CATALOG_NAME, true);
   }
 
@@ -144,9 +144,9 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static class TestingMap extends RichMapFunction<Row, Row> {
     @Override
     public Row map(Row row) throws Exception {
-      // Put the even subtask indices with the first attempt to sleep to trigger speculative
+      // Put the subtasks with the first attempt to sleep to trigger speculative
       // execution
-      if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
+      if (getRuntimeContext().getAttemptNumber() <= 0) {
         Thread.sleep(Integer.MAX_VALUE);
       }
 
@@ -154,8 +154,8 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
           Row.of(
               row.getField(0),
               row.getField(1),
-              getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
-              getRuntimeContext().getTaskInfo().getAttemptNumber());
+              getRuntimeContext().getIndexOfThisSubtask(),
+              getRuntimeContext().getAttemptNumber());
 
       return output;
     }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index b21010a91b..41b023b936 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -144,7 +144,7 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static class TestingMap extends RichMapFunction<Row, Row> {
     @Override
     public Row map(Row row) throws Exception {
-      // Put the even subtask indices with the first attempt to sleep to trigger speculative
+      // Put the subtasks with the first attempt to sleep to trigger speculative
       // execution
       if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
         Thread.sleep(Integer.MAX_VALUE);

Reply via email to