This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 43bbf08adc Flink: FLIP-27 IcebergSource builder missed a couple of
configs compared to old FlinkSource: expose locality and plan parallelism
(#10957)
43bbf08adc is described below
commit 43bbf08adc63c0a5a5edb26934f62d9994c25af1
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Mon Aug 19 15:29:47 2024 -0700
Flink: FLIP-27 IcebergSource builder missed a couple of configs compared to
old FlinkSource: expose locality and plan parallelism (#10957)
---
.../src/main/java/org/apache/iceberg/flink/source/IcebergSource.java | 4 ++++
.../src/main/java/org/apache/iceberg/flink/source/IcebergSource.java | 4 ++++
.../src/main/java/org/apache/iceberg/flink/source/IcebergSource.java | 4 ++++
3 files changed, 12 insertions(+)
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index ccbd0d9997..e629cc19bb 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -482,6 +482,10 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
}
contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+ contextBuilder.exposeLocality(
+ SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
+ contextBuilder.planParallelism(
+
flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema,
projectedFlinkSchema));
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index ccbd0d9997..e629cc19bb 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -482,6 +482,10 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
}
contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+ contextBuilder.exposeLocality(
+ SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
+ contextBuilder.planParallelism(
+
flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema,
projectedFlinkSchema));
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index ccbd0d9997..e629cc19bb 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -482,6 +482,10 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
}
contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+ contextBuilder.exposeLocality(
+ SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
+ contextBuilder.planParallelism(
+
flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema,
projectedFlinkSchema));