This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c8c52e252fb8b2e273cab089359e4d025bf9fe08
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue May 14 17:02:25 2024 -0700

    [HUDI-7635] Add default block size and openSeekable APIs to HoodieStorage 
(#11048)
    
    This PR adds `getDefaultBlockSize` and `openSeekable` APIs to
    `HoodieStorage` and implements these APIs in `HoodieHadoopStorage`.
    The implementation follows the same logic of creating seekable input
    stream for log file reading, and `openSeekable` will be used by the log
    reading logic.
    
    A few util methods are moved from the `FSUtils` and
    `HoodieLogFileReader` classes to the `HadoopFSUtils` class.
---
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 18 -----
 .../hudi/common/table/log/HoodieLogFileReader.java | 75 +-----------------
 .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java   | 90 ++++++++++++++++++++++
 .../hudi/storage/hadoop/HoodieHadoopStorage.java   | 13 ++++
 .../org/apache/hudi/storage/HoodieStorage.java     | 30 ++++++++
 .../hudi/io/storage/TestHoodieStorageBase.java     | 43 +++++++++++
 6 files changed, 179 insertions(+), 90 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 292c2b41946..1b51fd78bfa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -667,24 +667,6 @@ public class FSUtils {
     return fs.getUri() + fullPartitionPath.toUri().getRawPath();
   }
 
-  /**
-   * This is due to HUDI-140 GCS has a different behavior for detecting EOF 
during seek().
-   *
-   * @param fs fileSystem instance.
-   * @return true if the inputstream or the wrapped one is of type 
GoogleHadoopFSInputStream
-   */
-  public static boolean isGCSFileSystem(FileSystem fs) {
-    return fs.getScheme().equals(StorageSchemes.GCS.getScheme());
-  }
-
-  /**
-   * Chdfs will throw {@code IOException} instead of {@code EOFException}. It 
will cause error in isBlockCorrupted().
-   * Wrapped by {@code BoundedFsDataInputStream}, to check whether the desired 
offset is out of the file size in advance.
-   */
-  public static boolean isCHDFileSystem(FileSystem fs) {
-    return StorageSchemes.CHDFS.getScheme().equals(fs.getScheme());
-  }
-
   public static Configuration registerFileSystem(Path file, Configuration 
conf) {
     Configuration returnConf = new Configuration(conf);
     String scheme = HadoopFSUtils.getFs(file.toString(), conf).getScheme();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index c1daf5e32d1..062e3639073 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -37,20 +37,15 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.CorruptedLogFileException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.hadoop.fs.BoundedFsDataInputStream;
 import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
-import org.apache.hudi.hadoop.fs.SchemeAwareFSDataInputStream;
-import org.apache.hudi.hadoop.fs.TimedFSDataInputStream;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.util.IOUtils;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StorageSchemes;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BufferedFSInputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -67,6 +62,7 @@ import java.util.Objects;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getFSDataInputStream;
 
 /**
  * Scans a log file and provides block level iterator on the log file Loads 
the entire block contents in memory Can emit
@@ -479,71 +475,6 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   private static SeekableDataInputStream getDataInputStream(FileSystem fs,
                                                             HoodieLogFile 
logFile,
                                                             int bufferSize) {
-    return new HadoopSeekableDataInputStream(getFSDataInputStream(fs, logFile, 
bufferSize));
-  }
-
-  /**
-   * Fetch the right {@link FSDataInputStream} to be used by wrapping with 
required input streams.
-   *
-   * @param fs         instance of {@link FileSystem} in use.
-   * @param bufferSize buffer size to be used.
-   * @return the right {@link FSDataInputStream} as required.
-   */
-  private static FSDataInputStream getFSDataInputStream(FileSystem fs,
-                                                        HoodieLogFile logFile,
-                                                        int bufferSize) {
-    FSDataInputStream fsDataInputStream = null;
-    try {
-      fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    } catch (IOException e) {
-      throw new HoodieIOException("Exception creating input stream from file: 
" + logFile, e);
-    }
-
-    if (FSUtils.isGCSFileSystem(fs)) {
-      // in GCS FS, we might need to interceptor seek offsets as we might get 
EOF exception
-      return new 
SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, 
logFile, bufferSize), true);
-    }
-
-    if (FSUtils.isCHDFileSystem(fs)) {
-      return new BoundedFsDataInputStream(fs, logFile.getPath(), 
fsDataInputStream);
-    }
-
-    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      return new TimedFSDataInputStream(logFile.getPath(), new 
FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) 
fsDataInputStream.getWrappedStream(), bufferSize)));
-    }
-
-    // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
-    // need to wrap in another BufferedFSInputStream the make bufferSize work?
-    return fsDataInputStream;
-  }
-
-  /**
-   * GCS FileSystem needs some special handling for seek and hence this method 
assists to fetch the right {@link FSDataInputStream} to be
-   * used by wrapping with required input streams.
-   * @param fsDataInputStream original instance of {@link FSDataInputStream}.
-   * @param bufferSize buffer size to be used.
-   * @return the right {@link FSDataInputStream} as required.
-   */
-  private static FSDataInputStream 
getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream,
-                                                              HoodieLogFile 
logFile,
-                                                              int bufferSize) {
-    // in case of GCS FS, there are two flows.
-    // a. fsDataInputStream.getWrappedStream() instanceof FSInputStream
-    // b. fsDataInputStream.getWrappedStream() not an instanceof 
FSInputStream, but an instance of FSDataInputStream.
-    // (a) is handled in the first if block and (b) is handled in the second 
if block. If not, we fallback to original fsDataInputStream
-    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      return new TimedFSDataInputStream(logFile.getPath(), new 
FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) 
fsDataInputStream.getWrappedStream(), bufferSize)));
-    }
-
-    if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
-        && ((FSDataInputStream) 
fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof 
FSInputStream) {
-      FSInputStream inputStream = (FSInputStream)((FSDataInputStream) 
fsDataInputStream.getWrappedStream()).getWrappedStream();
-      return new TimedFSDataInputStream(logFile.getPath(),
-          new FSDataInputStream(new BufferedFSInputStream(inputStream, 
bufferSize)));
-    }
-
-    return fsDataInputStream;
+    return new HadoopSeekableDataInputStream(getFSDataInputStream(fs, new 
StoragePath(logFile.getPath().toUri()), bufferSize));
   }
 }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index d59bffc9217..8eaa9398082 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -24,9 +24,13 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.StorageSchemes;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -154,4 +158,90 @@ public class HadoopFSUtils {
         pathInfo.getModificationTime(),
         convertToHadoopPath(pathInfo.getPath()));
   }
+
+  /**
+   * Fetch the right {@link FSDataInputStream} to be used by wrapping with 
required input streams.
+   *
+   * @param fs         instance of {@link FileSystem} in use.
+   * @param filePath   path of the file.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link FSDataInputStream} as required.
+   */
+  public static FSDataInputStream getFSDataInputStream(FileSystem fs,
+                                                       StoragePath filePath,
+                                                       int bufferSize) {
+    FSDataInputStream fsDataInputStream = null;
+    try {
+      fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize);
+    } catch (IOException e) {
+      throw new HoodieIOException("Exception creating input stream from file: 
" + filePath, e);
+    }
+
+    if (isGCSFileSystem(fs)) {
+      // in GCS FS, we might need to interceptor seek offsets as we might get 
EOF exception
+      return new 
SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, 
filePath, bufferSize), true);
+    }
+
+    if (isCHDFileSystem(fs)) {
+      return new BoundedFsDataInputStream(fs, convertToHadoopPath(filePath), 
fsDataInputStream);
+    }
+
+    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+      return new TimedFSDataInputStream(convertToHadoopPath(filePath), new 
FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) 
fsDataInputStream.getWrappedStream(), bufferSize)));
+    }
+
+    // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
+    // need to wrap in another BufferedFSInputStream the make bufferSize work?
+    return fsDataInputStream;
+  }
+
+  /**
+   * GCS FileSystem needs some special handling for seek and hence this method 
assists to fetch the right {@link FSDataInputStream} to be
+   * used by wrapping with required input streams.
+   *
+   * @param fsDataInputStream original instance of {@link FSDataInputStream}.
+   * @param filePath          path of the file.
+   * @param bufferSize        buffer size to be used.
+   * @return the right {@link FSDataInputStream} as required.
+   */
+  private static FSDataInputStream 
getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream,
+                                                              StoragePath 
filePath,
+                                                              int bufferSize) {
+    // in case of GCS FS, there are two flows.
+    // a. fsDataInputStream.getWrappedStream() instanceof FSInputStream
+    // b. fsDataInputStream.getWrappedStream() not an instanceof 
FSInputStream, but an instance of FSDataInputStream.
+    // (a) is handled in the first if block and (b) is handled in the second 
if block. If not, we fallback to original fsDataInputStream
+    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+      return new TimedFSDataInputStream(convertToHadoopPath(filePath), new 
FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) 
fsDataInputStream.getWrappedStream(), bufferSize)));
+    }
+
+    if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
+        && ((FSDataInputStream) 
fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof 
FSInputStream) {
+      FSInputStream inputStream = (FSInputStream) ((FSDataInputStream) 
fsDataInputStream.getWrappedStream()).getWrappedStream();
+      return new TimedFSDataInputStream(convertToHadoopPath(filePath),
+          new FSDataInputStream(new BufferedFSInputStream(inputStream, 
bufferSize)));
+    }
+
+    return fsDataInputStream;
+  }
+
+  /**
+   * This is due to HUDI-140 GCS has a different behavior for detecting EOF 
during seek().
+   *
+   * @param fs fileSystem instance.
+   * @return true if the inputstream or the wrapped one is of type 
GoogleHadoopFSInputStream
+   */
+  public static boolean isGCSFileSystem(FileSystem fs) {
+    return fs.getScheme().equals(StorageSchemes.GCS.getScheme());
+  }
+
+  /**
+   * Chdfs will throw {@code IOException} instead of {@code EOFException}. It 
will cause error in isBlockCorrupted().
+   * Wrapped by {@code BoundedFsDataInputStream}, to check whether the desired 
offset is out of the file size in advance.
+   */
+  public static boolean isCHDFileSystem(FileSystem fs) {
+    return StorageSchemes.CHDFS.getScheme().equals(fs.getScheme());
+  }
 }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
index 54c1712be35..9785f42989d 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -20,6 +20,8 @@
 package org.apache.hudi.storage.hadoop;
 
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathFilter;
@@ -63,6 +65,11 @@ public class HoodieHadoopStorage extends HoodieStorage {
     return fs.getUri();
   }
 
+  @Override
+  public int getDefaultBlockSize(StoragePath path) {
+    return (int) fs.getDefaultBlockSize(convertToHadoopPath(path));
+  }
+
   @Override
   public OutputStream create(StoragePath path, boolean overwrite) throws 
IOException {
     return fs.create(convertToHadoopPath(path), overwrite);
@@ -73,6 +80,12 @@ public class HoodieHadoopStorage extends HoodieStorage {
     return fs.open(convertToHadoopPath(path));
   }
 
+  @Override
+  public SeekableDataInputStream openSeekable(StoragePath path, int 
bufferSize) throws IOException {
+    return new HadoopSeekableDataInputStream(
+        HadoopFSUtils.getFSDataInputStream(fs, path, bufferSize));
+  }
+
   @Override
   public OutputStream append(StoragePath path) throws IOException {
     return fs.append(convertToHadoopPath(path));
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index 9ab5e9f9e08..adf9371c243 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -24,6 +24,7 @@ import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.SeekableDataInputStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +53,12 @@ public abstract class HoodieStorage implements Closeable {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract String getScheme();
 
+  /**
+   * @return the default block size.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract int getDefaultBlockSize(StoragePath path);
+
   /**
    * Returns a URI which identifies this HoodieStorage.
    *
@@ -82,6 +89,17 @@ public abstract class HoodieStorage implements Closeable {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract InputStream open(StoragePath path) throws IOException;
 
+  /**
+   * Opens an SeekableDataInputStream at the indicated path with seeks 
supported.
+   *
+   * @param path       the file to open.
+   * @param bufferSize buffer size to use.
+   * @return the InputStream to read from.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract SeekableDataInputStream openSeekable(StoragePath path, int 
bufferSize) throws IOException;
+
   /**
    * Appends to an existing file (optional operation).
    *
@@ -332,6 +350,18 @@ public abstract class HoodieStorage implements Closeable {
     }
   }
 
+  /**
+   * Opens an SeekableDataInputStream at the indicated path with seeks 
supported.
+   *
+   * @param path the file to open.
+   * @return the InputStream to read from.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public SeekableDataInputStream openSeekable(StoragePath path) throws 
IOException {
+    return openSeekable(path, getDefaultBlockSize(path));
+  }
+
   /**
    * Lists the file info of the direct files/directories in the given list of 
paths,
    * if the paths are directory.
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
index 460c831e1c0..e044599b115 100644
--- 
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
+++ 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.util.IOUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -36,6 +37,7 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -148,6 +150,47 @@ public abstract class TestHoodieStorageBase {
     assertTrue(storage.createDirectory(path4));
   }
 
+  @Test
+  public void testSeekable() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+    StoragePath path = new StoragePath(getTempDir(), "testSeekable/1.file");
+    assertFalse(storage.exists(path));
+    byte[] data = new byte[] {2, 42, 49, (byte) 158, (byte) 233, 66, 9, 34, 
79};
+
+    // By default, create overwrites the file
+    try (OutputStream stream = storage.create(path)) {
+      stream.write(data);
+      stream.flush();
+    }
+
+    try (SeekableDataInputStream seekableStream = storage.openSeekable(path)) {
+      validateSeekableDataInputStream(seekableStream, data);
+    }
+
+    try (SeekableDataInputStream seekableStream = storage.openSeekable(path, 
2)) {
+      validateSeekableDataInputStream(seekableStream, data);
+    }
+  }
+
+  private void validateSeekableDataInputStream(SeekableDataInputStream 
seekableStream,
+                                               byte[] expectedData) throws 
IOException {
+    List<Integer> positionList = new ArrayList<>();
+    // Adding these positions for testing non-contiguous and backward seeks
+    positionList.add(1);
+    positionList.add(expectedData.length / 2);
+    positionList.add(expectedData.length - 1);
+    for (int i = 0; i < expectedData.length; i++) {
+      positionList.add(i);
+    }
+
+    assertEquals(0, seekableStream.getPos());
+    for (Integer pos : positionList) {
+      seekableStream.seek(pos);
+      assertEquals(pos, (int) seekableStream.getPos());
+      assertEquals(expectedData[pos], seekableStream.readByte());
+    }
+  }
+
   @Test
   public void testListing() throws IOException {
     HoodieStorage storage = getHoodieStorage();

Reply via email to