This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 20786ab [HUDI-1681] Support object storage for Flink writer (#2662)
20786ab is described below
commit 20786ab8a2a1e7735ab846e92802fb9f4449adc9
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 12 16:39:24 2021 +0800
[HUDI-1681] Support object storage for Flink writer (#2662)
In order to support object storage, we need these changes:
* Use the Hadoop filesystem so that we can find the plugin filesystem
* Do not fetch file size until the file handle is closed
* Do not close the opened filesystem because we want to use the
filesystem cache
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 2 +
.../apache/hudi/io/storage/HoodieFileWriter.java | 2 +
.../apache/hudi/io/storage/HoodieHFileWriter.java | 5 +
.../hudi/io/storage/HoodieParquetWriter.java | 5 +
hudi-client/hudi-flink-client/pom.xml | 6 +
.../client/common/HoodieFlinkEngineContext.java | 4 +-
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 11 +-
.../java/org/apache/hudi/io/FlinkCreateHandle.java | 3 +-
.../java/org/apache/hudi/util/FlinkClientUtil.java | 81 +++++++
.../apache/hudi/factory/HoodieTableFactory.java | 2 +-
.../org/apache/hudi/source/HoodieTableSource.java | 25 +-
.../apache/hudi/source/format/FilePathUtils.java | 126 +++++-----
.../source/format/cow/CopyOnWriteInputFormat.java | 253 ++++++++++++++++++++-
.../source/format/mor/MergeOnReadInputFormat.java | 4 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 64 +-----
.../apache/hudi/source/TestHoodieTableSource.java | 2 +-
.../apache/hudi/source/TestStreamReadOperator.java | 10 +-
.../apache/hudi/source/format/TestInputFormat.java | 2 +-
18 files changed, 443 insertions(+), 164 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 986afe6..b248a2f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -274,6 +274,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends
if (!stat.getLogFiles().contains(result.logFile().getFileName())) {
stat.addLogFiles(result.logFile().getFileName());
}
+ stat.setFileSizeInBytes(result.size());
}
private void updateRuntimeStats(HoodieDeltaWriteStat stat) {
@@ -304,6 +305,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends
} else if (stat.getPath().endsWith(result.logFile().getFileName())) {
// append/continued writing to the same log file
stat.setLogOffset(Math.min(stat.getLogOffset(), result.offset()));
+ stat.setFileSizeInBytes(stat.getFileSizeInBytes() + result.size());
accumulateWriteCounts(stat, result);
accumulateRuntimeStats(stat);
} else {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index ea9ecad..1aaa389 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -33,4 +33,6 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
void close() throws IOException;
void writeAvro(String key, R oldRecord) throws IOException;
+
+ long getBytesWritten();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index d77483c..352c51c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -156,4 +156,9 @@ public class HoodieHFileWriter<T extends
HoodieRecordPayload, R extends IndexedR
writer.close();
writer = null;
}
+
+ @Override
+ public long getBytesWritten() {
+ return fs.getBytesWritten(file);
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index d1c695a..c3939d7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -94,4 +94,9 @@ public class HoodieParquetWriter<T extends
HoodieRecordPayload, R extends Indexe
super.write(object);
writeSupport.add(key);
}
+
+ @Override
+ public long getBytesWritten() {
+ return fs.getBytesWritten(file);
+ }
}
diff --git a/hudi-client/hudi-flink-client/pom.xml
b/hudi-client/hudi-flink-client/pom.xml
index 7408b4a..57231d9 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -46,6 +46,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Parquet -->
<dependency>
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 6c7f44b..7713c1e 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -29,7 +29,6 @@ import
org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.hadoop.conf.Configuration;
import java.util.List;
import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.util.FlinkClientUtil;
import static
org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
import static
org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
@@ -50,7 +50,7 @@ public class HoodieFlinkEngineContext extends
HoodieEngineContext {
private RuntimeContext runtimeContext;
public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
- this(new SerializableConfiguration(new Configuration()),
taskContextSupplier);
+ this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()),
taskContextSupplier);
}
public HoodieFlinkEngineContext(SerializableConfiguration hadoopConf,
TaskContextSupplier taskContextSupplier) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 9d56d47..b827bf2 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -20,14 +20,12 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,14 +96,7 @@ public class FlinkAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends H
needBootStrap = false;
// flush any remaining records to disk
appendDataAndDeleteBlocks(header);
- try {
- for (WriteStatus status: statuses) {
- long logFileSize = FSUtils.getFileSize(fs, new
Path(config.getBasePath(), status.getStat().getPath()));
- status.getStat().setFileSizeInBytes(logFileSize);
- }
- } catch (IOException e) {
- throw new HoodieUpsertException("Failed to get file size for append
handle", e);
- }
+ // TODO: the incremental write size in bytes may be lost here; fix by
+ // tracking bytes written per flush instead of re-reading file sizes
List<WriteStatus> ret = new ArrayList<>(statuses);
statuses.clear();
return ret;
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index ce3725b..6f4638e 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -20,7 +20,6 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -109,7 +108,7 @@ public class FlinkCreateHandle<T extends
HoodieRecordPayload, I, K, O>
* @throws IOException if error occurs
*/
private void setUpWriteStatus() throws IOException {
- long fileSizeInBytes = FSUtils.getFileSize(fs, path);
+ long fileSizeInBytes = fileWriter.getBytesWritten();
long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
this.lastFileSize = fileSizeInBytes;
HoodieWriteStat stat = new HoodieWriteStat();
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
new file mode 100644
index 0000000..c38c1f1
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+
+/**
+ * Utilities for Hoodie Flink client.
+ */
+public class FlinkClientUtil {
+
+ /**
+ * Returns the Hadoop configuration constructed from the possible Hadoop conf paths,
+ * e.g. the configurations under $HADOOP_CONF_DIR and $HADOOP_HOME.
+ */
+ public static org.apache.hadoop.conf.Configuration getHadoopConf() {
+ // create hadoop configuration with hadoop conf directory configured.
+ org.apache.hadoop.conf.Configuration hadoopConf = null;
+ for (String possibleHadoopConfPath :
HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
+ hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
+ if (hadoopConf != null) {
+ break;
+ }
+ }
+ if (hadoopConf == null) {
+ hadoopConf = new org.apache.hadoop.conf.Configuration();
+ }
+ return hadoopConf;
+ }
+
+ /**
+ * Returns a new Hadoop Configuration object using the path to the hadoop
conf configured.
+ *
+ * @param hadoopConfDir Hadoop conf directory path.
+ * @return A Hadoop configuration instance.
+ */
+ private static org.apache.hadoop.conf.Configuration
getHadoopConfiguration(String hadoopConfDir) {
+ if (new File(hadoopConfDir).exists()) {
+ org.apache.hadoop.conf.Configuration hadoopConfiguration = new
org.apache.hadoop.conf.Configuration();
+ File coreSite = new File(hadoopConfDir, "core-site.xml");
+ if (coreSite.exists()) {
+ hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
+ }
+ File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
+ if (hdfsSite.exists()) {
+ hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
+ }
+ File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
+ if (yarnSite.exists()) {
+ hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
+ }
+ // Add mapred-site.xml. We need to read configurations like compression
codec.
+ File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
+ if (mapredSite.exists()) {
+ hadoopConfiguration.addResource(new
Path(mapredSite.getAbsolutePath()));
+ }
+ return hadoopConfiguration;
+ }
+ return null;
+ }
+}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java
b/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java
index 4fbe4cd..28883ba 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java
@@ -24,7 +24,6 @@ import org.apache.hudi.source.HoodieTableSource;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
@@ -35,6 +34,7 @@ import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.hadoop.fs.Path;
import java.util.Collections;
import java.util.HashMap;
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
index 2a8fbe0..e78c656 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
@@ -49,7 +49,6 @@ import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -70,6 +69,7 @@ import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,7 +166,7 @@ public class HoodieTableSource implements
(TypeInformation<RowData>)
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new
StreamReadMonitoringFunction(
- conf, path, metaClient, maxCompactionMemoryInBytes);
+ conf, FilePathUtils.toFlinkPath(path), metaClient,
maxCompactionMemoryInBytes);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
SingleOutputStreamOperator<RowData> source =
execEnv.addSource(monitoringFunction, "streaming_source")
.setParallelism(1)
@@ -213,7 +213,8 @@ public class HoodieTableSource implements
@Override
public List<Map<String, String>> getPartitions() {
- return FilePathUtils.getPartitions(path, conf, partitionKeys,
defaultPartName);
+ return FilePathUtils.getPartitions(path, hadoopConf, partitionKeys,
defaultPartName,
+ conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
}
@Override
@@ -253,7 +254,8 @@ public class HoodieTableSource implements
private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
FileStatus[] fileStatuses = Arrays.stream(paths)
- .flatMap(path ->
Arrays.stream(FilePathUtils.getHadoopFileStatusRecursively(path, 1,
hadoopConf)))
+ .flatMap(path ->
+ Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1,
hadoopConf)))
.toArray(FileStatus[]::new);
if (fileStatuses.length == 0) {
throw new HoodieException("No files found for reading in user provided
path.");
@@ -281,9 +283,7 @@ public class HoodieTableSource implements
} else {
// all the files are logs
return Arrays.stream(paths).map(partitionPath -> {
- String relPartitionPath = FSUtils.getRelativePartitionPath(
- new org.apache.hadoop.fs.Path(path.toUri()),
- new org.apache.hadoop.fs.Path(partitionPath.toUri()));
+ String relPartitionPath = FSUtils.getRelativePartitionPath(path,
partitionPath);
return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath,
latestCommit)
.map(fileSlice -> {
Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
@@ -351,14 +351,14 @@ public class HoodieTableSource implements
inputSplits);
return new MergeOnReadInputFormat(
this.conf,
- paths,
+ FilePathUtils.toFlinkPaths(paths),
hoodieTableState,
rowDataType.getChildren(), // use the explicit fields data type
because the AvroSchemaConverter is not very stable.
"default",
this.limit);
case COPY_ON_WRITE:
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
- paths,
+ FilePathUtils.toFlinkPaths(paths),
this.schema.getFieldNames(),
this.schema.getFieldDataTypes(),
this.requiredPos,
@@ -398,7 +398,8 @@ public class HoodieTableSource implements
public Path[] getReadPaths() {
return partitionKeys.isEmpty()
? new Path[] {path}
- : FilePathUtils.partitionPath2ReadPath(path, conf, partitionKeys,
getOrFetchPartitions());
+ : FilePathUtils.partitionPath2ReadPath(path, partitionKeys,
getOrFetchPartitions(),
+ conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
}
private static class LatestFileFilter extends FilePathFilter {
@@ -409,8 +410,8 @@ public class HoodieTableSource implements
}
@Override
- public boolean filterPath(Path filePath) {
- return !this.hoodieFilter.accept(new
org.apache.hadoop.fs.Path(filePath.toUri()));
+ public boolean filterPath(org.apache.flink.core.fs.Path filePath) {
+ return !this.hoodieFilter.accept(new Path(filePath.toUri()));
}
}
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
index 03bf53d..1b029b2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
@@ -22,14 +22,16 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -77,14 +79,14 @@ public class FilePathUtils {
/**
* Make partition path from partition spec.
*
- * @param partitionKVs The partition key value mapping
- * @param hiveStylePartition Whether the partition path is with Hive style,
- * e.g. {partition key} = {partition value}
+ * @param partitionKVs The partition key value mapping
+ * @param hivePartition Whether the partition path is with Hive style,
+ * e.g. {partition key} = {partition value}
* @return an escaped, valid partition name
*/
public static String generatePartitionPath(
LinkedHashMap<String, String> partitionKVs,
- boolean hiveStylePartition) {
+ boolean hivePartition) {
if (partitionKVs.isEmpty()) {
return "";
}
@@ -92,16 +94,16 @@ public class FilePathUtils {
int i = 0;
for (Map.Entry<String, String> e : partitionKVs.entrySet()) {
if (i > 0) {
- suffixBuf.append(Path.SEPARATOR);
+ suffixBuf.append(File.separator);
}
- if (hiveStylePartition) {
+ if (hivePartition) {
suffixBuf.append(escapePathName(e.getKey()));
suffixBuf.append('=');
}
suffixBuf.append(escapePathName(e.getValue()));
i++;
}
- suffixBuf.append(Path.SEPARATOR);
+ suffixBuf.append(File.separator);
return suffixBuf.toString();
}
@@ -235,7 +237,11 @@ public class FilePathUtils {
return ret;
}
- private static FileStatus[] getFileStatusRecursively(Path path, int
expectLevel, FileSystem fs) {
+ public static FileStatus[] getFileStatusRecursively(Path path, int
expectLevel, Configuration conf) {
+ return getFileStatusRecursively(path, expectLevel,
FSUtils.getFs(path.toString(), conf));
+ }
+
+ public static FileStatus[] getFileStatusRecursively(Path path, int
expectLevel, FileSystem fs) {
ArrayList<FileStatus> result = new ArrayList<>();
try {
@@ -268,54 +274,6 @@ public class FilePathUtils {
private static boolean isHiddenFile(FileStatus fileStatus) {
String name = fileStatus.getPath().getName();
- return name.startsWith("_") || name.startsWith(".");
- }
-
- /**
- * Same as getFileStatusRecursively but returns hadoop {@link
org.apache.hadoop.fs.FileStatus}s.
- */
- public static org.apache.hadoop.fs.FileStatus[]
getHadoopFileStatusRecursively(
- Path path, int expectLevel, Configuration hadoopConf) {
- ArrayList<org.apache.hadoop.fs.FileStatus> result = new ArrayList<>();
-
- org.apache.hadoop.fs.Path hadoopPath = new
org.apache.hadoop.fs.Path(path.toUri());
- org.apache.hadoop.fs.FileSystem fs = FSUtils.getFs(path.getPath(),
hadoopConf);
-
- try {
- org.apache.hadoop.fs.FileStatus fileStatus =
fs.getFileStatus(hadoopPath);
- listStatusRecursivelyV2(fs, fileStatus, 0, expectLevel, result);
- } catch (IOException ignore) {
- return new org.apache.hadoop.fs.FileStatus[0];
- }
-
- return result.toArray(new org.apache.hadoop.fs.FileStatus[0]);
- }
-
- private static void listStatusRecursivelyV2(
- org.apache.hadoop.fs.FileSystem fs,
- org.apache.hadoop.fs.FileStatus fileStatus,
- int level,
- int expectLevel,
- List<org.apache.hadoop.fs.FileStatus> results) throws IOException {
- if (isHiddenFileV2(fileStatus)) {
- // do nothing
- return;
- }
-
- if (expectLevel == level) {
- results.add(fileStatus);
- return;
- }
-
- if (fileStatus.isDirectory()) {
- for (org.apache.hadoop.fs.FileStatus stat :
fs.listStatus(fileStatus.getPath())) {
- listStatusRecursivelyV2(fs, stat, level + 1, expectLevel, results);
- }
- }
- }
-
- private static boolean isHiddenFileV2(org.apache.hadoop.fs.FileStatus
fileStatus) {
- String name = fileStatus.getPath().getName();
// the log files is hidden file
return name.startsWith("_") || name.startsWith(".") &&
!name.contains(".log.");
}
@@ -333,21 +291,23 @@ public class FilePathUtils {
* <p>The return list should be [{key1:val1, key2:val2, key3:val3},
{key1:val4, key2:val5, key3:val6}].
*
* @param path The base path
- * @param conf The configuration
+ * @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
* @param defaultParName The default partition name for nulls
+ * @param hivePartition Whether the partition path is in Hive style
*/
public static List<Map<String, String>> getPartitions(
Path path,
- org.apache.flink.configuration.Configuration conf,
+ Configuration hadoopConf,
List<String> partitionKeys,
- String defaultParName) {
+ String defaultParName,
+ boolean hivePartition) {
try {
return FilePathUtils
.searchPartKeyValueAndPaths(
- path.getFileSystem(),
+ FSUtils.getFs(path.toString(), hadoopConf),
path,
- conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
+ hivePartition,
partitionKeys.toArray(new String[0]))
.stream()
.map(tuple2 -> tuple2.f0)
@@ -386,21 +346,23 @@ public class FilePathUtils {
* Returns all the file paths that is the parents of the data files.
*
* @param path The base path
- * @param conf The configuration
+ * @param conf The Flink configuration
+ * @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
- * @param defaultParName The default partition name for nulls
*/
public static Path[] getReadPaths(
Path path,
org.apache.flink.configuration.Configuration conf,
- List<String> partitionKeys,
- String defaultParName) {
+ Configuration hadoopConf,
+ List<String> partitionKeys) {
if (partitionKeys.isEmpty()) {
return new Path[] {path};
} else {
+ final String defaultParName =
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
+ final boolean hivePartition =
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION);
List<Map<String, String>> partitionPaths =
- getPartitions(path, conf, partitionKeys, defaultParName);
- return partitionPath2ReadPath(path, conf, partitionKeys, partitionPaths);
+ getPartitions(path, hadoopConf, partitionKeys, defaultParName,
hivePartition);
+ return partitionPath2ReadPath(path, partitionKeys, partitionPaths,
hivePartition);
}
}
@@ -408,21 +370,37 @@ public class FilePathUtils {
* Transforms the given partition key value mapping to read paths.
*
* @param path The base path
- * @param conf The hadoop configuration
* @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
+ * @param hivePartition Whether the partition path is in Hive style
*
* @see #getReadPaths
*/
public static Path[] partitionPath2ReadPath(
Path path,
- org.apache.flink.configuration.Configuration conf,
List<String> partitionKeys,
- List<Map<String, String>> partitionPaths) {
+ List<Map<String, String>> partitionPaths,
+ boolean hivePartition) {
return partitionPaths.stream()
.map(m -> validateAndReorderPartitions(m, partitionKeys))
- .map(kvs -> FilePathUtils.generatePartitionPath(kvs,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)))
+ .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition))
.map(n -> new Path(path, n))
.toArray(Path[]::new);
}
+
+ /**
+ * Transforms the array of Hadoop paths to Flink paths.
+ */
+ public static org.apache.flink.core.fs.Path[] toFlinkPaths(Path[] paths) {
+ return Arrays.stream(paths)
+ .map(p -> toFlinkPath(p))
+ .toArray(org.apache.flink.core.fs.Path[]::new);
+ }
+
+ /**
+ * Transforms the Hadoop path to Flink path.
+ */
+ public static org.apache.flink.core.fs.Path toFlinkPath(Path path) {
+ return new org.apache.flink.core.fs.Path(path.toUri());
+ }
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
index 2a28d85..709c32e 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java
@@ -18,7 +18,12 @@
package org.apache.hudi.source.format.cow;
+import org.apache.hudi.common.fs.FSUtils;
+
import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.io.GlobFilePathFilter;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
@@ -26,11 +31,19 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Set;
import static
org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
import static
org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
@@ -43,11 +56,16 @@ import static
org.apache.flink.table.filesystem.RowPartitionComputer.restorePart
* {@code
org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat}
* to support TIMESTAMP_MILLIS.
*
+ * <p>Note: Override the {@link #createInputSplits} method from parent to
rewrite the logic creating the FileSystem,
+ * use {@link FSUtils#getFs} to get a plugin filesystem.
+ *
* @see ParquetSplitReaderUtil
*/
public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(CopyOnWriteInputFormat.class);
+
private final String[] fullFieldNames;
private final DataType[] fullFieldTypes;
private final int[] selectedFields;
@@ -59,6 +77,11 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
private transient ParquetColumnarRowSplitReader reader;
private transient long currentReadCount;
+ /**
+ * Files filter for determining what files/directories should be included.
+ */
+ private FilePathFilter localFilesFilter = new GlobFilePathFilter();
+
public CopyOnWriteInputFormat(
Path[] paths,
String[] fullFieldNames,
@@ -98,13 +121,139 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
partObjects,
selectedFields,
DEFAULT_SIZE,
- new Path(fileSplit.getPath().toString()),
+ fileSplit.getPath(),
fileSplit.getStart(),
fileSplit.getLength());
this.currentReadCount = 0L;
}
@Override
+ public FileInputSplit[] createInputSplits(int minNumSplits) throws
IOException {
+ if (minNumSplits < 1) {
+ throw new IllegalArgumentException("Number of input splits has to be at
least 1.");
+ }
+
+ // take the desired number of splits into account
+ minNumSplits = Math.max(minNumSplits, this.numSplits);
+
+ final List<FileInputSplit> inputSplits = new
ArrayList<FileInputSplit>(minNumSplits);
+
+ // get all the files that are involved in the splits
+ List<FileStatus> files = new ArrayList<>();
+ long totalLength = 0;
+
+ for (Path path : getFilePaths()) {
+ final org.apache.hadoop.fs.Path hadoopPath = new
org.apache.hadoop.fs.Path(path.toUri());
+ final FileSystem fs = FSUtils.getFs(hadoopPath.toString(),
this.conf.conf());
+ final FileStatus pathFile = fs.getFileStatus(hadoopPath);
+
+ if (pathFile.isDir()) {
+ totalLength += addFilesInDir(hadoopPath, files, true);
+ } else {
+ testForUnsplittable(pathFile);
+
+ files.add(pathFile);
+ totalLength += pathFile.getLen();
+ }
+ }
+
+ // return early with one whole-file split per file if any file is unsplittable
+ if (unsplittable) {
+ int splitNum = 0;
+ for (final FileStatus file : files) {
+ final FileSystem fs = FSUtils.getFs(file.getPath().toString(),
this.conf.conf());
+ final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0,
file.getLen());
+ Set<String> hosts = new HashSet<String>();
+ for (BlockLocation block : blocks) {
+ hosts.addAll(Arrays.asList(block.getHosts()));
+ }
+ long len = file.getLen();
+ if (testForUnsplittable(file)) {
+ len = READ_WHOLE_SPLIT_FLAG;
+ }
+ FileInputSplit fis = new FileInputSplit(splitNum++, new
Path(file.getPath().toUri()), 0, len,
+ hosts.toArray(new String[hosts.size()]));
+ inputSplits.add(fis);
+ }
+ return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
+ }
+
+
+ final long maxSplitSize = totalLength / minNumSplits + (totalLength %
minNumSplits == 0 ? 0 : 1);
+
+ // now that we have the files, generate the splits
+ int splitNum = 0;
+ for (final FileStatus file : files) {
+
+ final FileSystem fs = FSUtils.getFs(file.getPath().toString(),
this.conf.conf());
+ final long len = file.getLen();
+ final long blockSize = file.getBlockSize();
+
+ final long minSplitSize;
+ if (this.minSplitSize <= blockSize) {
+ minSplitSize = this.minSplitSize;
+ } else {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Minimal split size of " + this.minSplitSize + " is larger
than the block size of "
+ + blockSize + ". Decreasing minimal split size to block size.");
+ }
+ minSplitSize = blockSize;
+ }
+
+ final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize,
blockSize));
+ final long halfSplit = splitSize >>> 1;
+
+ final long maxBytesForLastSplit = (long) (splitSize * 1.1f);
+
+ if (len > 0) {
+
+ // get the block locations and make sure they are in order with
respect to their offset
+ final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
+ Arrays.sort(blocks);
+
+ long bytesUnassigned = len;
+ long position = 0;
+
+ int blockIndex = 0;
+
+ while (bytesUnassigned > maxBytesForLastSplit) {
+ // get the block containing the majority of the data
+ blockIndex = getBlockIndexForPosition(blocks, position, halfSplit,
blockIndex);
+ // create a new split
+ FileInputSplit fis = new FileInputSplit(splitNum++, new
Path(file.getPath().toUri()), position, splitSize,
+ blocks[blockIndex].getHosts());
+ inputSplits.add(fis);
+
+ // adjust the positions
+ position += splitSize;
+ bytesUnassigned -= splitSize;
+ }
+
+ // assign the last split
+ if (bytesUnassigned > 0) {
+ blockIndex = getBlockIndexForPosition(blocks, position, halfSplit,
blockIndex);
+ final FileInputSplit fis = new FileInputSplit(splitNum++, new
Path(file.getPath().toUri()), position,
+ bytesUnassigned, blocks[blockIndex].getHosts());
+ inputSplits.add(fis);
+ }
+ } else {
+ // special case with a file of zero bytes size
+ final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
+ String[] hosts;
+ if (blocks.length > 0) {
+ hosts = blocks[0].getHosts();
+ } else {
+ hosts = new String[0];
+ }
+ final FileInputSplit fis = new FileInputSplit(splitNum++, new
Path(file.getPath().toUri()), 0, 0, hosts);
+ inputSplits.add(fis);
+ }
+ }
+
+ return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
+ }
+
+ @Override
public boolean supportsMultiPaths() {
return true;
}
@@ -131,4 +280,106 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
}
this.reader = null;
}
+
+ /**
+ * Enumerate all files in the directory, recursively if enumerateNestedFiles is true.
+ *
+ * @return the total length of accepted files.
+ */
+ private long addFilesInDir(org.apache.hadoop.fs.Path path, List<FileStatus>
files, boolean logExcludedFiles)
+ throws IOException {
+ final org.apache.hadoop.fs.Path hadoopPath = new
org.apache.hadoop.fs.Path(path.toUri());
+ final FileSystem fs = FSUtils.getFs(hadoopPath.toString(),
this.conf.conf());
+
+ long length = 0;
+
+ for (FileStatus dir : fs.listStatus(hadoopPath)) {
+ if (dir.isDir()) {
+ if (acceptFile(dir) && enumerateNestedFiles) {
+ length += addFilesInDir(dir.getPath(), files, logExcludedFiles);
+ } else {
+ if (logExcludedFiles && LOG.isDebugEnabled()) {
+ LOG.debug("Directory " + dir.getPath().toString() + " did not pass
the file-filter and is excluded.");
+ }
+ }
+ } else {
+ if (acceptFile(dir)) {
+ files.add(dir);
+ length += dir.getLen();
+ testForUnsplittable(dir);
+ } else {
+ if (logExcludedFiles && LOG.isDebugEnabled()) {
+ LOG.debug("Directory " + dir.getPath().toString() + " did not pass
the file-filter and is excluded.");
+ }
+ }
+ }
+ }
+ return length;
+ }
+
+ @Override
+ public void setFilesFilter(FilePathFilter filesFilter) {
+ this.localFilesFilter = filesFilter;
+ super.setFilesFilter(filesFilter);
+ }
+
+ /**
+ * A simple hook to filter files and directories from the input.
+ * The method may be overridden. Hadoop's FileInputFormat has a similar
mechanism and applies the
+ * same filters by default.
+ *
+ * @param fileStatus The file status to check.
+ * @return true, if the given file or directory is accepted
+ */
+ public boolean acceptFile(FileStatus fileStatus) {
+ final String name = fileStatus.getPath().getName();
+ return !name.startsWith("_")
+ && !name.startsWith(".")
+ && !localFilesFilter.filterPath(new
Path(fileStatus.getPath().toUri()));
+ }
+
+ /**
+ * Retrieves the index of the <tt>BlockLocation</tt> that contains the part
of the file described by the given
+ * offset.
+ *
+ * @param blocks The different blocks of the file. Must be ordered by
their offset.
+ * @param offset The offset of the position in the file.
+ * @param startIndex The earliest index to look at.
+ * @return The index of the block containing the given position.
+ */
+ private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
long halfSplitSize, int startIndex) {
+ // go over all indexes after the startIndex
+ for (int i = startIndex; i < blocks.length; i++) {
+ long blockStart = blocks[i].getOffset();
+ long blockEnd = blockStart + blocks[i].getLength();
+
+ if (offset >= blockStart && offset < blockEnd) {
+ // got the block where the split starts
+ // check if the next block contains more than this one does
+ if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
+ return i + 1;
+ } else {
+ return i;
+ }
+ }
+ }
+ throw new IllegalArgumentException("The given offset is not contained in
the any block.");
+ }
+
+ private boolean testForUnsplittable(FileStatus pathFile) {
+ if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
+ unsplittable = true;
+ return true;
+ }
+ return false;
+ }
+
+ private InflaterInputStreamFactory<?>
getInflaterInputStreamFactory(org.apache.hadoop.fs.Path path) {
+ String fileExtension = extractFileExtension(path.getName());
+ if (fileExtension != null) {
+ return getInflaterInputStreamFactory(fileExtension);
+ } else {
+ return null;
+ }
+ }
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
index 510b5b5..e2951ab 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
@@ -26,6 +26,7 @@ import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.source.format.FilePathUtils;
import org.apache.hudi.source.format.FormatUtils;
import org.apache.hudi.source.format.cow.ParquetColumnarRowSplitReader;
+import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
@@ -40,7 +41,6 @@ import
org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -238,7 +238,7 @@ public class MergeOnReadInputFormat
private ParquetColumnarRowSplitReader getReader(String path, int[]
requiredPos) throws IOException {
// generate partition specs.
LinkedHashMap<String, String> partSpec =
FilePathUtils.extractPartitionKeyValues(
- new org.apache.flink.core.fs.Path(path).getParent(),
+ new org.apache.hadoop.fs.Path(path).getParent(),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b5326bd..5364230 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -42,7 +42,6 @@ import
org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
@@ -53,7 +52,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
-import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
@@ -127,50 +125,9 @@ public class StreamerUtil {
return conf;
}
+ // Keep to avoid too many modifications.
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
- // create hadoop configuration with hadoop conf directory configured.
- org.apache.hadoop.conf.Configuration hadoopConf = null;
- for (String possibleHadoopConfPath :
HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
- hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
- if (hadoopConf != null) {
- break;
- }
- }
- if (hadoopConf == null) {
- hadoopConf = new org.apache.hadoop.conf.Configuration();
- }
- return hadoopConf;
- }
-
- /**
- * Returns a new Hadoop Configuration object using the path to the hadoop
conf configured.
- *
- * @param hadoopConfDir Hadoop conf directory path.
- * @return A Hadoop configuration instance.
- */
- private static org.apache.hadoop.conf.Configuration
getHadoopConfiguration(String hadoopConfDir) {
- if (new File(hadoopConfDir).exists()) {
- org.apache.hadoop.conf.Configuration hadoopConfiguration = new
org.apache.hadoop.conf.Configuration();
- File coreSite = new File(hadoopConfDir, "core-site.xml");
- if (coreSite.exists()) {
- hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
- }
- File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
- if (hdfsSite.exists()) {
- hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
- }
- File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
- if (yarnSite.exists()) {
- hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
- }
- // Add mapred-site.xml. We need to read configurations like compression
codec.
- File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
- if (mapredSite.exists()) {
- hadoopConfiguration.addResource(new
Path(mapredSite.getAbsolutePath()));
- }
- return hadoopConfiguration;
- }
- return null;
+ return FlinkClientUtil.getHadoopConf();
}
/**
@@ -291,21 +248,22 @@ public class StreamerUtil {
final String basePath = conf.getString(FlinkOptions.PATH);
final org.apache.hadoop.conf.Configuration hadoopConf =
StreamerUtil.getHadoopConf();
// Hadoop FileSystem
- try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
- if (!fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))) {
- HoodieTableMetaClient.withPropertyBuilder()
+ FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
+ if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)))
{
+ HoodieTableMetaClient.withPropertyBuilder()
.setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
.setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS))
.setArchiveLogFolder(DEFAULT_ARCHIVE_LOG_FOLDER)
.setTimelineLayoutVersion(1)
.initTable(hadoopConf, basePath);
- LOG.info("Table initialized under base path {}", basePath);
- } else {
- LOG.info("Table [{}/{}] already exists, no need to initialize the
table",
- basePath, conf.getString(FlinkOptions.TABLE_NAME));
- }
+ LOG.info("Table initialized under base path {}", basePath);
+ } else {
+ LOG.info("Table [{}/{}] already exists, no need to initialize the table",
+ basePath, conf.getString(FlinkOptions.TABLE_NAME));
}
+ // Do not close the filesystem in order to use the CACHE;
+ // some filesystems release their underlying handles in the #close method.
}
/** Generates the bucket ID using format {partition path}_{fileID}. */
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
index 48bd350..c1e2348 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
@@ -28,8 +28,8 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 2daa6c3..4ada381 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -36,7 +36,6 @@ import org.apache.hudi.utils.TestUtils;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -48,6 +47,7 @@ import
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -242,8 +242,9 @@ public class TestStreamReadOperator {
private OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData>
createReader() throws Exception {
final String basePath = tempFile.getAbsolutePath();
+ final org.apache.hadoop.conf.Configuration hadoopConf =
StreamerUtil.getHadoopConf();
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
- .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+ .setConf(hadoopConf).setBasePath(basePath).build();
final List<String> partitionKeys = Collections.singletonList("partition");
// This input format is used to opening the emitted split.
@@ -262,11 +263,10 @@ public class TestStreamReadOperator {
tableAvroSchema.toString(),
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
Collections.emptyList());
- Path[] paths = FilePathUtils.getReadPaths(
- new Path(basePath), conf, partitionKeys,
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME));
+ Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf,
hadoopConf, partitionKeys);
MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat(
conf,
- paths,
+ FilePathUtils.toFlinkPaths(paths),
hoodieTableState,
rowDataType.getChildren(),
"default",
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
index 343f293..1d92dbb 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
@@ -27,9 +27,9 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;