This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4d0b69beba Flink: Fix IcebergSource tableloader lifecycle management in batch mode (#9173)
4d0b69beba is described below
commit 4d0b69beba104e6912f6e6850189121fcd23ef8a
Author: Mason Chen <[email protected]>
AuthorDate: Sat Dec 9 00:06:32 2023 -0800
Flink: Fix IcebergSource tableloader lifecycle management in batch mode (#9173)
---
.../apache/iceberg/flink/source/IcebergSource.java | 58 ++++++++--------------
.../enumerator/ContinuousSplitPlannerImpl.java | 4 +-
2 files changed, 24 insertions(+), 38 deletions(-)
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 179253cb3a..a7ce2db61f 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -81,16 +81,18 @@ import org.slf4j.LoggerFactory;
public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
+ // This table loader can be closed, and it is only safe to use this instance for resource
+ // independent information (e.g. a table name). Copies of this are required to avoid lifecycle
+ // management conflicts with the user provided table loader. e.g. a copy of this is required for
+ // split planning, which uses the underlying io, and should be closed after split planning is
+ // complete.
private final TableLoader tableLoader;
private final ScanContext scanContext;
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final SerializableRecordEmitter<T> emitter;
-
- // Can't use SerializableTable as enumerator needs a regular table
- // that can discover table changes
- private transient Table table;
+ private final String tableName;
IcebergSource(
TableLoader tableLoader,
@@ -100,17 +102,21 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
SerializableComparator<IcebergSourceSplit> splitComparator,
Table table,
SerializableRecordEmitter<T> emitter) {
+ Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
+ Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
+ Preconditions.checkNotNull(assignerFactory, "assignerFactory is required.");
+ Preconditions.checkNotNull(table, "table is required.");
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.splitComparator = splitComparator;
- this.table = table;
this.emitter = emitter;
+ this.tableName = table.name();
}
String name() {
- return "IcebergSource-" + lazyTable().name();
+ return "IcebergSource-" + tableName;
}
private String planningThreadName() {
@@ -120,38 +126,26 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
// a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()"
// from SourceCoordinatorContext implementation. For now, <table name>-<random UUID> is used as
// the unique thread pool name.
- return lazyTable().name() + "-" + UUID.randomUUID();
+ return tableName + "-" + UUID.randomUUID();
}
private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
- try {
+ try (TableLoader loader = tableLoader.clone()) {
+ loader.open();
List<IcebergSourceSplit> splits =
- FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, workerPool);
+ FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
LOG.info(
- "Discovered {} splits from table {} during job initialization",
- splits.size(),
- lazyTable().name());
+ "Discovered {} splits from table {} during job initialization", splits.size(), tableName);
return splits;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close table loader", e);
} finally {
workerPool.shutdown();
}
}
- private Table lazyTable() {
- if (table == null) {
- tableLoader.open();
- try (TableLoader loader = tableLoader) {
- this.table = loader.loadTable();
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to close table loader", e);
- }
- }
-
- return table;
- }
-
@Override
public Boundedness getBoundedness() {
return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
@@ -160,7 +154,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
@Override
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
IcebergSourceReaderMetrics metrics =
- new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
+ new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName);
return new IcebergSourceReader<>(
emitter, metrics, readerFunction, splitComparator, readerContext);
}
@@ -197,13 +191,12 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
LOG.info(
"Iceberg source restored {} splits from state for table {}",
enumState.pendingSplits().size(),
- lazyTable().name());
+ tableName);
assigner = assignerFactory.createAssigner(enumState.pendingSplits());
}
-
if (scanContext.isStreaming()) {
ContinuousSplitPlanner splitPlanner =
- new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, planningThreadName());
+ new ContinuousSplitPlannerImpl(tableLoader, scanContext, planningThreadName());
return new ContinuousIcebergEnumerator(
enumContext, assigner, scanContext, splitPlanner, enumState);
} else {
@@ -537,7 +530,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
}
- checkRequired();
// Since builder already load the table, pass it to the source to avoid double loading
return new IcebergSource<>(
tableLoader,
@@ -548,11 +540,5 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
table,
emitter);
}
-
- private void checkRequired() {
- Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
- Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");
- Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
- }
}
}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index f0d8ca8d70..450b649253 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -56,9 +56,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
*/
public ContinuousSplitPlannerImpl(
TableLoader tableLoader, ScanContext scanContext, String threadName) {
- this.tableLoader = tableLoader;
+ this.tableLoader = tableLoader.clone();
this.tableLoader.open();
- this.table = tableLoader.loadTable();
+ this.table = this.tableLoader.loadTable();
this.scanContext = scanContext;
this.isSharedPool = threadName == null;
this.workerPool =