This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 226c2c7f8951dee0b26fdec159147d59e4481fa7
Author: Vova Kolmakov <[email protected]>
AuthorDate: Wed May 15 01:22:13 2024 -0700

    [HUDI-7632] Remove FileSystem usage in HoodieLogFormatWriter (#11082)
---
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 12 ++-------
 .../common/table/log/HoodieLogFormatWriter.java    | 30 ++++++++++------------
 .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java   | 10 ++++----
 .../hudi/storage/hadoop/HoodieHadoopStorage.java   | 15 +++++++++++
 .../org/apache/hudi/storage/HoodieStorage.java     | 27 +++++++++++++++++++
 5 files changed, 62 insertions(+), 32 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 0b6d8699631..2e584dfb8f9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -666,14 +666,6 @@ public class FSUtils {
         : HoodieLogFile.LOGFILE_BASE_VERSION;
   }
 
-  public static int getDefaultBufferSize(final FileSystem fs) {
-    return fs.getConf().getInt("io.file.buffer.size", 4096);
-  }
-
-  public static Short getDefaultReplication(FileSystem fs, Path path) {
-    return fs.getDefaultReplication(path);
-  }
-
   /**
    * When a file was opened and the task died without closing the stream, 
another task executor cannot open because the
    * existing lease will be active. We will try to recover the lease, from 
HDFS. If a data node went down, it takes
@@ -681,11 +673,11 @@ public class FSUtils {
    */
   public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, 
final Path p)
       throws IOException, InterruptedException {
-    LOG.info("Recover lease on dfs file " + p);
+    LOG.info("Recover lease on dfs file {}", p);
     // initiate the recovery
     boolean recovered = false;
     for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; 
nbAttempt++) {
-      LOG.info("Attempt " + nbAttempt + " to recover lease on dfs file " + p);
+      LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
       recovered = dfs.recoverLease(p);
       if (recovered) {
         break;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index afc00cd22e6..295d4a14073 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -52,7 +52,6 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
   private FSDataOutputStream output;
 
   private final HoodieStorage storage;
-  private final FileSystem fs;
   private final long sizeThreshold;
   private final Integer bufferSize;
   private final Short replication;
@@ -66,21 +65,15 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
   HoodieLogFormatWriter(HoodieStorage storage, HoodieLogFile logFile, Integer 
bufferSize, Short replication, Long sizeThreshold,
                         String rolloverLogWriteToken, 
HoodieLogFileWriteCallback logFileWriteCallback) {
     this.storage = storage;
-    this.fs = (FileSystem) storage.getFileSystem();
     this.logFile = logFile;
     this.sizeThreshold = sizeThreshold;
-    this.bufferSize = bufferSize != null ? bufferSize : 
FSUtils.getDefaultBufferSize(fs);
-    this.replication = replication != null ? replication
-        : FSUtils.getDefaultReplication(fs, new 
Path(logFile.getPath().getParent().toString()));
+    this.bufferSize = bufferSize != null ? bufferSize : 
storage.getDefaultBufferSize();
+    this.replication = replication != null ? replication : 
storage.getDefaultReplication(logFile.getPath().getParent());
     this.rolloverLogWriteToken = rolloverLogWriteToken;
     this.logFileWriteCallback = logFileWriteCallback;
     addShutDownHook();
   }
 
-  public FileSystem getFs() {
-    return (FileSystem) storage.getFileSystem();
-  }
-
   @Override
   public HoodieLogFile getLogFile() {
     return logFile;
@@ -99,6 +92,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
   private FSDataOutputStream getOutputStream() throws IOException, 
InterruptedException {
     if (this.output == null) {
       Path path = new Path(logFile.getPath().toUri());
+      FileSystem fs = (FileSystem) storage.getFileSystem();
       if (fs.exists(path)) {
         boolean isAppendSupported = 
StorageSchemes.isAppendSupported(fs.getScheme());
         // here we use marker file to fence concurrent append to the same 
file. So it is safe to use speculation in spark now.
@@ -155,7 +149,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
     long startPos = originalOutputStream.getPos();
     long sizeWritten = 0;
     // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can 
be correctly written
-    FSDataOutputStream outputStream = new 
FSDataOutputStream(originalOutputStream, new 
FileSystem.Statistics(fs.getScheme()), startPos);
+    FSDataOutputStream outputStream = new 
FSDataOutputStream(originalOutputStream, new 
FileSystem.Statistics(storage.getScheme()), startPos);
     for (HoodieLogBlock block: blocks) {
       long startSize = outputStream.size();
 
@@ -227,8 +221,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
   private void rolloverIfNeeded() throws IOException {
     // Roll over if the size is past the threshold
     if (getCurrentSize() > sizeThreshold) {
-      LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + 
sizeThreshold
-          + ". Rolling over to the next version");
+      LOG.info("CurrentSize {} has reached threshold {}. Rolling over to the 
next version", getCurrentSize(), sizeThreshold);
       rollOver();
     }
   }
@@ -241,12 +234,14 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
 
   private void createNewFile() throws IOException {
     logFileWriteCallback.preLogFileCreate(logFile);
-    this.output =
-        ((FileSystem) storage.getFileSystem()).create(
-            new Path(this.logFile.getPath().toUri()), false,
+    this.output = new FSDataOutputStream(
+        storage.create(
+            this.logFile.getPath(), false,
             bufferSize,
             replication,
-            WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
+            WriterBuilder.DEFAULT_SIZE_THRESHOLD),
+        new FileSystem.Statistics(storage.getScheme())
+    );
   }
 
   @Override
@@ -305,7 +300,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
             closeStream();
           }
         } catch (Exception e) {
-          LOG.warn("unable to close output stream for log file " + logFile, e);
+          LOG.warn(String.format("unable to close output stream for log file 
%s", logFile), e);
           // fail silently for any sort of exception
         }
       }
@@ -315,6 +310,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
 
   private void handleAppendExceptionOrRecoverLease(Path path, RemoteException 
e)
       throws IOException, InterruptedException {
+    FileSystem fs = (FileSystem) storage.getFileSystem();
     if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
       // This issue happens when all replicas for a file are down and/or being 
decommissioned.
       // The fs.append() API could append to the last block for a file. If the 
last block is full, a new block is
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index f8e3915e5e3..78b293ee75f 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -56,7 +56,7 @@ public class HadoopFSUtils {
     // look for all properties, prefixed to be picked up
     for (Map.Entry<String, String> prop : System.getenv().entrySet()) {
       if (prop.getKey().startsWith(HOODIE_ENV_PROPS_PREFIX)) {
-        LOG.info("Picking up value for hoodie env var :" + prop.getKey());
+        LOG.info("Picking up value for hoodie env var : {}", prop.getKey());
         conf.set(prop.getKey().replace(HOODIE_ENV_PROPS_PREFIX, 
"").replaceAll("_DOT_", "."), prop.getValue());
       }
     }
@@ -99,7 +99,7 @@ public class HadoopFSUtils {
     try {
       fs = path.getFileSystem(conf);
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to get instance of " + 
FileSystem.class.getName(), e);
+      throw new HoodieIOException(String.format("Failed to get instance of 
%s", FileSystem.class.getName()), e);
     }
     return fs;
   }
@@ -135,10 +135,10 @@ public class HadoopFSUtils {
     File localFile = new File(path);
     if (!providedPath.isAbsolute() && localFile.exists()) {
       Path resolvedPath = new Path("file://" + localFile.getAbsolutePath());
-      LOG.info("Resolving file " + path + " to be a local file.");
+      LOG.info("Resolving file {} to be a local file.", path);
       return resolvedPath;
     }
-    LOG.info("Resolving file " + path + "to be a remote file.");
+    LOG.info("Resolving file {} to be a remote file.", path);
     return providedPath;
   }
 
@@ -201,7 +201,7 @@ public class HadoopFSUtils {
     try {
       fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize);
     } catch (IOException e) {
-      throw new HoodieIOException("Exception creating input stream from file: 
" + filePath, e);
+      throw new HoodieIOException(String.format("Exception creating input 
stream from file: %s", filePath), e);
     }
 
     if (isGCSFileSystem(fs)) {
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
index 9785f42989d..975e4267f0c 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -75,6 +75,21 @@ public class HoodieHadoopStorage extends HoodieStorage {
     return fs.create(convertToHadoopPath(path), overwrite);
   }
 
+  @Override
+  public OutputStream create(StoragePath path, boolean overwrite, Integer bufferSize, Short replication, Long sizeThreshold) throws IOException {
+    return fs.create(convertToHadoopPath(path), overwrite, bufferSize, replication, sizeThreshold, null);
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return fs.getConf().getInt("io.file.buffer.size", 4096);
+  }
+
+  @Override
+  public short getDefaultReplication(StoragePath path) {
+    return fs.getDefaultReplication(convertToHadoopPath(path));
+  }
+
   @Override
   public InputStream open(StoragePath path) throws IOException {
     return fs.open(convertToHadoopPath(path));
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index b8735cc89d9..5abb1ac13c9 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -59,6 +59,18 @@ public abstract class HoodieStorage implements Closeable {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract int getDefaultBlockSize(StoragePath path);
 
+  /**
+   * @return the default buffer size for this storage, for callers that do not supply an explicit one.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract int getDefaultBufferSize();
+
+  /**
+   * @return the default block replication for files created at or under the given {@code path}.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract short getDefaultReplication(StoragePath path);
+
   /**
    * Returns a URI which identifies this HoodieStorage.
    *
@@ -79,6 +91,21 @@ public abstract class HoodieStorage implements Closeable {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract OutputStream create(StoragePath path, boolean overwrite) 
throws IOException;
 
+  /**
+   * Creates an OutputStream at the indicated path.
+   *
+   * @param path          the file to create
+   * @param overwrite     if a file with this name already exists, then if 
{@code true},
+   *                      the file will be overwritten, and if {@code false} 
an exception will be thrown.
+   * @param bufferSize    the size of the buffer to be used
+   * @param replication   required block replication for the file
+   * @param sizeThreshold block size
+   * @return the OutputStream to write to.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract OutputStream create(StoragePath path, boolean overwrite, 
Integer bufferSize, Short replication, Long sizeThreshold) throws IOException;
+
   /**
    * Opens an InputStream at the indicated path.
    *

Reply via email to