This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c8c52e252fb8b2e273cab089359e4d025bf9fe08 Author: Y Ethan Guo <[email protected]> AuthorDate: Tue May 14 17:02:25 2024 -0700 [HUDI-7635] Add default block size and openSeekable APIs to HoodieStorage (#11048) This PR adds `getDefaultBlockSize` and `openSeekable` APIs to `HoodieStorage` and implements these APIs in `HoodieHadoopStorage`. The implementation follows the same logic of creating seekable input stream for log file reading, and `openSeekable` will be used by the log reading logic. A few util methods are moved from the `FSUtils` and `HoodieLogFileReader` classes to the `HadoopFSUtils` class. --- .../java/org/apache/hudi/common/fs/FSUtils.java | 18 ----- .../hudi/common/table/log/HoodieLogFileReader.java | 75 +----------------- .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 90 ++++++++++++++++++++++ .../hudi/storage/hadoop/HoodieHadoopStorage.java | 13 ++++ .../org/apache/hudi/storage/HoodieStorage.java | 30 ++++++++ .../hudi/io/storage/TestHoodieStorageBase.java | 43 +++++++++++ 6 files changed, 179 insertions(+), 90 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 292c2b41946..1b51fd78bfa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -667,24 +667,6 @@ public class FSUtils { return fs.getUri() + fullPartitionPath.toUri().getRawPath(); } - /** - * This is due to HUDI-140 GCS has a different behavior for detecting EOF during seek(). - * - * @param fs fileSystem instance. - * @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream - */ - public static boolean isGCSFileSystem(FileSystem fs) { - return fs.getScheme().equals(StorageSchemes.GCS.getScheme()); - } - - /** - * Chdfs will throw {@code IOException} instead of {@code EOFException}. It will cause error in isBlockCorrupted(). 
- * Wrapped by {@code BoundedFsDataInputStream}, to check whether the desired offset is out of the file size in advance. - */ - public static boolean isCHDFileSystem(FileSystem fs) { - return StorageSchemes.CHDFS.getScheme().equals(fs.getScheme()); - } - public static Configuration registerFileSystem(Path file, Configuration conf) { Configuration returnConf = new Configuration(conf); String scheme = HadoopFSUtils.getFs(file.toString(), conf).getScheme(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index c1daf5e32d1..062e3639073 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -37,20 +37,15 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.CorruptedLogFileException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; -import org.apache.hudi.hadoop.fs.BoundedFsDataInputStream; import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream; -import org.apache.hudi.hadoop.fs.SchemeAwareFSDataInputStream; -import org.apache.hudi.hadoop.fs.TimedFSDataInputStream; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.util.IOUtils; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StorageSchemes; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BufferedFSInputStream; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -67,6 +62,7 @@ import java.util.Objects; import static 
org.apache.hudi.common.util.ValidationUtils.checkArgument; import static org.apache.hudi.common.util.ValidationUtils.checkState; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getFSDataInputStream; /** * Scans a log file and provides block level iterator on the log file Loads the entire block contents in memory Can emit @@ -479,71 +475,6 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private static SeekableDataInputStream getDataInputStream(FileSystem fs, HoodieLogFile logFile, int bufferSize) { - return new HadoopSeekableDataInputStream(getFSDataInputStream(fs, logFile, bufferSize)); - } - - /** - * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams. - * - * @param fs instance of {@link FileSystem} in use. - * @param bufferSize buffer size to be used. - * @return the right {@link FSDataInputStream} as required. - */ - private static FSDataInputStream getFSDataInputStream(FileSystem fs, - HoodieLogFile logFile, - int bufferSize) { - FSDataInputStream fsDataInputStream = null; - try { - fsDataInputStream = fs.open(logFile.getPath(), bufferSize); - } catch (IOException e) { - throw new HoodieIOException("Exception creating input stream from file: " + logFile, e); - } - - if (FSUtils.isGCSFileSystem(fs)) { - // in GCS FS, we might need to interceptor seek offsets as we might get EOF exception - return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true); - } - - if (FSUtils.isCHDFileSystem(fs)) { - return new BoundedFsDataInputStream(fs, logFile.getPath(), fsDataInputStream); - } - - if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { - return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( - new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); - } - - // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream - // need to wrap in another 
BufferedFSInputStream the make bufferSize work? - return fsDataInputStream; - } - - /** - * GCS FileSystem needs some special handling for seek and hence this method assists to fetch the right {@link FSDataInputStream} to be - * used by wrapping with required input streams. - * @param fsDataInputStream original instance of {@link FSDataInputStream}. - * @param bufferSize buffer size to be used. - * @return the right {@link FSDataInputStream} as required. - */ - private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream, - HoodieLogFile logFile, - int bufferSize) { - // in case of GCS FS, there are two flows. - // a. fsDataInputStream.getWrappedStream() instanceof FSInputStream - // b. fsDataInputStream.getWrappedStream() not an instanceof FSInputStream, but an instance of FSDataInputStream. - // (a) is handled in the first if block and (b) is handled in the second if block. If not, we fallback to original fsDataInputStream - if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { - return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( - new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); - } - - if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream - && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) { - FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream(); - return new TimedFSDataInputStream(logFile.getPath(), - new FSDataInputStream(new BufferedFSInputStream(inputStream, bufferSize))); - } - - return fsDataInputStream; + return new HadoopSeekableDataInputStream(getFSDataInputStream(fs, new StoragePath(logFile.getPath().toUri()), bufferSize)); } } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java index d59bffc9217..8eaa9398082 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java @@ -24,9 +24,13 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; +import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BufferedFSInputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -154,4 +158,90 @@ public class HadoopFSUtils { pathInfo.getModificationTime(), convertToHadoopPath(pathInfo.getPath())); } + + /** + * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams. + * + * @param fs instance of {@link FileSystem} in use. + * @param filePath path of the file. + * @param bufferSize buffer size to be used. + * @return the right {@link FSDataInputStream} as required. 
+ */ + public static FSDataInputStream getFSDataInputStream(FileSystem fs, + StoragePath filePath, + int bufferSize) { + FSDataInputStream fsDataInputStream = null; + try { + fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize); + } catch (IOException e) { + throw new HoodieIOException("Exception creating input stream from file: " + filePath, e); + } + + if (isGCSFileSystem(fs)) { + // in GCS FS, we might need to interceptor seek offsets as we might get EOF exception + return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, filePath, bufferSize), true); + } + + if (isCHDFileSystem(fs)) { + return new BoundedFsDataInputStream(fs, convertToHadoopPath(filePath), fsDataInputStream); + } + + if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { + return new TimedFSDataInputStream(convertToHadoopPath(filePath), new FSDataInputStream( + new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); + } + + // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream + // need to wrap in another BufferedFSInputStream the make bufferSize work? + return fsDataInputStream; + } + + /** + * GCS FileSystem needs some special handling for seek and hence this method assists to fetch the right {@link FSDataInputStream} to be + * used by wrapping with required input streams. + * + * @param fsDataInputStream original instance of {@link FSDataInputStream}. + * @param filePath path of the file. + * @param bufferSize buffer size to be used. + * @return the right {@link FSDataInputStream} as required. + */ + private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream, + StoragePath filePath, + int bufferSize) { + // in case of GCS FS, there are two flows. + // a. fsDataInputStream.getWrappedStream() instanceof FSInputStream + // b. fsDataInputStream.getWrappedStream() not an instanceof FSInputStream, but an instance of FSDataInputStream. 
+ // (a) is handled in the first if block and (b) is handled in the second if block. If not, we fallback to original fsDataInputStream + if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { + return new TimedFSDataInputStream(convertToHadoopPath(filePath), new FSDataInputStream( + new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); + } + + if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream + && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) { + FSInputStream inputStream = (FSInputStream) ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream(); + return new TimedFSDataInputStream(convertToHadoopPath(filePath), + new FSDataInputStream(new BufferedFSInputStream(inputStream, bufferSize))); + } + + return fsDataInputStream; + } + + /** + * This is due to HUDI-140 GCS has a different behavior for detecting EOF during seek(). + * + * @param fs fileSystem instance. + * @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream + */ + public static boolean isGCSFileSystem(FileSystem fs) { + return fs.getScheme().equals(StorageSchemes.GCS.getScheme()); + } + + /** + * Chdfs will throw {@code IOException} instead of {@code EOFException}. It will cause error in isBlockCorrupted(). + * Wrapped by {@code BoundedFsDataInputStream}, to check whether the desired offset is out of the file size in advance. 
+ */ + public static boolean isCHDFileSystem(FileSystem fs) { + return StorageSchemes.CHDFS.getScheme().equals(fs.getScheme()); + } } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java index 54c1712be35..9785f42989d 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java @@ -20,6 +20,8 @@ package org.apache.hudi.storage.hadoop; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathFilter; @@ -63,6 +65,11 @@ public class HoodieHadoopStorage extends HoodieStorage { return fs.getUri(); } + @Override + public int getDefaultBlockSize(StoragePath path) { + return (int) fs.getDefaultBlockSize(convertToHadoopPath(path)); + } + @Override public OutputStream create(StoragePath path, boolean overwrite) throws IOException { return fs.create(convertToHadoopPath(path), overwrite); @@ -73,6 +80,12 @@ public class HoodieHadoopStorage extends HoodieStorage { return fs.open(convertToHadoopPath(path)); } + @Override + public SeekableDataInputStream openSeekable(StoragePath path, int bufferSize) throws IOException { + return new HadoopSeekableDataInputStream( + HadoopFSUtils.getFSDataInputStream(fs, path, bufferSize)); + } + @Override public OutputStream append(StoragePath path) throws IOException { return fs.append(convertToHadoopPath(path)); diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java index 9ab5e9f9e08..adf9371c243 100644 --- 
a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java @@ -24,6 +24,7 @@ import org.apache.hudi.PublicAPIClass; import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.SeekableDataInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +53,12 @@ public abstract class HoodieStorage implements Closeable { @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public abstract String getScheme(); + /** + * @return the default block size. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public abstract int getDefaultBlockSize(StoragePath path); + /** * Returns a URI which identifies this HoodieStorage. * @@ -82,6 +89,17 @@ public abstract class HoodieStorage implements Closeable { @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public abstract InputStream open(StoragePath path) throws IOException; + /** + * Opens an SeekableDataInputStream at the indicated path with seeks supported. + * + * @param path the file to open. + * @param bufferSize buffer size to use. + * @return the InputStream to read from. + * @throws IOException IO error. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public abstract SeekableDataInputStream openSeekable(StoragePath path, int bufferSize) throws IOException; + /** * Appends to an existing file (optional operation). * @@ -332,6 +350,18 @@ public abstract class HoodieStorage implements Closeable { } } + /** + * Opens an SeekableDataInputStream at the indicated path with seeks supported. + * + * @param path the file to open. + * @return the InputStream to read from. + * @throws IOException IO error. 
+ */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public SeekableDataInputStream openSeekable(StoragePath path) throws IOException { + return openSeekable(path, getDefaultBlockSize(path)); + } + /** * Lists the file info of the direct files/directories in the given list of paths, * if the paths are directory. diff --git a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java index 460c831e1c0..e044599b115 100644 --- a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java +++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java @@ -20,6 +20,7 @@ package org.apache.hudi.io.storage; import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.util.IOUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; @@ -36,6 +37,7 @@ import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -148,6 +150,47 @@ public abstract class TestHoodieStorageBase { assertTrue(storage.createDirectory(path4)); } + @Test + public void testSeekable() throws IOException { + HoodieStorage storage = getHoodieStorage(); + StoragePath path = new StoragePath(getTempDir(), "testSeekable/1.file"); + assertFalse(storage.exists(path)); + byte[] data = new byte[] {2, 42, 49, (byte) 158, (byte) 233, 66, 9, 34, 79}; + + // By default, create overwrites the file + try (OutputStream stream = storage.create(path)) { + stream.write(data); + stream.flush(); + } + + try (SeekableDataInputStream seekableStream = storage.openSeekable(path)) { + validateSeekableDataInputStream(seekableStream, data); + } + + try (SeekableDataInputStream seekableStream = storage.openSeekable(path, 2)) { + 
validateSeekableDataInputStream(seekableStream, data); + } + } + + private void validateSeekableDataInputStream(SeekableDataInputStream seekableStream, + byte[] expectedData) throws IOException { + List<Integer> positionList = new ArrayList<>(); + // Adding these positions for testing non-contiguous and backward seeks + positionList.add(1); + positionList.add(expectedData.length / 2); + positionList.add(expectedData.length - 1); + for (int i = 0; i < expectedData.length; i++) { + positionList.add(i); + } + + assertEquals(0, seekableStream.getPos()); + for (Integer pos : positionList) { + seekableStream.seek(pos); + assertEquals(pos, (int) seekableStream.getPos()); + assertEquals(expectedData[pos], seekableStream.readByte()); + } + } + @Test public void testListing() throws IOException { HoodieStorage storage = getHoodieStorage();
