This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cab0e2bbb9a [HUDI-7632] Remove FileSystem usage in 
HoodieLogFormatWriter (#11082)
cab0e2bbb9a is described below

commit cab0e2bbb9afd73e2d157612e085a724fbc14c01
Author: Vova Kolmakov <[email protected]>
AuthorDate: Wed Apr 24 16:08:21 2024 +0700

    [HUDI-7632] Remove FileSystem usage in HoodieLogFormatWriter (#11082)
---
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 12 ++------
 .../common/table/log/HoodieLogFormatWriter.java    | 32 +++++++---------------
 .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java   | 10 +++----
 .../hudi/storage/hadoop/HoodieHadoopStorage.java   | 15 ++++++++++
 .../org/apache/hudi/storage/HoodieStorage.java     | 27 ++++++++++++++++++
 5 files changed, 59 insertions(+), 37 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index afd28f0ebac..a0c39ecff1a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -596,14 +596,6 @@ public class FSUtils {
     return Option.empty();
   }
 
-  public static int getDefaultBufferSize(final FileSystem fs) {
-    return fs.getConf().getInt("io.file.buffer.size", 4096);
-  }
-
-  public static Short getDefaultReplication(FileSystem fs, Path path) {
-    return fs.getDefaultReplication(path);
-  }
-
   /**
    * When a file was opened and the task died without closing the stream, 
another task executor cannot open because the
    * existing lease will be active. We will try to recover the lease, from 
HDFS. If a data node went down, it takes
@@ -611,11 +603,11 @@ public class FSUtils {
    */
   public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, 
final Path p)
       throws IOException, InterruptedException {
-    LOG.info("Recover lease on dfs file " + p);
+    LOG.info("Recover lease on dfs file {}", p);
     // initiate the recovery
     boolean recovered = false;
     for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; 
nbAttempt++) {
-      LOG.info("Attempt " + nbAttempt + " to recover lease on dfs file " + p);
+      LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
       recovered = dfs.recoverLease(p);
       if (recovered) {
         break;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index b8da75e0b48..db5db422d1d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.HoodieLogFormat.WriterBuilder;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -29,7 +28,6 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.slf4j.Logger;
@@ -50,7 +48,6 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
   private FSDataOutputStream output;
 
   private final HoodieStorage storage;
-  private final FileSystem fs;
   private final long sizeThreshold;
   private final Integer bufferSize;
   private final Short replication;
@@ -68,21 +65,15 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
       String rolloverLogWriteToken,
       LogFileCreationCallback fileCreationHook) {
     this.storage = storage;
-    this.fs = (FileSystem) storage.getFileSystem();
     this.logFile = logFile;
     this.sizeThreshold = sizeThreshold;
-    this.bufferSize = bufferSize != null ? bufferSize : 
FSUtils.getDefaultBufferSize(fs);
-    this.replication = replication != null ? replication
-        : FSUtils.getDefaultReplication(fs, new 
Path(logFile.getPath().getParent().toString()));
+    this.bufferSize = bufferSize != null ? bufferSize : 
storage.getDefaultBufferSize();
+    this.replication = replication != null ? replication : 
storage.getDefaultReplication(logFile.getPath().getParent());
     this.rolloverLogWriteToken = rolloverLogWriteToken;
     this.fileCreationHook = fileCreationHook;
     addShutDownHook();
   }
 
-  public FileSystem getFs() {
-    return (FileSystem) storage.getFileSystem();
-  }
-
   @Override
   public HoodieLogFile getLogFile() {
     return logFile;
@@ -119,7 +110,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
           rollOver();
         } catch (RemoteException re) {
           if 
(re.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) 
{
-            LOG.warn("Another task executor writing to the same log file(" + 
logFile + ", rolling over");
+            LOG.warn("Another task executor writing to the same log file({}), 
rolling over", logFile);
             // Rollover the current log file (since cannot get a stream 
handle) and create new one
             rollOver();
           } else {
@@ -146,7 +137,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
     long startPos = originalOutputStream.getPos();
     long sizeWritten = 0;
     // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can 
be correctly written
-    FSDataOutputStream outputStream = new 
FSDataOutputStream(originalOutputStream, new 
FileSystem.Statistics(fs.getScheme()), startPos);
+    FSDataOutputStream outputStream = new 
FSDataOutputStream(originalOutputStream, new 
FileSystem.Statistics(storage.getScheme()), startPos);
     for (HoodieLogBlock block: blocks) {
       long startSize = outputStream.size();
 
@@ -218,8 +209,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
   private void rolloverIfNeeded() throws IOException {
     // Roll over if the size is past the threshold
     if (getCurrentSize() > sizeThreshold) {
-      LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + 
sizeThreshold
-          + ". Rolling over to the next version");
+      LOG.info("CurrentSize {} has reached threshold {}. Rolling over to the 
next version", getCurrentSize(), sizeThreshold);
       rollOver();
     }
   }
@@ -232,12 +222,10 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
 
   private void createNewFile() throws IOException {
     fileCreationHook.preFileCreation(this.logFile);
-    this.output =
-        ((FileSystem) storage.getFileSystem()).create(
-            new Path(this.logFile.getPath().toUri()), false,
-            bufferSize,
-            replication,
-            WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
+    this.output = new FSDataOutputStream(
+        storage.create(this.logFile.getPath(), false, bufferSize, replication, 
WriterBuilder.DEFAULT_SIZE_THRESHOLD),
+        new FileSystem.Statistics(storage.getScheme())
+    );
   }
 
   @Override
@@ -291,7 +279,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
             closeStream();
           }
         } catch (Exception e) {
-          LOG.warn("unable to close output stream for log file " + logFile, e);
+          LOG.warn(String.format("unable to close output stream for log file 
%s", logFile), e);
           // fail silently for any sort of exception
         }
       }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index f8e3915e5e3..78b293ee75f 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -56,7 +56,7 @@ public class HadoopFSUtils {
     // look for all properties, prefixed to be picked up
     for (Map.Entry<String, String> prop : System.getenv().entrySet()) {
       if (prop.getKey().startsWith(HOODIE_ENV_PROPS_PREFIX)) {
-        LOG.info("Picking up value for hoodie env var :" + prop.getKey());
+        LOG.info("Picking up value for hoodie env var : {}", prop.getKey());
         conf.set(prop.getKey().replace(HOODIE_ENV_PROPS_PREFIX, 
"").replaceAll("_DOT_", "."), prop.getValue());
       }
     }
@@ -99,7 +99,7 @@ public class HadoopFSUtils {
     try {
       fs = path.getFileSystem(conf);
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to get instance of " + 
FileSystem.class.getName(), e);
+      throw new HoodieIOException(String.format("Failed to get instance of 
%s", FileSystem.class.getName()), e);
     }
     return fs;
   }
@@ -135,10 +135,10 @@ public class HadoopFSUtils {
     File localFile = new File(path);
     if (!providedPath.isAbsolute() && localFile.exists()) {
       Path resolvedPath = new Path("file://" + localFile.getAbsolutePath());
-      LOG.info("Resolving file " + path + " to be a local file.");
+      LOG.info("Resolving file {} to be a local file.", path);
       return resolvedPath;
     }
-    LOG.info("Resolving file " + path + "to be a remote file.");
+    LOG.info("Resolving file {} to be a remote file.", path);
     return providedPath;
   }
 
@@ -201,7 +201,7 @@ public class HadoopFSUtils {
     try {
       fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize);
     } catch (IOException e) {
-      throw new HoodieIOException("Exception creating input stream from file: 
" + filePath, e);
+      throw new HoodieIOException(String.format("Exception creating input 
stream from file: %s", filePath), e);
     }
 
     if (isGCSFileSystem(fs)) {
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
index 9785f42989d..975e4267f0c 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -75,6 +75,21 @@ public class HoodieHadoopStorage extends HoodieStorage {
     return fs.create(convertToHadoopPath(path), overwrite);
   }
 
+  @Override
+  public OutputStream create(StoragePath path, boolean overwrite, Integer 
bufferSize, Short replication, Long sizeThreshold) throws IOException {
+    return fs.create(convertToHadoopPath(path), overwrite, bufferSize, 
replication, sizeThreshold, null);
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return fs.getConf().getInt("io.file.buffer.size", 4096);
+  }
+
+  @Override
+  public short getDefaultReplication(StoragePath path) {
+    return fs.getDefaultReplication(convertToHadoopPath(path));
+  }
+
   @Override
   public InputStream open(StoragePath path) throws IOException {
     return fs.open(convertToHadoopPath(path));
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index b8735cc89d9..5abb1ac13c9 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -59,6 +59,18 @@ public abstract class HoodieStorage implements Closeable {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract int getDefaultBlockSize(StoragePath path);
 
+  /**
+   * @return the default buffer size.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract int getDefaultBufferSize();
+
+  /**
+   * @param path the path of the file.
+   * @return the default block replication.
+   */
+  public abstract short getDefaultReplication(StoragePath path);
+
   /**
    * Returns a URI which identifies this HoodieStorage.
    *
@@ -79,6 +91,21 @@ public abstract class HoodieStorage implements Closeable {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract OutputStream create(StoragePath path, boolean overwrite) 
throws IOException;
 
+  /**
+   * Creates an OutputStream at the indicated path.
+   *
+   * @param path          the file to create
+   * @param overwrite     if a file with this name already exists, then if 
{@code true},
+   *                      the file will be overwritten, and if {@code false} 
an exception will be thrown.
+   * @param bufferSize    the size of the buffer to be used
+   * @param replication   required block replication for the file
+   * @param sizeThreshold block size
+   * @return the OutputStream to write to.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract OutputStream create(StoragePath path, boolean overwrite, 
Integer bufferSize, Short replication, Long sizeThreshold) throws IOException;
+
   /**
    * Opens an InputStream at the indicated path.
    *

Reply via email to