danny0405 commented on a change in pull request #2496:
URL: https://github.com/apache/hudi/pull/2496#discussion_r567627556
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
##########
@@ -54,6 +54,10 @@ public TaskContextSupplier getTaskContextSupplier() {
return taskContextSupplier;
}
+ public void setConfig(String name, String value) {
+ getHadoopConf().get().set(name, value);
+ }
Review comment:
Should `setConfig` be renamed to `setHadoopConf`? It only sets a property on the Hadoop configuration.
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
##########
@@ -118,12 +156,31 @@ private static Registry getMetricRegistryForPath(Path p) {
return executeFuncWithTimeMetrics(metricName, p, func);
}
+ /**
+ * Executes the given function and updates the metric with time taken to execute the function and number of bytes.
+ *
+ * The number of bytes are returned from the execution of the provided function.
+ */
+ protected static int executeFuncWithTimeAndByteMetrics(String metricName, Path p, CheckedIntFunction func) throws IOException {
+ int ret = executeFuncWithTimeMetrics(metricName, p, func);
+ if (ret > 0) {
+ Registry registry = getMetricRegistryForPath(p);
+ if (registry != null) {
Review comment:
The code invokes `getMetricRegistryForPath(p)` twice.
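One way to avoid the double lookup is to resolve the registry once and pass it into the timing helper. The sketch below is hypothetical and self-contained: `Registry`, `CheckedIntFunction`, the `String` path, the counter names (`.totalDuration`, `.totalBytes`) and the `lookups` counter are simplified stand-ins assumed for illustration, not Hudi's actual classes or metric names, and the registry-taking `executeFuncWithTimeMetrics` overload does not exist in the PR.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: look the registry up once and reuse it for both the
// timing metrics and the byte metric. All types here are simplified stand-ins.
final class SingleLookupMetrics {

  // Stand-in for CheckedIntFunction: returns the number of bytes processed.
  @FunctionalInterface
  interface CheckedIntFunction {
    int get() throws IOException;
  }

  // Stand-in registry: named counters backed by a map.
  static final class Registry {
    final Map<String, Long> counters = new HashMap<>();

    void add(String name, long value) {
      counters.merge(name, value, Long::sum);
    }
  }

  static final Registry REGISTRY = new Registry();
  // Counts how often the lookup runs, for illustration only.
  static int lookups = 0;

  // Stand-in for getMetricRegistryForPath(p); always returns the same registry here.
  static Registry getMetricRegistryForPath(String path) {
    lookups++;
    return REGISTRY;
  }

  // Hypothetical timing helper that takes an already-resolved registry instead of a path.
  static int executeFuncWithTimeMetrics(Registry registry, String metricName,
      CheckedIntFunction func) throws IOException {
    long start = System.nanoTime();
    int ret = func.get();
    if (registry != null) {
      registry.add(metricName, 1);
      registry.add(metricName + ".totalDuration", System.nanoTime() - start);
    }
    return ret;
  }

  // Byte-metrics variant: a single lookup, shared with the timing helper.
  static int executeFuncWithTimeAndByteMetrics(String metricName, String path,
      CheckedIntFunction func) throws IOException {
    Registry registry = getMetricRegistryForPath(path);
    int ret = executeFuncWithTimeMetrics(registry, metricName, func);
    if (ret > 0 && registry != null) {
      registry.add(metricName + ".totalBytes", ret);
    }
    return ret;
  }
}
```

In the real class this would mean changing `executeFuncWithTimeMetrics` (or adding an overload) so it accepts the resolved `Registry` rather than looking it up again from the `Path`.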
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -419,13 +417,8 @@ public static boolean isLogFile(Path logPath) {
* Get the names of all the base and log files in the given partition path.
*/
public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partitionPath) throws IOException {
- final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values())
- .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
- final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension();
-
return Arrays.stream(fs.listStatus(partitionPath, path -> {
- String extension = FSUtils.getFileExtension(path.getName());
- return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);
+ return HoodieFileFormat.isBaseFile(path) || isLogFile(path);
})).filter(FileStatus::isFile).toArray(FileStatus[]::new);
}
Review comment:
I guess `FileStatus::isFile` is always true whenever `HoodieFileFormat.isBaseFile(path) || isLogFile(path)` is true, so is the `filter` still needed?
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
Review comment:
Why is the class still named `TimedFSInputStream`, given that it also records the number of bytes, not just timings?
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
##########
@@ -192,27 +261,74 @@ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwr
return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
final Path translatedPath = convertToDefaultPath(f);
return wrapOutputStream(f,
- fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress));
+ fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress), bufferSize);
});
}
- private FSDataOutputStream wrapOutputStream(final Path path, FSDataOutputStream fsDataOutputStream)
+
+ /**
+ * The stream hierarchy after wrapping will be as follows.
+ *
+ * FSDataOutputStream (returned)
+ * BufferedOutputStream (required for output buffering)
+ * TimedSizeAwareOutputStream (required for tracking metrics, timings and the number of bytes written)
+ * fs.open() (Original stream returned from underlying FileSystem)
+ */
+ private FSDataOutputStream wrapOutputStream(final Path path, FSDataOutputStream fsDataOutputStream, int bufferSize)
throws IOException {
- if (fsDataOutputStream instanceof SizeAwareFSDataOutputStream) {
- return fsDataOutputStream;
+ TimedSizeAwareOutputStream tos = new TimedSizeAwareOutputStream(path, fsDataOutputStream, consistencyGuard,
+ () -> openStreams.remove(path.getName()));
+
+ if (ioBufferingEnabled) {
+ int minBufferSize = (HoodieFileFormat.isBaseFile(path) || FSUtils.isLogFile(path)) ? minDataFileIOBufferSizeBytes
+ : minFileIOBufferSizeBytes;
Review comment:
Can we add a helper method, `FSUtils.isDataFile(Path)`, that encapsulates
```java
HoodieFileFormat.isBaseFile(path) || FSUtils.isLogFile(path)
```
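A minimal sketch of such a helper follows. It is hypothetical: it works on plain file names rather than Hadoop `Path` so it runs without Hudi on the classpath, and the checks it uses (an assumed set of base-file extensions, plus a `.log` substring test mirroring the code removed from `getAllDataFilesInPartition`) are simplified stand-ins for `HoodieFileFormat.isBaseFile` and `FSUtils.isLogFile`.

```java
import java.util.Set;

// Hypothetical sketch of the suggested FSUtils.isDataFile(Path) helper, using
// String file names instead of Hadoop Path. The extension checks are simplified
// stand-ins for HoodieFileFormat.isBaseFile(path) and FSUtils.isLogFile(path).
final class DataFileCheck {
  // Assumed base-file extensions, for illustration only.
  private static final Set<String> BASE_FILE_EXTENSIONS = Set.of(".parquet", ".hfile", ".orc");
  // Assumed log-file marker, mirroring the removed `path.getName().contains(logFileExtension)` check.
  private static final String LOG_FILE_EXTENSION = ".log";

  private DataFileCheck() {}

  // Base file: the name's last extension is one of the known base-file extensions.
  static boolean isBaseFile(String fileName) {
    int dot = fileName.lastIndexOf('.');
    return dot >= 0 && BASE_FILE_EXTENSIONS.contains(fileName.substring(dot));
  }

  // Log file: the name contains the log-file extension anywhere (log names carry a version suffix).
  static boolean isLogFile(String fileName) {
    return fileName.contains(LOG_FILE_EXTENSION);
  }

  // The single predicate the review asks for: "base file or log file".
  static boolean isDataFile(String fileName) {
    return isBaseFile(fileName) || isLogFile(fileName);
  }
}
```

With a helper like this, both the `getAllDataFilesInPartition` filter and the `minBufferSize` selection in `wrapOutputStream` could call the same method instead of repeating the `||` expression.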
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -79,15 +79,12 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) ((
(FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
- } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
- this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
- new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else {
- // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
- // need to wrap in another BufferedFSInputStream the make bufferSize work?
this.inputStream = fsDataInputStream;
}
+ LOG.error("Opened inputstream of type " + this.inputStream.getClass().getName() + " wrapping over "
Review comment:
+1, why is this logged with `LOG.error`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.