kbendick commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779844460



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -225,7 +227,8 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
         int maxInferParallelism = readableConfig.get(FlinkConfigOptions
             .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
-        Preconditions.checkState(maxInferParallelism >= 1,
+        Preconditions.checkState(
+            maxInferParallelism >= 1,

Review comment:
       Nit: This hunk only reflows the line; the code itself doesn't change.
   
   Can we avoid touching lines where the source code has no real change? Keeping
diffs minimal greatly helps people who maintain forks and have to
`git cherry-pick` most commits.

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +250,16 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions
+            .TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+        return localityConfig != null ? localityConfig : true;

Review comment:
       Nit / open question:
   
   On the subject of default values, it seems that locality defaults to `true`
whenever the table uses `HadoopFileIO`.
   
   Should we also check that the path has a scheme for which locality makes
sense (e.g. the user isn't using `HadoopFileIO` with `s3a` or some other
object store underneath)? Sort of like how Spark has the
`FILESYSTEM_SUPPORT_LOCALITY` check?
   
   I'm marking this as an open question because it might be difficult to pass
this information through. However, I know people on Azure and even GCS who use
`HadoopFileIO` with the Hadoop connector for their object store (the
equivalent of `s3a`), and they would not benefit from a default value of
`true`.
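
   To make the suggestion concrete, here is a rough, self-contained sketch of
what such a scheme check could look like. Everything in it is an assumption
for illustration: the class name `LocalitySchemeCheck`, the allow-list
`SCHEMES_WITH_LOCALITY` (containing only `hdfs`), and both method names are
hypothetical and are not taken from this PR or from the Spark module. It also
takes the table location as a plain string, which sidesteps the question of
how that information would actually be passed through.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper, not part of Iceberg or Flink.
final class LocalitySchemeCheck {

  // Assumption: only HDFS exposes block locations worth scheduling on.
  // Object-store connectors such as s3a are deliberately left out.
  private static final Set<String> SCHEMES_WITH_LOCALITY = Set.of("hdfs");

  private LocalitySchemeCheck() {
  }

  // Returns true only if the location has a scheme on the allow-list.
  static boolean supportsLocality(String location) {
    if (location == null) {
      return false;
    }

    String scheme;
    try {
      scheme = URI.create(location).getScheme();
    } catch (IllegalArgumentException e) {
      // Not a parseable URI: treat it as "no locality".
      return false;
    }

    return scheme != null && SCHEMES_WITH_LOCALITY.contains(scheme.toLowerCase(Locale.ROOT));
  }

  // Mirrors the null-means-true default from the diff, but only for
  // locations whose scheme can report locality at all.
  static boolean localityEnabled(String tableLocation, Boolean configured) {
    if (!supportsLocality(tableLocation)) {
      return false;
    }
    return configured != null ? configured : true;
  }
}
```

   With this shape, `localityEnabled("s3a://bucket/db/tbl", null)` would be
`false` while `localityEnabled("hdfs://nn:8020/db/tbl", null)` would be
`true`. One design choice to note: in this sketch an unsupported scheme wins
even over an explicit `true` in the config, which may or may not be what we
want.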

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
##########
@@ -19,28 +19,19 @@
 
 package org.apache.iceberg.flink.source;
 
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
-/**
- * TODO Implement {@link LocatableInputSplit}.
- */
-public class FlinkInputSplit implements InputSplit {
+public class FlinkInputSplit extends LocatableInputSplit {
 
-  private final int splitNumber;
   private final CombinedScanTask task;
 
-  FlinkInputSplit(int splitNumber, CombinedScanTask task) {
-    this.splitNumber = splitNumber;
+  FlinkInputSplit(int splitNumber, CombinedScanTask task, String[] hosts) {

Review comment:
       What value will `hosts` be if locality is disabled? Simply `null`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


