This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 43bbf08adc Flink: FLIP-27 IcebergSource builder missed a couple of configs compared to old FlinkSource: expose locality and plan parallelism (#10957)
43bbf08adc is described below

commit 43bbf08adc63c0a5a5edb26934f62d9994c25af1
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Mon Aug 19 15:29:47 2024 -0700

    Flink: FLIP-27 IcebergSource builder missed a couple of configs compared to old FlinkSource: expose locality and plan parallelism (#10957)
---
 .../src/main/java/org/apache/iceberg/flink/source/IcebergSource.java  | 4 ++++
 .../src/main/java/org/apache/iceberg/flink/source/IcebergSource.java  | 4 ++++
 .../src/main/java/org/apache/iceberg/flink/source/IcebergSource.java  | 4 ++++
 3 files changed, 12 insertions(+)

diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index ccbd0d9997..e629cc19bb 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -482,6 +482,10 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+      contextBuilder.exposeLocality(
+          SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
+      contextBuilder.planParallelism(
+          flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index ccbd0d9997..e629cc19bb 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -482,6 +482,10 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+      contextBuilder.exposeLocality(
+          SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
+      contextBuilder.planParallelism(
+          flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index ccbd0d9997..e629cc19bb 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -482,6 +482,10 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+      contextBuilder.exposeLocality(
+          SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
+      contextBuilder.planParallelism(
+          flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));

Reply via email to