This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new cab0e2bbb9a [HUDI-7632] Remove FileSystem usage in HoodieLogFormatWriter (#11082)
cab0e2bbb9a is described below
commit cab0e2bbb9afd73e2d157612e085a724fbc14c01
Author: Vova Kolmakov <[email protected]>
AuthorDate: Wed Apr 24 16:08:21 2024 +0700
[HUDI-7632] Remove FileSystem usage in HoodieLogFormatWriter (#11082)
---
.../java/org/apache/hudi/common/fs/FSUtils.java | 12 ++------
.../common/table/log/HoodieLogFormatWriter.java | 32 +++++++---------------
.../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 10 +++----
.../hudi/storage/hadoop/HoodieHadoopStorage.java | 15 ++++++++++
.../org/apache/hudi/storage/HoodieStorage.java | 27 ++++++++++++++++++
5 files changed, 59 insertions(+), 37 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index afd28f0ebac..a0c39ecff1a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -596,14 +596,6 @@ public class FSUtils {
return Option.empty();
}
- public static int getDefaultBufferSize(final FileSystem fs) {
- return fs.getConf().getInt("io.file.buffer.size", 4096);
- }
-
- public static Short getDefaultReplication(FileSystem fs, Path path) {
- return fs.getDefaultReplication(path);
- }
-
/**
* When a file was opened and the task died without closing the stream, another task executor cannot open because the
* existing lease will be active. We will try to recover the lease, from HDFS. If a data node went down, it takes
@@ -611,11 +603,11 @@ public class FSUtils {
*/
public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p)
throws IOException, InterruptedException {
- LOG.info("Recover lease on dfs file " + p);
+ LOG.info("Recover lease on dfs file {}", p);
// initiate the recovery
boolean recovered = false;
for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) {
- LOG.info("Attempt " + nbAttempt + " to recover lease on dfs file " + p);
+ LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
recovered = dfs.recoverLease(p);
if (recovered) {
break;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index b8da75e0b48..db5db422d1d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.table.log;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat.WriterBuilder;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -29,7 +28,6 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.slf4j.Logger;
@@ -50,7 +48,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private FSDataOutputStream output;
private final HoodieStorage storage;
- private final FileSystem fs;
private final long sizeThreshold;
private final Integer bufferSize;
private final Short replication;
@@ -68,21 +65,15 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
String rolloverLogWriteToken,
LogFileCreationCallback fileCreationHook) {
this.storage = storage;
- this.fs = (FileSystem) storage.getFileSystem();
this.logFile = logFile;
this.sizeThreshold = sizeThreshold;
- this.bufferSize = bufferSize != null ? bufferSize : FSUtils.getDefaultBufferSize(fs);
- this.replication = replication != null ? replication
- : FSUtils.getDefaultReplication(fs, new Path(logFile.getPath().getParent().toString()));
+ this.bufferSize = bufferSize != null ? bufferSize : storage.getDefaultBufferSize();
+ this.replication = replication != null ? replication : storage.getDefaultReplication(logFile.getPath().getParent());
this.rolloverLogWriteToken = rolloverLogWriteToken;
this.fileCreationHook = fileCreationHook;
addShutDownHook();
}
- public FileSystem getFs() {
- return (FileSystem) storage.getFileSystem();
- }
-
@Override
public HoodieLogFile getLogFile() {
return logFile;
@@ -119,7 +110,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
rollOver();
} catch (RemoteException re) {
if (re.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
- LOG.warn("Another task executor writing to the same log file(" + logFile + ", rolling over");
+ LOG.warn("Another task executor writing to the same log file({}), rolling over", logFile);
// Rollover the current log file (since cannot get a stream
handle) and create new one
rollOver();
} else {
@@ -146,7 +137,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
long startPos = originalOutputStream.getPos();
long sizeWritten = 0;
// HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can be correctly written
- FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(fs.getScheme()), startPos);
+ FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(storage.getScheme()), startPos);
for (HoodieLogBlock block: blocks) {
long startSize = outputStream.size();
@@ -218,8 +209,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private void rolloverIfNeeded() throws IOException {
// Roll over if the size is past the threshold
if (getCurrentSize() > sizeThreshold) {
- LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold
- + ". Rolling over to the next version");
+ LOG.info("CurrentSize {} has reached threshold {}. Rolling over to the next version", getCurrentSize(), sizeThreshold);
rollOver();
}
}
@@ -232,12 +222,10 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private void createNewFile() throws IOException {
fileCreationHook.preFileCreation(this.logFile);
- this.output =
- ((FileSystem) storage.getFileSystem()).create(
- new Path(this.logFile.getPath().toUri()), false,
- bufferSize,
- replication,
- WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
+ this.output = new FSDataOutputStream(
+ storage.create(this.logFile.getPath(), false, bufferSize, replication, WriterBuilder.DEFAULT_SIZE_THRESHOLD),
+ new FileSystem.Statistics(storage.getScheme())
+ );
}
@Override
@@ -291,7 +279,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
closeStream();
}
} catch (Exception e) {
- LOG.warn("unable to close output stream for log file " + logFile, e);
+ LOG.warn(String.format("unable to close output stream for log file %s", logFile), e);
// fail silently for any sort of exception
}
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index f8e3915e5e3..78b293ee75f 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -56,7 +56,7 @@ public class HadoopFSUtils {
// look for all properties, prefixed to be picked up
for (Map.Entry<String, String> prop : System.getenv().entrySet()) {
if (prop.getKey().startsWith(HOODIE_ENV_PROPS_PREFIX)) {
- LOG.info("Picking up value for hoodie env var :" + prop.getKey());
+ LOG.info("Picking up value for hoodie env var : {}", prop.getKey());
conf.set(prop.getKey().replace(HOODIE_ENV_PROPS_PREFIX, "").replaceAll("_DOT_", "."), prop.getValue());
}
}
@@ -99,7 +99,7 @@ public class HadoopFSUtils {
try {
fs = path.getFileSystem(conf);
} catch (IOException e) {
- throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), e);
+ throw new HoodieIOException(String.format("Failed to get instance of %s", FileSystem.class.getName()), e);
}
return fs;
}
@@ -135,10 +135,10 @@ public class HadoopFSUtils {
File localFile = new File(path);
if (!providedPath.isAbsolute() && localFile.exists()) {
Path resolvedPath = new Path("file://" + localFile.getAbsolutePath());
- LOG.info("Resolving file " + path + " to be a local file.");
+ LOG.info("Resolving file {} to be a local file.", path);
return resolvedPath;
}
- LOG.info("Resolving file " + path + "to be a remote file.");
+ LOG.info("Resolving file {} to be a remote file.", path);
return providedPath;
}
@@ -201,7 +201,7 @@ public class HadoopFSUtils {
try {
fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize);
} catch (IOException e) {
- throw new HoodieIOException("Exception creating input stream from file: " + filePath, e);
+ throw new HoodieIOException(String.format("Exception creating input stream from file: %s", filePath), e);
}
if (isGCSFileSystem(fs)) {
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
index 9785f42989d..975e4267f0c 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -75,6 +75,21 @@ public class HoodieHadoopStorage extends HoodieStorage {
return fs.create(convertToHadoopPath(path), overwrite);
}
+ @Override
+ public OutputStream create(StoragePath path, boolean overwrite, Integer bufferSize, Short replication, Long sizeThreshold) throws IOException {
+ return fs.create(convertToHadoopPath(path), false, bufferSize, replication, sizeThreshold, null);
+ }
+
+ @Override
+ public int getDefaultBufferSize() {
+ return fs.getConf().getInt("io.file.buffer.size", 4096);
+ }
+
+ @Override
+ public short getDefaultReplication(StoragePath path) {
+ return fs.getDefaultReplication(convertToHadoopPath(path));
+ }
+
@Override
public InputStream open(StoragePath path) throws IOException {
return fs.open(convertToHadoopPath(path));
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index b8735cc89d9..5abb1ac13c9 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -59,6 +59,18 @@ public abstract class HoodieStorage implements Closeable {
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract int getDefaultBlockSize(StoragePath path);
+ /**
+ * @return the default buffer size.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract int getDefaultBufferSize();
+
+ /**
+ * @return the default block replication
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract short getDefaultReplication(StoragePath path);
+
/**
* Returns a URI which identifies this HoodieStorage.
*
@@ -79,6 +91,21 @@ public abstract class HoodieStorage implements Closeable {
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract OutputStream create(StoragePath path, boolean overwrite) throws IOException;
+ /**
+ * Creates an OutputStream at the indicated path.
+ *
+ * @param path the file to create
+ * @param overwrite if a file with this name already exists, then if {@code true},
+ * the file will be overwritten, and if {@code false} an exception will be thrown.
+ * @param bufferSize the size of the buffer to be used
+ * @param replication required block replication for the file
+ * @param sizeThreshold block size
+ * @return the OutputStream to write to.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract OutputStream create(StoragePath path, boolean overwrite, Integer bufferSize, Short replication, Long sizeThreshold) throws IOException;
+
/**
* Opens an InputStream at the indicated path.
*