This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new bfa9172525 Flink: Backport: Adds support for SpeculativeExecution for IcebergSink (#13663) bfa9172525 is described below commit bfa9172525fa48acfc46a2fab18055e2fb99e90b Author: Rodrigo <rmenesespinil...@apple.com> AuthorDate: Thu Jul 24 13:05:10 2025 -0700 Flink: Backport: Adds support for SpeculativeExecution for IcebergSink (#13663) backport of #13642 --- .../org/apache/iceberg/flink/sink/IcebergSink.java | 4 +++- .../source/TestIcebergSpeculativeExecutionSupport.java | 18 ++++++++++++++++-- .../org/apache/iceberg/flink/sink/IcebergSink.java | 4 +++- .../source/TestIcebergSpeculativeExecutionSupport.java | 18 ++++++++++++++++-- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 98ab51569a..0ea0232278 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -138,7 +139,8 @@ public class IcebergSink SupportsPreWriteTopology<RowData>, SupportsCommitter<IcebergCommittable>, SupportsPreCommitTopology<WriteResult, IcebergCommittable>, - SupportsPostCommitTopology<IcebergCommittable> { + SupportsPostCommitTopology<IcebergCommittable>, + SupportsConcurrentExecutionAttempts { private static final Logger LOG = LoggerFactory.getLogger(IcebergSink.class); private final TableLoader tableLoader; private final Map<String, String> snapshotProperties; diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java index 05a08c24d8..7bd98c69ff 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java @@ -44,12 +44,16 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.TestBase; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; /** @@ -57,6 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; * anything goes wrong unexpectedly. */ @Timeout(value = 60) +@ExtendWith(ParameterizedTestExtension.class) public class TestIcebergSpeculativeExecutionSupport extends TestBase { private static final int NUM_TASK_MANAGERS = 1; private static final int NUM_TASK_SLOTS = 3; @@ -90,6 +95,14 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase { return tEnv; } + @Parameter(index = 0) + private boolean useV2Sink; + + @Parameters(name = "useV2Sink = {0}") + public static Object[][] parameters() { + return new Object[][] {{true}, {false}}; + } + @BeforeEach public void before() throws IOException { String warehouse = @@ -114,8 +127,9 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase { dropCatalog(CATALOG_NAME, true); } - @Test + @TestTemplate public void testSpeculativeExecution() throws Exception { + tEnv.getConfig().set("table.exec.iceberg.use-v2-sink", String.valueOf(useV2Sink)); Table table = tEnv.sqlQuery(String.format("SELECT * FROM %s.%s", DATABASE_NAME, INPUT_TABLE_NAME)); DataStream<Row> slowStream = diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 2cd8653203..f2d3e1fe44 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -138,7 +139,8 @@ public class IcebergSink SupportsPreWriteTopology<RowData>, SupportsCommitter<IcebergCommittable>, SupportsPreCommitTopology<WriteResult, IcebergCommittable>, - SupportsPostCommitTopology<IcebergCommittable> { + SupportsPostCommitTopology<IcebergCommittable>, + SupportsConcurrentExecutionAttempts { private static final Logger LOG = LoggerFactory.getLogger(IcebergSink.class); private final TableLoader tableLoader; private final Map<String, String> snapshotProperties; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java index f0d083060c..61a587e778 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java @@ -44,12 +44,16 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.TestBase; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; /** @@ -57,6 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; * anything goes wrong unexpectedly. */ @Timeout(value = 60) +@ExtendWith(ParameterizedTestExtension.class) public class TestIcebergSpeculativeExecutionSupport extends TestBase { private static final int NUM_TASK_MANAGERS = 1; private static final int NUM_TASK_SLOTS = 3; @@ -90,6 +95,14 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase { return tEnv; } + @Parameter(index = 0) + private boolean useV2Sink; + + @Parameters(name = "useV2Sink = {0}") + public static Object[][] parameters() { + return new Object[][] {{true}, {false}}; + } + @BeforeEach public void before() throws IOException { String warehouse = @@ -114,8 +127,9 @@ public class TestIcebergSpeculativeExecutionSupport extends TestBase { dropCatalog(CATALOG_NAME, true); } - @Test + @TestTemplate public void testSpeculativeExecution() throws Exception { + tEnv.getConfig().set("table.exec.iceberg.use-v2-sink", String.valueOf(useV2Sink)); Table table = tEnv.sqlQuery(String.format("SELECT * FROM %s.%s", DATABASE_NAME, INPUT_TABLE_NAME)); DataStream<Row> slowStream =