This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 20786ab  [HUDI-1681] Support object storage for Flink writer (#2662)
20786ab is described below

commit 20786ab8a2a1e7735ab846e92802fb9f4449adc9
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 12 16:39:24 2021 +0800

    [HUDI-1681] Support object storage for Flink writer (#2662)
    
    In order to support object storage, we need these changes:
    
    * Use the Hadoop filesystem so that we can find the plugin filesystem
    * Do not fetch file size until the file handle is closed
    * Do not close the opened filesystem because we want to use the
      filesystem cache
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   2 +
 .../apache/hudi/io/storage/HoodieFileWriter.java   |   2 +
 .../apache/hudi/io/storage/HoodieHFileWriter.java  |   5 +
 .../hudi/io/storage/HoodieParquetWriter.java       |   5 +
 hudi-client/hudi-flink-client/pom.xml              |   6 +
 .../client/common/HoodieFlinkEngineContext.java    |   4 +-
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |  11 +-
 .../java/org/apache/hudi/io/FlinkCreateHandle.java |   3 +-
 .../java/org/apache/hudi/util/FlinkClientUtil.java |  81 +++++++
 .../apache/hudi/factory/HoodieTableFactory.java    |   2 +-
 .../org/apache/hudi/source/HoodieTableSource.java  |  25 +-
 .../apache/hudi/source/format/FilePathUtils.java   | 126 +++++-----
 .../source/format/cow/CopyOnWriteInputFormat.java  | 253 ++++++++++++++++++++-
 .../source/format/mor/MergeOnReadInputFormat.java  |   4 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  64 +-----
 .../apache/hudi/source/TestHoodieTableSource.java  |   2 +-
 .../apache/hudi/source/TestStreamReadOperator.java |  10 +-
 .../apache/hudi/source/format/TestInputFormat.java |   2 +-
 18 files changed, 443 insertions(+), 164 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 986afe6..b248a2f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -274,6 +274,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
     if (!stat.getLogFiles().contains(result.logFile().getFileName())) {
       stat.addLogFiles(result.logFile().getFileName());
     }
+    stat.setFileSizeInBytes(result.size());
   }
 
   private void updateRuntimeStats(HoodieDeltaWriteStat stat) {
@@ -304,6 +305,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
     } else if (stat.getPath().endsWith(result.logFile().getFileName())) {
       // append/continued writing to the same log file
       stat.setLogOffset(Math.min(stat.getLogOffset(), result.offset()));
+      stat.setFileSizeInBytes(stat.getFileSizeInBytes() + result.size());
       accumulateWriteCounts(stat, result);
       accumulateRuntimeStats(stat);
     } else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index ea9ecad..1aaa389 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -33,4 +33,6 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
   void close() throws IOException;
 
   void writeAvro(String key, R oldRecord) throws IOException;
+
+  long getBytesWritten();
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index d77483c..352c51c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -156,4 +156,9 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
     writer.close();
     writer = null;
   }
+
+  @Override
+  public long getBytesWritten() {
+    return fs.getBytesWritten(file);
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index d1c695a..c3939d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -94,4 +94,9 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
     super.write(object);
     writeSupport.add(key);
   }
+
+  @Override
+  public long getBytesWritten() {
+    return fs.getBytesWritten(file);
+  }
 }
diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
index 7408b4a..57231d9 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -46,6 +46,12 @@
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-clients_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- Parquet -->
     <dependency>
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 6c7f44b..7713c1e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
 import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.util.FlinkClientUtil;
 
 import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
@@ -50,7 +50,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
   private RuntimeContext runtimeContext;
 
   public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
-    this(new SerializableConfiguration(new Configuration()), taskContextSupplier);
+    this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier);
   }
 
   public HoodieFlinkEngineContext(SerializableConfiguration hadoopConf, TaskContextSupplier taskContextSupplier) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 9d56d47..b827bf2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -20,14 +20,12 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,14 +96,7 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends H
     needBootStrap = false;
     // flush any remaining records to disk
     appendDataAndDeleteBlocks(header);
-    try {
-      for (WriteStatus status: statuses) {
-        long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
-        status.getStat().setFileSizeInBytes(logFileSize);
-      }
-    } catch (IOException e) {
-      throw new HoodieUpsertException("Failed to get file size for append handle", e);
-    }
+    // need to fix that the incremental write size in bytes may be lost
     List<WriteStatus> ret = new ArrayList<>(statuses);
     statuses.clear();
     return ret;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index ce3725b..6f4638e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -20,7 +20,6 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -109,7 +108,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
    * @throws IOException if error occurs
    */
   private void setUpWriteStatus() throws IOException {
-    long fileSizeInBytes = FSUtils.getFileSize(fs, path);
+    long fileSizeInBytes = fileWriter.getBytesWritten();
     long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
     this.lastFileSize = fileSizeInBytes;
     HoodieWriteStat stat = new HoodieWriteStat();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
new file mode 100644
index 0000000..c38c1f1
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+
+/**
+ * Utilities for Hoodie Flink client.
+ */
+public class FlinkClientUtil {
+
+  /**
+   * Returns the hadoop configuration with possible hadoop conf paths.
+   * E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME.
+   */
+  public static org.apache.hadoop.conf.Configuration getHadoopConf() {
+    // create hadoop configuration with hadoop conf directory configured.
+    org.apache.hadoop.conf.Configuration hadoopConf = null;
+    for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
+      hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
+      if (hadoopConf != null) {
+        break;
+      }
+    }
+    if (hadoopConf == null) {
+      hadoopConf = new org.apache.hadoop.conf.Configuration();
+    }
+    return hadoopConf;
+  }
+
+  /**
+   * Returns a new Hadoop Configuration object using the path to the hadoop conf configured.
+   *
+   * @param hadoopConfDir Hadoop conf directory path.
+   * @return A Hadoop configuration instance.
+   */
+  private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
+    if (new File(hadoopConfDir).exists()) {
+      org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+      File coreSite = new File(hadoopConfDir, "core-site.xml");
+      if (coreSite.exists()) {
+        hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
+      }
+      File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
+      if (hdfsSite.exists()) {
+        hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
+      }
+      File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
+      if (yarnSite.exists()) {
+        hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
+      }
+      // Add mapred-site.xml. We need to read configurations like compression codec.
+      File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
+      if (mapredSite.exists()) {
+        hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath()));
+      }
+      return hadoopConfiguration;
+    }
+    return null;
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java 
b/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java
index 4fbe4cd..28883ba 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java
@@ -24,7 +24,6 @@ import org.apache.hudi.source.HoodieTableSource;
 import org.apache.hudi.util.AvroSchemaConverter;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.RowData;
@@ -35,6 +34,7 @@ import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.hadoop.fs.Path;
 
 import java.util.Collections;
 import java.util.HashMap;
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java 
b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
index 2a8fbe0..e78c656 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
@@ -49,7 +49,6 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -70,6 +69,7 @@ import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,7 +166,7 @@ public class HoodieTableSource implements
         (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
     if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
       StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
-          conf, path, metaClient, maxCompactionMemoryInBytes);
+          conf, FilePathUtils.toFlinkPath(path), metaClient, 
maxCompactionMemoryInBytes);
       OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = 
StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
       SingleOutputStreamOperator<RowData> source = 
execEnv.addSource(monitoringFunction, "streaming_source")
           .setParallelism(1)
@@ -213,7 +213,8 @@ public class HoodieTableSource implements
 
   @Override
   public List<Map<String, String>> getPartitions() {
-    return FilePathUtils.getPartitions(path, conf, partitionKeys, 
defaultPartName);
+    return FilePathUtils.getPartitions(path, hadoopConf, partitionKeys, 
defaultPartName,
+        conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
   }
 
   @Override
@@ -253,7 +254,8 @@ public class HoodieTableSource implements
 
   private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
     FileStatus[] fileStatuses = Arrays.stream(paths)
-        .flatMap(path -> 
Arrays.stream(FilePathUtils.getHadoopFileStatusRecursively(path, 1, 
hadoopConf)))
+        .flatMap(path ->
+            Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, 
hadoopConf)))
         .toArray(FileStatus[]::new);
     if (fileStatuses.length == 0) {
       throw new HoodieException("No files found for reading in user provided 
path.");
@@ -281,9 +283,7 @@ public class HoodieTableSource implements
     } else {
       // all the files are logs
       return Arrays.stream(paths).map(partitionPath -> {
-        String relPartitionPath = FSUtils.getRelativePartitionPath(
-            new org.apache.hadoop.fs.Path(path.toUri()),
-            new org.apache.hadoop.fs.Path(partitionPath.toUri()));
+        String relPartitionPath = FSUtils.getRelativePartitionPath(path, 
partitionPath);
         return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, 
latestCommit)
             .map(fileSlice -> {
               Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
@@ -351,14 +351,14 @@ public class HoodieTableSource implements
               inputSplits);
           return new MergeOnReadInputFormat(
               this.conf,
-              paths,
+              FilePathUtils.toFlinkPaths(paths),
               hoodieTableState,
               rowDataType.getChildren(), // use the explicit fields data type 
because the AvroSchemaConverter is not very stable.
               "default",
               this.limit);
         case COPY_ON_WRITE:
           FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
-              paths,
+              FilePathUtils.toFlinkPaths(paths),
               this.schema.getFieldNames(),
               this.schema.getFieldDataTypes(),
               this.requiredPos,
@@ -398,7 +398,8 @@ public class HoodieTableSource implements
   public Path[] getReadPaths() {
     return partitionKeys.isEmpty()
         ? new Path[] {path}
-        : FilePathUtils.partitionPath2ReadPath(path, conf, partitionKeys, 
getOrFetchPartitions());
+        : FilePathUtils.partitionPath2ReadPath(path, partitionKeys, 
getOrFetchPartitions(),
+        conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
   }
 
   private static class LatestFileFilter extends FilePathFilter {
@@ -409,8 +410,8 @@ public class HoodieTableSource implements
     }
 
     @Override
-    public boolean filterPath(Path filePath) {
-      return !this.hoodieFilter.accept(new 
org.apache.hadoop.fs.Path(filePath.toUri()));
+    public boolean filterPath(org.apache.flink.core.fs.Path filePath) {
+      return !this.hoodieFilter.accept(new Path(filePath.toUri()));
     }
   }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
index 03bf53d..1b029b2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
@@ -22,14 +22,16 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.operator.FlinkOptions;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -77,14 +79,14 @@ public class FilePathUtils {
   /**
    * Make partition path from partition spec.
    *
-   * @param partitionKVs       The partition key value mapping
-   * @param hiveStylePartition Whether the partition path is with Hive style,
-   *                           e.g. {partition key} = {partition value}
+   * @param partitionKVs  The partition key value mapping
+   * @param hivePartition Whether the partition path is with Hive style,
+   *                      e.g. {partition key} = {partition value}
    * @return an escaped, valid partition name
    */
   public static String generatePartitionPath(
       LinkedHashMap<String, String> partitionKVs,
-      boolean hiveStylePartition) {
+      boolean hivePartition) {
     if (partitionKVs.isEmpty()) {
       return "";
     }
@@ -92,16 +94,16 @@ public class FilePathUtils {
     int i = 0;
     for (Map.Entry<String, String> e : partitionKVs.entrySet()) {
       if (i > 0) {
-        suffixBuf.append(Path.SEPARATOR);
+        suffixBuf.append(File.separator);
       }
-      if (hiveStylePartition) {
+      if (hivePartition) {
         suffixBuf.append(escapePathName(e.getKey()));
         suffixBuf.append('=');
       }
       suffixBuf.append(escapePathName(e.getValue()));
       i++;
     }
-    suffixBuf.append(Path.SEPARATOR);
+    suffixBuf.append(File.separator);
     return suffixBuf.toString();
   }
 
@@ -235,7 +237,11 @@ public class FilePathUtils {
     return ret;
   }
 
-  private static FileStatus[] getFileStatusRecursively(Path path, int 
expectLevel, FileSystem fs) {
+  public static FileStatus[] getFileStatusRecursively(Path path, int 
expectLevel, Configuration conf) {
+    return getFileStatusRecursively(path, expectLevel, 
FSUtils.getFs(path.toString(), conf));
+  }
+
+  public static FileStatus[] getFileStatusRecursively(Path path, int 
expectLevel, FileSystem fs) {
     ArrayList<FileStatus> result = new ArrayList<>();
 
     try {
@@ -268,54 +274,6 @@ public class FilePathUtils {
 
   private static boolean isHiddenFile(FileStatus fileStatus) {
     String name = fileStatus.getPath().getName();
-    return name.startsWith("_") || name.startsWith(".");
-  }
-
-  /**
-   * Same as getFileStatusRecursively but returns hadoop {@link 
org.apache.hadoop.fs.FileStatus}s.
-   */
-  public static org.apache.hadoop.fs.FileStatus[] 
getHadoopFileStatusRecursively(
-      Path path, int expectLevel, Configuration hadoopConf) {
-    ArrayList<org.apache.hadoop.fs.FileStatus> result = new ArrayList<>();
-
-    org.apache.hadoop.fs.Path hadoopPath = new 
org.apache.hadoop.fs.Path(path.toUri());
-    org.apache.hadoop.fs.FileSystem fs = FSUtils.getFs(path.getPath(), 
hadoopConf);
-
-    try {
-      org.apache.hadoop.fs.FileStatus fileStatus = 
fs.getFileStatus(hadoopPath);
-      listStatusRecursivelyV2(fs, fileStatus, 0, expectLevel, result);
-    } catch (IOException ignore) {
-      return new org.apache.hadoop.fs.FileStatus[0];
-    }
-
-    return result.toArray(new org.apache.hadoop.fs.FileStatus[0]);
-  }
-
-  private static void listStatusRecursivelyV2(
-      org.apache.hadoop.fs.FileSystem fs,
-      org.apache.hadoop.fs.FileStatus fileStatus,
-      int level,
-      int expectLevel,
-      List<org.apache.hadoop.fs.FileStatus> results) throws IOException {
-    if (isHiddenFileV2(fileStatus)) {
-      // do nothing
-      return;
-    }
-
-    if (expectLevel == level) {
-      results.add(fileStatus);
-      return;
-    }
-
-    if (fileStatus.isDirectory()) {
-      for (org.apache.hadoop.fs.FileStatus stat : 
fs.listStatus(fileStatus.getPath())) {
-        listStatusRecursivelyV2(fs, stat, level + 1, expectLevel, results);
-      }
-    }
-  }
-
-  private static boolean isHiddenFileV2(org.apache.hadoop.fs.FileStatus 
fileStatus) {
-    String name = fileStatus.getPath().getName();
     // the log files is hidden file
     return name.startsWith("_") || name.startsWith(".") && 
!name.contains(".log.");
   }
@@ -333,21 +291,23 @@ public class FilePathUtils {
    * <p>The return list should be [{key1:val1, key2:val2, key3:val3}, 
{key1:val4, key2:val5, key3:val6}].
    *
    * @param path The base path
-   * @param conf The configuration
+   * @param hadoopConf The hadoop configuration
    * @param partitionKeys The partition key list
    * @param defaultParName The default partition name for nulls
+   * @param hivePartition Whether the partition path is in Hive style
    */
   public static List<Map<String, String>> getPartitions(
       Path path,
-      org.apache.flink.configuration.Configuration conf,
+      Configuration hadoopConf,
       List<String> partitionKeys,
-      String defaultParName) {
+      String defaultParName,
+      boolean hivePartition) {
     try {
       return FilePathUtils
           .searchPartKeyValueAndPaths(
-              path.getFileSystem(),
+              FSUtils.getFs(path.toString(), hadoopConf),
               path,
-              conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
+              hivePartition,
               partitionKeys.toArray(new String[0]))
           .stream()
           .map(tuple2 -> tuple2.f0)
@@ -386,21 +346,23 @@ public class FilePathUtils {
    * Returns all the file paths that is the parents of the data files.
    *
    * @param path The base path
-   * @param conf The configuration
+   * @param conf The Flink configuration
+   * @param hadoopConf The hadoop configuration
    * @param partitionKeys The partition key list
-   * @param defaultParName The default partition name for nulls
    */
   public static Path[] getReadPaths(
       Path path,
       org.apache.flink.configuration.Configuration conf,
-      List<String> partitionKeys,
-      String defaultParName) {
+      Configuration hadoopConf,
+      List<String> partitionKeys) {
     if (partitionKeys.isEmpty()) {
       return new Path[] {path};
     } else {
+      final String defaultParName = 
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
+      final boolean hivePartition = 
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION);
       List<Map<String, String>> partitionPaths =
-          getPartitions(path, conf, partitionKeys, defaultParName);
-      return partitionPath2ReadPath(path, conf, partitionKeys, partitionPaths);
+          getPartitions(path, hadoopConf, partitionKeys, defaultParName, 
hivePartition);
+      return partitionPath2ReadPath(path, partitionKeys, partitionPaths, 
hivePartition);
     }
   }
 
@@ -408,21 +370,37 @@ public class FilePathUtils {
    * Transforms the given partition key value mapping to read paths.
    *
    * @param path The base path
-   * @param conf The hadoop configuration
    * @param partitionKeys The partition key list
    * @param partitionPaths The partition key value mapping
+   * @param hivePartition Whether the partition path is in Hive style
    *
    * @see #getReadPaths
    */
   public static Path[] partitionPath2ReadPath(
       Path path,
-      org.apache.flink.configuration.Configuration conf,
       List<String> partitionKeys,
-      List<Map<String, String>> partitionPaths) {
+      List<Map<String, String>> partitionPaths,
+      boolean hivePartition) {
     return partitionPaths.stream()
         .map(m -> validateAndReorderPartitions(m, partitionKeys))
-        .map(kvs -> FilePathUtils.generatePartitionPath(kvs, 
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)))
+        .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition))
         .map(n -> new Path(path, n))
         .toArray(Path[]::new);
   }
+
+  /**
+   * Transforms the array of Hadoop paths to Flink paths.
+   */
+  public static org.apache.flink.core.fs.Path[] toFlinkPaths(Path[] paths) {
+    return Arrays.stream(paths)
+        .map(p -> toFlinkPath(p))
+        .toArray(org.apache.flink.core.fs.Path[]::new);
+  }
+
+  /**
+   * Transforms the Hadoop path to Flink path.
+   */
+  public static org.apache.flink.core.fs.Path toFlinkPath(Path path) {
+    return new org.apache.flink.core.fs.Path(path.toUri());
+  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
index 2a28d85..709c32e 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
@@ -18,7 +18,12 @@
 
 package org.apache.hudi.source.format.cow;
 
+import org.apache.hudi.common.fs.FSUtils;
+
 import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.io.GlobFilePathFilter;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
@@ -26,11 +31,19 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Set;
 
 import static 
org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
 import static 
org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
@@ -43,11 +56,16 @@ import static 
org.apache.flink.table.filesystem.RowPartitionComputer.restorePart
  * {@code 
org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat}
  * to support TIMESTAMP_MILLIS.
  *
+ * <p>Note: Override the {@link #createInputSplits} method from parent to 
rewrite the logic creating the FileSystem,
+ * use {@link FSUtils#getFs} to get a plugin filesystem.
+ *
  * @see ParquetSplitReaderUtil
  */
 public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
   private static final long serialVersionUID = 1L;
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteInputFormat.class);
+
   private final String[] fullFieldNames;
   private final DataType[] fullFieldTypes;
   private final int[] selectedFields;
@@ -59,6 +77,11 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
   private transient ParquetColumnarRowSplitReader reader;
   private transient long currentReadCount;
 
+  /**
+   * Files filter for determining what files/directories should be included.
+   */
+  private FilePathFilter localFilesFilter = new GlobFilePathFilter();
+
   public CopyOnWriteInputFormat(
       Path[] paths,
       String[] fullFieldNames,
@@ -98,13 +121,139 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
         partObjects,
         selectedFields,
         DEFAULT_SIZE,
-        new Path(fileSplit.getPath().toString()),
+        fileSplit.getPath(),
         fileSplit.getStart(),
         fileSplit.getLength());
     this.currentReadCount = 0L;
   }
 
   @Override
+  public FileInputSplit[] createInputSplits(int minNumSplits) throws 
IOException {
+    if (minNumSplits < 1) {
+      throw new IllegalArgumentException("Number of input splits has to be at 
least 1.");
+    }
+
+    // take the desired number of splits into account
+    minNumSplits = Math.max(minNumSplits, this.numSplits);
+
+    final List<FileInputSplit> inputSplits = new 
ArrayList<FileInputSplit>(minNumSplits);
+
+    // get all the files that are involved in the splits
+    List<FileStatus> files = new ArrayList<>();
+    long totalLength = 0;
+
+    for (Path path : getFilePaths()) {
+      final org.apache.hadoop.fs.Path hadoopPath = new 
org.apache.hadoop.fs.Path(path.toUri());
+      final FileSystem fs = FSUtils.getFs(hadoopPath.toString(), 
this.conf.conf());
+      final FileStatus pathFile = fs.getFileStatus(hadoopPath);
+
+      if (pathFile.isDir()) {
+        totalLength += addFilesInDir(hadoopPath, files, true);
+      } else {
+        testForUnsplittable(pathFile);
+
+        files.add(pathFile);
+        totalLength += pathFile.getLen();
+      }
+    }
+
+    // returns if unsplittable
+    if (unsplittable) {
+      int splitNum = 0;
+      for (final FileStatus file : files) {
+        final FileSystem fs = FSUtils.getFs(file.getPath().toString(), 
this.conf.conf());
+        final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 
file.getLen());
+        Set<String> hosts = new HashSet<String>();
+        for (BlockLocation block : blocks) {
+          hosts.addAll(Arrays.asList(block.getHosts()));
+        }
+        long len = file.getLen();
+        if (testForUnsplittable(file)) {
+          len = READ_WHOLE_SPLIT_FLAG;
+        }
+        FileInputSplit fis = new FileInputSplit(splitNum++, new 
Path(file.getPath().toUri()), 0, len,
+            hosts.toArray(new String[hosts.size()]));
+        inputSplits.add(fis);
+      }
+      return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
+    }
+
+
+    final long maxSplitSize = totalLength / minNumSplits + (totalLength % 
minNumSplits == 0 ? 0 : 1);
+
+    // now that we have the files, generate the splits
+    int splitNum = 0;
+    for (final FileStatus file : files) {
+
+      final FileSystem fs = FSUtils.getFs(file.getPath().toString(), 
this.conf.conf());
+      final long len = file.getLen();
+      final long blockSize = file.getBlockSize();
+
+      final long minSplitSize;
+      if (this.minSplitSize <= blockSize) {
+        minSplitSize = this.minSplitSize;
+      } else {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Minimal split size of " + this.minSplitSize + " is larger 
than the block size of "
+              + blockSize + ". Decreasing minimal split size to block size.");
+        }
+        minSplitSize = blockSize;
+      }
+
+      final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, 
blockSize));
+      final long halfSplit = splitSize >>> 1;
+
+      final long maxBytesForLastSplit = (long) (splitSize * 1.1f);
+
+      if (len > 0) {
+
+        // get the block locations and make sure they are in order with 
respect to their offset
+        final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
+        Arrays.sort(blocks);
+
+        long bytesUnassigned = len;
+        long position = 0;
+
+        int blockIndex = 0;
+
+        while (bytesUnassigned > maxBytesForLastSplit) {
+          // get the block containing the majority of the data
+          blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, 
blockIndex);
+          // create a new split
+          FileInputSplit fis = new FileInputSplit(splitNum++, new 
Path(file.getPath().toUri()), position, splitSize,
+              blocks[blockIndex].getHosts());
+          inputSplits.add(fis);
+
+          // adjust the positions
+          position += splitSize;
+          bytesUnassigned -= splitSize;
+        }
+
+        // assign the last split
+        if (bytesUnassigned > 0) {
+          blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, 
blockIndex);
+          final FileInputSplit fis = new FileInputSplit(splitNum++, new 
Path(file.getPath().toUri()), position,
+              bytesUnassigned, blocks[blockIndex].getHosts());
+          inputSplits.add(fis);
+        }
+      } else {
+        // special case with a file of zero bytes size
+        final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
+        String[] hosts;
+        if (blocks.length > 0) {
+          hosts = blocks[0].getHosts();
+        } else {
+          hosts = new String[0];
+        }
+        final FileInputSplit fis = new FileInputSplit(splitNum++, new 
Path(file.getPath().toUri()), 0, 0, hosts);
+        inputSplits.add(fis);
+      }
+    }
+
+    return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
+  }
+
+  /**
+   * Always returns {@code true}: this format reports that it supports multiple input paths
+   * ({@link #createInputSplits} iterates over every path returned by {@code getFilePaths()}).
+   */
+  @Override
   public boolean supportsMultiPaths() {
     return true;
   }
@@ -131,4 +280,106 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
     }
     this.reader = null;
   }
+
+  /**
+   * Enumerates all accepted files in the given directory, recursing into accepted sub-directories
+   * if {@code enumerateNestedFiles} is true.
+   *
+   * <p>Every accepted file is appended to {@code files} and checked with {@code testForUnsplittable}.
+   *
+   * @param path             the directory to list
+   * @param files            the list that collects the accepted file statuses
+   * @param logExcludedFiles whether excluded files and directories are logged at debug level
+   * @return the total length of accepted files.
+   * @throws IOException declared for the filesystem listing calls
+   */
+  private long addFilesInDir(org.apache.hadoop.fs.Path path, List<FileStatus> files, boolean logExcludedFiles)
+      throws IOException {
+    final FileSystem fs = FSUtils.getFs(path.toString(), this.conf.conf());
+
+    long length = 0;
+
+    for (FileStatus status : fs.listStatus(path)) {
+      if (status.isDir()) {
+        if (acceptFile(status) && enumerateNestedFiles) {
+          length += addFilesInDir(status.getPath(), files, logExcludedFiles);
+        } else if (logExcludedFiles && LOG.isDebugEnabled()) {
+          LOG.debug("Directory {} did not pass the file-filter and is excluded.", status.getPath());
+        }
+      } else {
+        if (acceptFile(status)) {
+          files.add(status);
+          length += status.getLen();
+          testForUnsplittable(status);
+        } else if (logExcludedFiles && LOG.isDebugEnabled()) {
+          // this branch handles regular files, so do not report them as directories
+          LOG.debug("File {} did not pass the file-filter and is excluded.", status.getPath());
+        }
+      }
+    }
+    return length;
+  }
+
+  /**
+   * Sets the files filter on the parent format and also keeps a reference in
+   * {@code localFilesFilter}, which is the filter {@link #acceptFile(FileStatus)} consults.
+   *
+   * @param filesFilter the filter deciding which files/directories are included
+   */
+  @Override
+  public void setFilesFilter(FilePathFilter filesFilter) {
+    this.localFilesFilter = filesFilter;
+    super.setFilesFilter(filesFilter);
+  }
+
+  /**
+   * Decides whether a file or directory takes part in the input.
+   *
+   * <p>Entries whose name starts with {@code "_"} or {@code "."} are rejected, as are entries
+   * filtered out by the configured files filter. The method may be overridden.
+   *
+   * @param fileStatus The file status to check.
+   * @return true, if the given file or directory is accepted
+   */
+  public boolean acceptFile(FileStatus fileStatus) {
+    final org.apache.hadoop.fs.Path hadoopPath = fileStatus.getPath();
+    final String fileName = hadoopPath.getName();
+    if (fileName.startsWith("_") || fileName.startsWith(".")) {
+      return false;
+    }
+    // the filter works on Flink paths and returns true for paths that must be filtered out
+    return !localFilesFilter.filterPath(new Path(hadoopPath.toUri()));
+  }
+
+  /**
+   * Retrieves the index of the {@code BlockLocation} that contains the part of the file described by the given
+   * offset.
+   *
+   * @param blocks        The different blocks of the file. Must be ordered by their offset.
+   * @param offset        The offset of the position in the file.
+   * @param halfSplitSize Half of the split size; if fewer bytes than this remain in the block containing
+   *                      the offset and a following block exists, the following block is chosen instead.
+   * @param startIndex    The earliest index to look at.
+   * @return The index of the block containing the given position.
+   * @throws IllegalArgumentException if no block from {@code startIndex} on contains the offset
+   */
+  private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long halfSplitSize, int startIndex) {
+    // go over all indexes after the startIndex
+    for (int i = startIndex; i < blocks.length; i++) {
+      final long blockStart = blocks[i].getOffset();
+      final long blockEnd = blockStart + blocks[i].getLength();
+
+      if (offset >= blockStart && offset < blockEnd) {
+        // got the block where the split starts
+        // check if the next block contains more than this one does
+        if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
+          return i + 1;
+        }
+        return i;
+      }
+    }
+    throw new IllegalArgumentException("The given offset is not contained in any block.");
+  }
+
+  /**
+   * Checks whether an {@link InflaterInputStreamFactory} is resolved for the given file and,
+   * if so, flags this format as unsplittable.
+   *
+   * @param pathFile the file status to check
+   * @return true if an inflater factory was found for the file, false otherwise
+   */
+  private boolean testForUnsplittable(FileStatus pathFile) {
+    final boolean hasInflater = getInflaterInputStreamFactory(pathFile.getPath()) != null;
+    if (hasInflater) {
+      unsplittable = true;
+    }
+    return hasInflater;
+  }
+
+  /**
+   * Looks up the {@link InflaterInputStreamFactory} for the extension of the given Hadoop path.
+   *
+   * @param path the Hadoop path whose file name extension is inspected
+   * @return the factory returned by the extension-based lookup, or null if the name has no extension
+   */
+  private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(org.apache.hadoop.fs.Path path) {
+    final String extension = extractFileExtension(path.getName());
+    return extension == null ? null : getInflaterInputStreamFactory(extension);
+  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
index 510b5b5..e2951ab 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
@@ -26,6 +26,7 @@ import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.source.format.FilePathUtils;
 import org.apache.hudi.source.format.FormatUtils;
 import org.apache.hudi.source.format.cow.ParquetColumnarRowSplitReader;
+import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
@@ -40,7 +41,6 @@ import 
org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -238,7 +238,7 @@ public class MergeOnReadInputFormat
   private ParquetColumnarRowSplitReader getReader(String path, int[] 
requiredPos) throws IOException {
     // generate partition specs.
     LinkedHashMap<String, String> partSpec = 
FilePathUtils.extractPartitionKeyValues(
-        new org.apache.flink.core.fs.Path(path).getParent(),
+        new org.apache.hadoop.fs.Path(path).getParent(),
         this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
         this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
     LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b5326bd..5364230 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -42,7 +42,6 @@ import 
org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
@@ -53,7 +52,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.List;
@@ -127,50 +125,9 @@ public class StreamerUtil {
     return conf;
   }
 
+  // Kept to avoid too many modifications.
   public static org.apache.hadoop.conf.Configuration getHadoopConf() {
-    // create hadoop configuration with hadoop conf directory configured.
-    org.apache.hadoop.conf.Configuration hadoopConf = null;
-    for (String possibleHadoopConfPath : 
HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
-      hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
-      if (hadoopConf != null) {
-        break;
-      }
-    }
-    if (hadoopConf == null) {
-      hadoopConf = new org.apache.hadoop.conf.Configuration();
-    }
-    return hadoopConf;
-  }
-
-  /**
-   * Returns a new Hadoop Configuration object using the path to the hadoop 
conf configured.
-   *
-   * @param hadoopConfDir Hadoop conf directory path.
-   * @return A Hadoop configuration instance.
-   */
-  private static org.apache.hadoop.conf.Configuration 
getHadoopConfiguration(String hadoopConfDir) {
-    if (new File(hadoopConfDir).exists()) {
-      org.apache.hadoop.conf.Configuration hadoopConfiguration = new 
org.apache.hadoop.conf.Configuration();
-      File coreSite = new File(hadoopConfDir, "core-site.xml");
-      if (coreSite.exists()) {
-        hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
-      }
-      File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
-      if (hdfsSite.exists()) {
-        hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
-      }
-      File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
-      if (yarnSite.exists()) {
-        hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
-      }
-      // Add mapred-site.xml. We need to read configurations like compression 
codec.
-      File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
-      if (mapredSite.exists()) {
-        hadoopConfiguration.addResource(new 
Path(mapredSite.getAbsolutePath()));
-      }
-      return hadoopConfiguration;
-    }
-    return null;
+    return FlinkClientUtil.getHadoopConf();
   }
 
   /**
@@ -291,21 +248,22 @@ public class StreamerUtil {
     final String basePath = conf.getString(FlinkOptions.PATH);
     final org.apache.hadoop.conf.Configuration hadoopConf = 
StreamerUtil.getHadoopConf();
     // Hadoop FileSystem
-    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
-      if (!fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))) {
-        HoodieTableMetaClient.withPropertyBuilder()
+    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
+    if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) 
{
+      HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
           .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
           .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS))
           .setArchiveLogFolder(DEFAULT_ARCHIVE_LOG_FOLDER)
           .setTimelineLayoutVersion(1)
           .initTable(hadoopConf, basePath);
-        LOG.info("Table initialized under base path {}", basePath);
-      } else {
-        LOG.info("Table [{}/{}] already exists, no need to initialize the 
table",
-            basePath, conf.getString(FlinkOptions.TABLE_NAME));
-      }
+      LOG.info("Table initialized under base path {}", basePath);
+    } else {
+      LOG.info("Table [{}/{}] already exists, no need to initialize the table",
+          basePath, conf.getString(FlinkOptions.TABLE_NAME));
     }
+    // Do not close the filesystem in order to use the CACHE,
+    // some of the filesystems release the handles in #close method.
   }
 
   /** Generates the bucket ID using format {partition path}_{fileID}. */
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
index 48bd350..c1e2348 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
@@ -28,8 +28,8 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 2daa6c3..4ada381 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -36,7 +36,6 @@ import org.apache.hudi.utils.TestUtils;
 
 import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -48,6 +47,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -242,8 +242,9 @@ public class TestStreamReadOperator {
 
   private OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
createReader() throws Exception {
     final String basePath = tempFile.getAbsolutePath();
+    final org.apache.hadoop.conf.Configuration hadoopConf = 
StreamerUtil.getHadoopConf();
     final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+        .setConf(hadoopConf).setBasePath(basePath).build();
     final List<String> partitionKeys = Collections.singletonList("partition");
 
     // This input format is used to opening the emitted split.
@@ -262,11 +263,10 @@ public class TestStreamReadOperator {
         tableAvroSchema.toString(),
         
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
         Collections.emptyList());
-    Path[] paths = FilePathUtils.getReadPaths(
-        new Path(basePath), conf, partitionKeys, 
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME));
+    Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, 
hadoopConf, partitionKeys);
     MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat(
         conf,
-        paths,
+        FilePathUtils.toFlinkPaths(paths),
         hoodieTableState,
         rowDataType.getChildren(),
         "default",
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
index 343f293..1d92dbb 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
@@ -27,9 +27,9 @@ import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;

Reply via email to