kbendick commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780425675



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +258,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+        HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+        try {
+          String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+          boolean defaultValue = FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);

Review comment:
   It would be nice to avoid the dependency on `Path` in this class, so that
people who don't need Hadoop don't need the dependency. You might be able to
do the following to get the same result.

   If you call `((HadoopFileIO) fileIO).newInputFile(table.location()).location()`,
I believe you can check the scheme on the location string returned by
`InputFile`.

   There could very possibly be a cleaner way to do it. But if this class
doesn't already pull in `org.apache.hadoop.*`, then it would be best to avoid
that if possible, so that users who run only on cloud environments, for
example, don't have to have the Hadoop jars on their classpath.

   In this case, just by having the import, users still need the Hadoop jars
on the classpath even if the table uses `S3FileIO` (which is sometimes a pain
on Flink).
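
   To make the suggestion concrete, here is a minimal sketch of checking the
scheme on a plain location string without importing `org.apache.hadoop.fs.Path`.
The class `LocationScheme`, the methods `schemeOf` and `localityDefault`, and
the contents of `LOCALITY_SCHEMES` are hypothetical names made up for
illustration; the real `FILE_SYSTEM_SUPPORT_LOCALITY` set is not shown in the
diff, so `"hdfs"` is only an assumption.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Set;

public class LocationScheme {
  // Hypothetical stand-in for FILE_SYSTEM_SUPPORT_LOCALITY (assumed contents).
  static final Set<String> LOCALITY_SCHEMES = Collections.singleton("hdfs");

  // Returns the lower-cased scheme of a location string, or null if it has none.
  // A scheme is the non-empty text before the first ':' when that ':' comes
  // before the first '/', e.g. "hdfs" in "hdfs://nn:8020/warehouse/tbl".
  static String schemeOf(String location) {
    if (location == null) {
      return null;
    }
    int colon = location.indexOf(':');
    int slash = location.indexOf('/');
    if (colon <= 0 || (slash >= 0 && slash < colon)) {
      return null;
    }
    return location.substring(0, colon).toLowerCase(Locale.ROOT);
  }

  // Default for locality when the user has not set the config option:
  // true only if the location's scheme is in the assumed locality set.
  static boolean localityDefault(String location) {
    String scheme = schemeOf(location);
    return scheme != null && LOCALITY_SCHEMES.contains(scheme);
  }

  public static void main(String[] args) {
    // Example locations are made up for illustration.
    System.out.println(localityDefault("hdfs://namenode:8020/warehouse/db/tbl"));
    System.out.println(localityDefault("s3://bucket/warehouse/db/tbl"));
  }
}
```

   One difference to keep in mind: `Path.getFileSystem(conf).getScheme()`
resolves a scheme-less location against the default file system in the Hadoop
configuration, while the string check above returns no scheme for such a
location, so the two approaches can disagree for tables stored at bare paths.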



