This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 84c9125173 Flink: Backport #10548 to v1.18 and v1.17 (#10776)
84c9125173 is described below
commit 84c91251738cb86f741952bd1b23daa45c80d2aa
Author: Venkata krishnan Sowrirajan <[email protected]>
AuthorDate: Thu Aug 1 00:22:51 2024 -0700
Flink: Backport #10548 to v1.18 and v1.17 (#10776)
---
.../flink/source/enumerator/AbstractIcebergEnumerator.java | 11 ++++++++++-
.../flink/source/TestIcebergSpeculativeExecutionSupport.java | 12 ++++++------
.../flink/source/enumerator/AbstractIcebergEnumerator.java | 11 ++++++++++-
.../flink/source/TestIcebergSpeculativeExecutionSupport.java | 12 ++++++------
.../flink/source/TestIcebergSpeculativeExecutionSupport.java | 2 +-
5 files changed, 33 insertions(+), 15 deletions(-)
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 3aca390755..801baf77a6 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
@@ -41,7 +42,8 @@ import org.slf4j.LoggerFactory;
* resolved
*/
abstract class AbstractIcebergEnumerator
- implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
+ implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState>,
+ SupportsHandleExecutionAttemptSourceEvent {
private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
@@ -93,6 +95,13 @@ abstract class AbstractIcebergEnumerator
}
}
+ // Flink's SourceCoordinator already keeps track of subTask to splits mapping.
+ // It already takes care of re-assigning splits to speculated attempts as well.
+ @Override
+ public void handleSourceEvent(int subTaskId, int attemptNumber, SourceEvent sourceEvent) {
+ handleSourceEvent(subTaskId, sourceEvent);
+ }
+
@Override
public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
LOG.info("Add {} splits back to the pool for failed subtask {}", splits.size(), subtaskId);
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
similarity index 94%
copy from
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
copy to
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index b21010a91b..95d0b90b6c 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -55,7 +55,7 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
private static final int NUM_TASK_SLOTS = 3;
@RegisterExtension
- public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ public static MiniClusterExtension miniClusterResource =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUM_TASK_MANAGERS)
@@ -103,7 +103,7 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
public void after() {
sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, INPUT_TABLE_NAME);
sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, OUTPUT_TABLE_NAME);
- dropDatabase(DATABASE_NAME, true);
+ sql("DROP DATABASE %s", DATABASE_NAME);
dropCatalog(CATALOG_NAME, true);
}
@@ -144,9 +144,9 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
private static class TestingMap extends RichMapFunction<Row, Row> {
@Override
public Row map(Row row) throws Exception {
- // Put the even subtask indices with the first attempt to sleep to trigger speculative
+ // Put the subtasks with the first attempt to sleep to trigger speculative
// execution
- if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
+ if (getRuntimeContext().getAttemptNumber() <= 0) {
Thread.sleep(Integer.MAX_VALUE);
}
@@ -154,8 +154,8 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
Row.of(
row.getField(0),
row.getField(1),
- getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
- getRuntimeContext().getTaskInfo().getAttemptNumber());
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getAttemptNumber());
return output;
}
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 6c9a855bc1..280a126a46 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
@@ -37,7 +38,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractIcebergEnumerator
- implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
+ implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState>,
+ SupportsHandleExecutionAttemptSourceEvent {
private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
@@ -95,6 +97,13 @@ abstract class AbstractIcebergEnumerator
}
}
+ // Flink's SourceCoordinator already keeps track of subTask to splits mapping.
+ // It already takes care of re-assigning splits to speculated attempts as well.
+ @Override
+ public void handleSourceEvent(int subTaskId, int attemptNumber, SourceEvent sourceEvent) {
+ handleSourceEvent(subTaskId, sourceEvent);
+ }
+
@Override
public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
LOG.info("Add {} splits back to the pool for failed subtask {}", splits.size(), subtaskId);
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
similarity index 94%
copy from
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
copy to
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index b21010a91b..95d0b90b6c 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -55,7 +55,7 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
private static final int NUM_TASK_SLOTS = 3;
@RegisterExtension
- public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ public static MiniClusterExtension miniClusterResource =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUM_TASK_MANAGERS)
@@ -103,7 +103,7 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
public void after() {
sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, INPUT_TABLE_NAME);
sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, OUTPUT_TABLE_NAME);
- dropDatabase(DATABASE_NAME, true);
+ sql("DROP DATABASE %s", DATABASE_NAME);
dropCatalog(CATALOG_NAME, true);
}
@@ -144,9 +144,9 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
private static class TestingMap extends RichMapFunction<Row, Row> {
@Override
public Row map(Row row) throws Exception {
- // Put the even subtask indices with the first attempt to sleep to trigger speculative
+ // Put the subtasks with the first attempt to sleep to trigger speculative
// execution
- if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
+ if (getRuntimeContext().getAttemptNumber() <= 0) {
Thread.sleep(Integer.MAX_VALUE);
}
@@ -154,8 +154,8 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
Row.of(
row.getField(0),
row.getField(1),
- getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
- getRuntimeContext().getTaskInfo().getAttemptNumber());
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getAttemptNumber());
return output;
}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index b21010a91b..41b023b936 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -144,7 +144,7 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
private static class TestingMap extends RichMapFunction<Row, Row> {
@Override
public Row map(Row row) throws Exception {
- // Put the even subtask indices with the first attempt to sleep to trigger speculative
+ // Put the subtasks with the first attempt to sleep to trigger speculative
// execution
if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
Thread.sleep(Integer.MAX_VALUE);