This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bfa9172525 Flink: Backport: Adds support for SpeculativeExecution for IcebergSink (#13663)
bfa9172525 is described below

commit bfa9172525fa48acfc46a2fab18055e2fb99e90b
Author: Rodrigo <rmenesespinil...@apple.com>
AuthorDate: Thu Jul 24 13:05:10 2025 -0700

    Flink: Backport: Adds support for SpeculativeExecution for IcebergSink (#13663)
    
    backport of #13642
---
 .../org/apache/iceberg/flink/sink/IcebergSink.java     |  4 +++-
 .../source/TestIcebergSpeculativeExecutionSupport.java | 18 ++++++++++++++++--
 .../org/apache/iceberg/flink/sink/IcebergSink.java     |  4 +++-
 .../source/TestIcebergSpeculativeExecutionSupport.java | 18 ++++++++++++++++--
 4 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 98ab51569a..0ea0232278 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -138,7 +139,8 @@ public class IcebergSink
         SupportsPreWriteTopology<RowData>,
         SupportsCommitter<IcebergCommittable>,
         SupportsPreCommitTopology<WriteResult, IcebergCommittable>,
-        SupportsPostCommitTopology<IcebergCommittable> {
+        SupportsPostCommitTopology<IcebergCommittable>,
+        SupportsConcurrentExecutionAttempts {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergSink.class);
   private final TableLoader tableLoader;
   private final Map<String, String> snapshotProperties;
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index 05a08c24d8..7bd98c69ff 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -44,12 +44,16 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.TestBase;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 /**
@@ -57,6 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
  * anything goes wrong unexpectedly.
  */
 @Timeout(value = 60)
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static final int NUM_TASK_MANAGERS = 1;
   private static final int NUM_TASK_SLOTS = 3;
@@ -90,6 +95,14 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
     return tEnv;
   }
 
+  @Parameter(index = 0)
+  private boolean useV2Sink;
+
+  @Parameters(name = "useV2Sink = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {{true}, {false}};
+  }
+
   @BeforeEach
   public void before() throws IOException {
     String warehouse =
@@ -114,8 +127,9 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
     dropCatalog(CATALOG_NAME, true);
   }
 
-  @Test
+  @TestTemplate
   public void testSpeculativeExecution() throws Exception {
+    tEnv.getConfig().set("table.exec.iceberg.use-v2-sink", String.valueOf(useV2Sink));
     Table table =
         tEnv.sqlQuery(String.format("SELECT * FROM %s.%s", DATABASE_NAME, INPUT_TABLE_NAME));
     DataStream<Row> slowStream =
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 2cd8653203..f2d3e1fe44 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -138,7 +139,8 @@ public class IcebergSink
         SupportsPreWriteTopology<RowData>,
         SupportsCommitter<IcebergCommittable>,
         SupportsPreCommitTopology<WriteResult, IcebergCommittable>,
-        SupportsPostCommitTopology<IcebergCommittable> {
+        SupportsPostCommitTopology<IcebergCommittable>,
+        SupportsConcurrentExecutionAttempts {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergSink.class);
   private final TableLoader tableLoader;
   private final Map<String, String> snapshotProperties;
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index f0d083060c..61a587e778 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -44,12 +44,16 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.TestBase;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 /**
@@ -57,6 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
  * anything goes wrong unexpectedly.
  */
 @Timeout(value = 60)
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static final int NUM_TASK_MANAGERS = 1;
   private static final int NUM_TASK_SLOTS = 3;
@@ -90,6 +95,14 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
     return tEnv;
   }
 
+  @Parameter(index = 0)
+  private boolean useV2Sink;
+
+  @Parameters(name = "useV2Sink = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {{true}, {false}};
+  }
+
   @BeforeEach
   public void before() throws IOException {
     String warehouse =
@@ -114,8 +127,9 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase {
     dropCatalog(CATALOG_NAME, true);
   }
 
-  @Test
+  @TestTemplate
   public void testSpeculativeExecution() throws Exception {
+    tEnv.getConfig().set("table.exec.iceberg.use-v2-sink", String.valueOf(useV2Sink));
     Table table =
         tEnv.sqlQuery(String.format("SELECT * FROM %s.%s", DATABASE_NAME, INPUT_TABLE_NAME));
     DataStream<Row> slowStream =

Reply via email to