This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 226c2c7f8951dee0b26fdec159147d59e4481fa7 Author: Vova Kolmakov <[email protected]> AuthorDate: Wed May 15 01:22:13 2024 -0700 [HUDI-7632] Remove FileSystem usage in HoodieLogFormatWriter (#11082) --- .../java/org/apache/hudi/common/fs/FSUtils.java | 12 ++------- .../common/table/log/HoodieLogFormatWriter.java | 30 ++++++++++------------ .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 10 ++++---- .../hudi/storage/hadoop/HoodieHadoopStorage.java | 15 +++++++++++ .../org/apache/hudi/storage/HoodieStorage.java | 27 +++++++++++++++++++ 5 files changed, 62 insertions(+), 32 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 0b6d8699631..2e584dfb8f9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -666,14 +666,6 @@ public class FSUtils { : HoodieLogFile.LOGFILE_BASE_VERSION; } - public static int getDefaultBufferSize(final FileSystem fs) { - return fs.getConf().getInt("io.file.buffer.size", 4096); - } - - public static Short getDefaultReplication(FileSystem fs, Path path) { - return fs.getDefaultReplication(path); - } - /** * When a file was opened and the task died without closing the stream, another task executor cannot open because the * existing lease will be active. We will try to recover the lease, from HDFS. If a data node went down, it takes @@ -681,11 +673,11 @@ public class FSUtils { */ public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p) throws IOException, InterruptedException { - LOG.info("Recover lease on dfs file " + p); + LOG.info("Recover lease on dfs file {}", p); // initiate the recovery boolean recovered = false; for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) { - LOG.info("Attempt " + nbAttempt + " to recover lease on dfs file " + p); + LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p); recovered = dfs.recoverLease(p); if (recovered) { break; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index afc00cd22e6..295d4a14073 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -52,7 +52,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private FSDataOutputStream output; private final HoodieStorage storage; - private final FileSystem fs; private final long sizeThreshold; private final Integer bufferSize; private final Short replication; @@ -66,21 +65,15 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { HoodieLogFormatWriter(HoodieStorage storage, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken, HoodieLogFileWriteCallback logFileWriteCallback) { this.storage = storage; - this.fs = (FileSystem) storage.getFileSystem(); this.logFile = logFile; this.sizeThreshold = sizeThreshold; - this.bufferSize = bufferSize != null ? bufferSize : FSUtils.getDefaultBufferSize(fs); - this.replication = replication != null ? replication - : FSUtils.getDefaultReplication(fs, new Path(logFile.getPath().getParent().toString())); + this.bufferSize = bufferSize != null ? bufferSize : storage.getDefaultBufferSize(); + this.replication = replication != null ? replication : storage.getDefaultReplication(logFile.getPath().getParent()); this.rolloverLogWriteToken = rolloverLogWriteToken; this.logFileWriteCallback = logFileWriteCallback; addShutDownHook(); } - public FileSystem getFs() { - return (FileSystem) storage.getFileSystem(); - } - @Override public HoodieLogFile getLogFile() { return logFile; @@ -99,6 +92,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private FSDataOutputStream getOutputStream() throws IOException, InterruptedException { if (this.output == null) { Path path = new Path(logFile.getPath().toUri()); + FileSystem fs = (FileSystem) storage.getFileSystem(); if (fs.exists(path)) { boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme()); // here we use marker file to fence concurrent append to the same file. So it is safe to use speculation in spark now. @@ -155,7 +149,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { long startPos = originalOutputStream.getPos(); long sizeWritten = 0; // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can be correctly written - FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(fs.getScheme()), startPos); + FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(storage.getScheme()), startPos); for (HoodieLogBlock block: blocks) { long startSize = outputStream.size(); @@ -227,8 +221,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private void rolloverIfNeeded() throws IOException { // Roll over if the size is past the threshold if (getCurrentSize() > sizeThreshold) { - LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold - + ". Rolling over to the next version"); + LOG.info("CurrentSize {} has reached threshold {}. Rolling over to the next version", getCurrentSize(), sizeThreshold); rollOver(); } } @@ -241,12 +234,14 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private void createNewFile() throws IOException { logFileWriteCallback.preLogFileCreate(logFile); - this.output = - ((FileSystem) storage.getFileSystem()).create( - new Path(this.logFile.getPath().toUri()), false, + this.output = new FSDataOutputStream( + storage.create( + this.logFile.getPath(), false, bufferSize, replication, - WriterBuilder.DEFAULT_SIZE_THRESHOLD, null); + WriterBuilder.DEFAULT_SIZE_THRESHOLD), + new FileSystem.Statistics(storage.getScheme()) + ); } @Override @@ -305,7 +300,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { closeStream(); } } catch (Exception e) { - LOG.warn("unable to close output stream for log file " + logFile, e); + LOG.warn(String.format("unable to close output stream for log file %s", logFile), e); // fail silently for any sort of exception } } @@ -315,6 +310,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e) throws IOException, InterruptedException { + FileSystem fs = (FileSystem) storage.getFileSystem(); if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) { // This issue happens when all replicas for a file are down and/or being decommissioned. // The fs.append() API could append to the last block for a file. If the last block is full, a new block is diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java index f8e3915e5e3..78b293ee75f 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java @@ -56,7 +56,7 @@ public class HadoopFSUtils { // look for all properties, prefixed to be picked up for (Map.Entry<String, String> prop : System.getenv().entrySet()) { if (prop.getKey().startsWith(HOODIE_ENV_PROPS_PREFIX)) { - LOG.info("Picking up value for hoodie env var :" + prop.getKey()); + LOG.info("Picking up value for hoodie env var : {}", prop.getKey()); conf.set(prop.getKey().replace(HOODIE_ENV_PROPS_PREFIX, "").replaceAll("_DOT_", "."), prop.getValue()); } } @@ -99,7 +99,7 @@ public class HadoopFSUtils { try { fs = path.getFileSystem(conf); } catch (IOException e) { - throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), e); + throw new HoodieIOException(String.format("Failed to get instance of %s", FileSystem.class.getName()), e); } return fs; } @@ -135,10 +135,10 @@ public class HadoopFSUtils { File localFile = new File(path); if (!providedPath.isAbsolute() && localFile.exists()) { Path resolvedPath = new Path("file://" + localFile.getAbsolutePath()); - LOG.info("Resolving file " + path + " to be a local file."); + LOG.info("Resolving file {} to be a local file.", path); return resolvedPath; } - LOG.info("Resolving file " + path + "to be a remote file."); + LOG.info("Resolving file {} to be a remote file.", path); return providedPath; } @@ -201,7 +201,7 @@ public class HadoopFSUtils { try { fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize); } catch (IOException e) { - throw new HoodieIOException("Exception creating input stream from file: " + filePath, e); + throw new HoodieIOException(String.format("Exception creating input stream from file: %s", filePath), e); } if (isGCSFileSystem(fs)) { diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java index 9785f42989d..975e4267f0c 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java @@ -75,6 +75,21 @@ public class HoodieHadoopStorage extends HoodieStorage { return fs.create(convertToHadoopPath(path), overwrite); } + @Override + public OutputStream create(StoragePath path, boolean overwrite, Integer bufferSize, Short replication, Long sizeThreshold) throws IOException { + return fs.create(convertToHadoopPath(path), false, bufferSize, replication, sizeThreshold, null); + } + + @Override + public int getDefaultBufferSize() { + return fs.getConf().getInt("io.file.buffer.size", 4096); + } + + @Override + public short getDefaultReplication(StoragePath path) { + return fs.getDefaultReplication(convertToHadoopPath(path)); + } + @Override public InputStream open(StoragePath path) throws IOException { return fs.open(convertToHadoopPath(path)); diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java index b8735cc89d9..5abb1ac13c9 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java @@ -59,6 +59,18 @@ public abstract class HoodieStorage implements Closeable { @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public abstract int getDefaultBlockSize(StoragePath path); + /** + * @return the default buffer size. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public abstract int getDefaultBufferSize(); + + /** + * @return the default block replication + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public abstract short getDefaultReplication(StoragePath path); + /** * Returns a URI which identifies this HoodieStorage. * @@ -79,6 +91,21 @@ public abstract class HoodieStorage implements Closeable { @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public abstract OutputStream create(StoragePath path, boolean overwrite) throws IOException; + /** + * Creates an OutputStream at the indicated path. + * + * @param path the file to create + * @param overwrite if a file with this name already exists, then if {@code true}, + * the file will be overwritten, and if {@code false} an exception will be thrown. + * @param bufferSize the size of the buffer to be used + * @param replication required block replication for the file + * @param sizeThreshold block size + * @return the OutputStream to write to. + * @throws IOException IO error. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public abstract OutputStream create(StoragePath path, boolean overwrite, Integer bufferSize, Short replication, Long sizeThreshold) throws IOException; + /** * Opens an InputStream at the indicated path. *
