This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 91e176c0ef7f811be932534377599bde42379359 Author: Y Ethan Guo <[email protected]> AuthorDate: Wed Feb 28 10:29:24 2024 -0800 [HUDI-7431] Add replication and block size to StoragePathInfo to be backwards compatible (#10717) --- .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 47 ++++++++ .../hudi/storage/hadoop/HoodieHadoopStorage.java | 34 ++---- .../apache/hudi/hadoop/fs/TestHadoopFSUtils.java | 126 +++++++++++++++++++++ .../org/apache/hudi/storage/StoragePathInfo.java | 24 ++++ .../hudi/io/storage/TestHoodieStorageBase.java | 45 ++++---- .../hudi/io/storage/TestStoragePathInfo.java | 16 +-- 6 files changed, 242 insertions(+), 50 deletions(-) diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java index be38dfe8d6d..d59bffc9217 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java @@ -22,9 +22,12 @@ package org.apache.hudi.hadoop.fs; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -107,4 +110,48 @@ public class HadoopFSUtils { LOG.info("Resolving file " + path + "to be a remote file."); return providedPath; } + + /** + * @param path {@link StoragePath} instance. + * @return the Hadoop {@link Path} instance after conversion. + */ + public static Path convertToHadoopPath(StoragePath path) { + return new Path(path.toUri()); + } + + /** + * @param path Hadoop {@link Path} instance. + * @return the {@link StoragePath} instance after conversion. + */ + public static StoragePath convertToStoragePath(Path path) { + return new StoragePath(path.toUri()); + } + + /** + * @param fileStatus Hadoop {@link FileStatus} instance. + * @return the {@link StoragePathInfo} instance after conversion. + */ + public static StoragePathInfo convertToStoragePathInfo(FileStatus fileStatus) { + return new StoragePathInfo( + convertToStoragePath(fileStatus.getPath()), + fileStatus.getLen(), + fileStatus.isDirectory(), + fileStatus.getReplication(), + fileStatus.getBlockSize(), + fileStatus.getModificationTime()); + } + + /** + * @param pathInfo {@link StoragePathInfo} instance. + * @return the {@link FileStatus} instance after conversion. + */ + public static FileStatus convertToHadoopFileStatus(StoragePathInfo pathInfo) { + return new FileStatus( + pathInfo.getLength(), + pathInfo.isDirectory(), + pathInfo.getBlockReplication(), + pathInfo.getBlockSize(), + pathInfo.getModificationTime(), + convertToHadoopPath(pathInfo.getPath())); + } } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java index 87d4d9667e6..54c1712be35 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java @@ -19,12 +19,12 @@ package org.apache.hudi.storage.hadoop; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathFilter; import org.apache.hudi.storage.StoragePathInfo; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -39,6 +39,10 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopPath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePathInfo; + /** * Implementation of {@link HoodieStorage} using Hadoop's {@link FileSystem} */ @@ -92,7 +96,7 @@ public class HoodieHadoopStorage extends HoodieStorage { @Override public List<StoragePathInfo> listDirectEntries(StoragePath path) throws IOException { return Arrays.stream(fs.listStatus(convertToHadoopPath(path))) - .map(this::convertToStoragePathInfo) + .map(HadoopFSUtils::convertToStoragePathInfo) .collect(Collectors.toList()); } @@ -109,9 +113,9 @@ public class HoodieHadoopStorage extends HoodieStorage { @Override public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList) throws IOException { return Arrays.stream(fs.listStatus(pathList.stream() - .map(this::convertToHadoopPath) + .map(HadoopFSUtils::convertToHadoopPath) .toArray(Path[]::new))) - .map(this::convertToStoragePathInfo) + .map(HadoopFSUtils::convertToStoragePathInfo) .collect(Collectors.toList()); } @@ -122,7 +126,7 @@ public class HoodieHadoopStorage extends HoodieStorage { return Arrays.stream(fs.listStatus( convertToHadoopPath(path), e -> filter.accept(convertToStoragePath(e)))) - .map(this::convertToStoragePathInfo) + .map(HadoopFSUtils::convertToStoragePathInfo) .collect(Collectors.toList()); } @@ -130,7 +134,7 @@ public class HoodieHadoopStorage extends HoodieStorage { public List<StoragePathInfo> globEntries(StoragePath pathPattern) throws IOException { return Arrays.stream(fs.globStatus(convertToHadoopPath(pathPattern))) - .map(this::convertToStoragePathInfo) + .map(HadoopFSUtils::convertToStoragePathInfo) .collect(Collectors.toList()); } @@ -139,7 +143,7 @@ public class HoodieHadoopStorage extends HoodieStorage { throws IOException { return Arrays.stream(fs.globStatus(convertToHadoopPath(pathPattern), path -> filter.accept(convertToStoragePath(path)))) - .map(this::convertToStoragePathInfo) + .map(HadoopFSUtils::convertToStoragePathInfo) .collect(Collectors.toList()); } @@ -184,22 +188,6 @@ public class HoodieHadoopStorage extends HoodieStorage { return fs.createNewFile(convertToHadoopPath(path)); } - private Path convertToHadoopPath(StoragePath loc) { - return new Path(loc.toUri()); - } - - private StoragePath convertToStoragePath(Path path) { - return new StoragePath(path.toUri()); - } - - private StoragePathInfo convertToStoragePathInfo(FileStatus fileStatus) { - return new StoragePathInfo( - convertToStoragePath(fileStatus.getPath()), - fileStatus.getLen(), - fileStatus.isDirectory(), - fileStatus.getModificationTime()); - } - @Override public void close() throws IOException { fs.close(); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/fs/TestHadoopFSUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/fs/TestHadoopFSUtils.java new file mode 100644 index 00000000000..7768ff4feae --- /dev/null +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/fs/TestHadoopFSUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop.fs; + +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopFileStatus; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopPath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePathInfo; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests {@link HadoopFSUtils} + */ +public class TestHadoopFSUtils { + @ParameterizedTest + @ValueSource(strings = { + "/a/b/c", + "s3://bucket/partition=1%2F2%2F3", + "hdfs://x/y/z.file#bar" + }) + public void testPathConversion(String pathString) { + // Hadoop Path -> StoragePath -> Hadoop Path + Path path = new Path(pathString); + StoragePath storagePath = convertToStoragePath(path); + Path convertedPath = convertToHadoopPath(storagePath); + assertEquals(path.toUri(), storagePath.toUri()); + assertEquals(path, convertedPath); + + // StoragePath -> Hadoop Path -> StoragePath + storagePath = new StoragePath(pathString); + path = convertToHadoopPath(storagePath); + StoragePath convertedStoragePath = convertToStoragePath(path); + assertEquals(storagePath.toUri(), path.toUri()); + assertEquals(storagePath, convertedStoragePath); + } + + @ParameterizedTest + @CsvSource({ + "/a/b/c,1000,false,1,1000000,1238493920", + "/x/y/z,0,true,2,0,2002403203" + }) + public void testFileStatusConversion(String path, + long length, + boolean isDirectory, + short blockReplication, + long blockSize, + long modificationTime) { + // FileStatus -> StoragePathInfo -> FileStatus + FileStatus fileStatus = new FileStatus( + length, isDirectory, blockReplication, blockSize, modificationTime, new Path(path)); + StoragePathInfo pathInfo = convertToStoragePathInfo(fileStatus); + assertStoragePathInfo( + pathInfo, path, length, isDirectory, blockReplication, blockSize, modificationTime); + FileStatus convertedFileStatus = convertToHadoopFileStatus(pathInfo); + assertFileStatus( + convertedFileStatus, path, length, isDirectory, blockReplication, blockSize, modificationTime); + + // StoragePathInfo -> FileStatus -> StoragePathInfo + pathInfo = new StoragePathInfo( + new StoragePath(path), length, isDirectory, blockReplication, blockSize, modificationTime); + fileStatus = convertToHadoopFileStatus(pathInfo); + assertFileStatus( + fileStatus, path, length, isDirectory, blockReplication, blockSize, modificationTime); + StoragePathInfo convertedPathInfo = convertToStoragePathInfo(fileStatus); + assertStoragePathInfo( + convertedPathInfo, path, length, isDirectory, blockReplication, blockSize, modificationTime); + } + + private void assertFileStatus(FileStatus fileStatus, + String path, + long length, + boolean isDirectory, + short blockReplication, + long blockSize, + long modificationTime) { + assertEquals(new Path(path), fileStatus.getPath()); + assertEquals(length, fileStatus.getLen()); + assertEquals(isDirectory, fileStatus.isDirectory()); + assertEquals(!isDirectory, fileStatus.isFile()); + assertEquals(blockReplication, fileStatus.getReplication()); + assertEquals(blockSize, fileStatus.getBlockSize()); + assertEquals(modificationTime, fileStatus.getModificationTime()); + } + + private void assertStoragePathInfo(StoragePathInfo pathInfo, + String path, + long length, + boolean isDirectory, + short blockReplication, + long blockSize, + long modificationTime) { + assertEquals(new StoragePath(path), pathInfo.getPath()); + assertEquals(length, pathInfo.getLength()); + assertEquals(isDirectory, pathInfo.isDirectory()); + assertEquals(!isDirectory, pathInfo.isFile()); + assertEquals(blockReplication, pathInfo.getBlockReplication()); + assertEquals(blockSize, pathInfo.getBlockSize()); + assertEquals(modificationTime, pathInfo.getModificationTime()); + } +} diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java b/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java index b4ec8194b4d..e4711bf72dd 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java @@ -35,15 +35,21 @@ public class StoragePathInfo implements Serializable { private final StoragePath path; private final long length; private final boolean isDirectory; + private final short blockReplication; + private final long blockSize; private final long modificationTime; public StoragePathInfo(StoragePath path, long length, boolean isDirectory, + short blockReplication, + long blockSize, long modificationTime) { this.path = path; this.length = length; this.isDirectory = isDirectory; + this.blockReplication = blockReplication; + this.blockSize = blockSize; this.modificationTime = modificationTime; } @@ -79,6 +85,22 @@ public class StoragePathInfo implements Serializable { return isDirectory; } + /** + * @return the block replication if applied. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public short getBlockReplication() { + return blockReplication; + } + + /** + * @return the block size in bytes if applied. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public long getBlockSize() { + return blockSize; + } + /** * @return the modification of a file. */ @@ -114,6 +136,8 @@ public class StoragePathInfo implements Serializable { + "path=" + path + ", length=" + length + ", isDirectory=" + isDirectory + + ", blockReplication=" + blockReplication + + ", blockSize=" + blockSize + ", modificationTime=" + modificationTime + '}'; } diff --git a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java index a6a0efee6dc..460c831e1c0 100644 --- a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java +++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java @@ -164,37 +164,37 @@ public abstract class TestHoodieStorageBase { validatePathInfoList( Arrays.stream(new StoragePathInfo[] { - new StoragePathInfo(new StoragePath(getTempDir(), "x/1.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/2.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/y"), 0, true, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/z"), 0, true, 0), + getStoragePathInfo("x/1.file", false), + getStoragePathInfo("x/2.file", false), + getStoragePathInfo("x/y", true), + getStoragePathInfo("x/z", true) }).collect(Collectors.toList()), storage.listDirectEntries(new StoragePath(getTempDir(), "x"))); validatePathInfoList( Arrays.stream(new StoragePathInfo[] { - new StoragePathInfo(new StoragePath(getTempDir(), "x/1.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/2.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/y/1.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/y/2.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/z/1.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/z/2.file"), 0, false, 0) + getStoragePathInfo("x/1.file", false), + getStoragePathInfo("x/2.file", false), + getStoragePathInfo("x/y/1.file", false), + getStoragePathInfo("x/y/2.file", false), + getStoragePathInfo("x/z/1.file", false), + getStoragePathInfo("x/z/2.file", false) }).collect(Collectors.toList()), storage.listFiles(new StoragePath(getTempDir(), "x"))); validatePathInfoList( Arrays.stream(new StoragePathInfo[] { - new StoragePathInfo(new StoragePath(getTempDir(), "x/2.file"), 0, false, 0) + getStoragePathInfo("x/2.file", false) }).collect(Collectors.toList()), storage.listDirectEntries( new StoragePath(getTempDir(), "x"), e -> e.getName().contains("2"))); validatePathInfoList( Arrays.stream(new StoragePathInfo[] { - new StoragePathInfo(new StoragePath(getTempDir(), "w/1.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "w/2.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/z/1.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/z/2.file"), 0, false, 0) + getStoragePathInfo("w/1.file", false), + getStoragePathInfo("w/2.file", false), + getStoragePathInfo("x/z/1.file", false), + getStoragePathInfo("x/z/2.file", false) }).collect(Collectors.toList()), storage.listDirectEntries(Arrays.stream(new StoragePath[] { new StoragePath(getTempDir(), "w"), @@ -206,21 +206,21 @@ public abstract class TestHoodieStorageBase { validatePathInfoList( Arrays.stream(new StoragePathInfo[] { - new StoragePathInfo(new StoragePath(getTempDir(), "x/y/1.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/z/1.file"), 0, false, 0) + getStoragePathInfo("x/y/1.file", false), + getStoragePathInfo("x/z/1.file", false) }).collect(Collectors.toList()), storage.globEntries(new StoragePath(getTempDir(), "x/*/1.file"))); validatePathInfoList( Arrays.stream(new StoragePathInfo[] { - new StoragePathInfo(new StoragePath(getTempDir(), "x/1.file"), 0, false, 0), - new StoragePathInfo(new StoragePath(getTempDir(), "x/2.file"), 0, false, 0), + getStoragePathInfo("x/1.file", false), + getStoragePathInfo("x/2.file", false) }).collect(Collectors.toList()), storage.globEntries(new StoragePath(getTempDir(), "x/*.file"))); validatePathInfoList( Arrays.stream(new StoragePathInfo[] { - new StoragePathInfo(new StoragePath(getTempDir(), "x/y/1.file"), 0, false, 0), + getStoragePathInfo("x/y/1.file", false) }).collect(Collectors.toList()), storage.globEntries( new StoragePath(getTempDir(), "x/*/*.file"), @@ -319,6 +319,11 @@ public abstract class TestHoodieStorageBase { return getHoodieStorage(getFileSystem(conf), conf); } + private StoragePathInfo getStoragePathInfo(String subPath, boolean isDirectory) { + return new StoragePathInfo(new StoragePath(getTempDir(), subPath), + 0, isDirectory, (short) 1, 1000000L, 10L); + } + private void validatePathInfo(HoodieStorage storage, StoragePath path, byte[] data, diff --git a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestStoragePathInfo.java b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestStoragePathInfo.java index 1d92fa075d0..72640c5e3df 100644 --- a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestStoragePathInfo.java +++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestStoragePathInfo.java @@ -41,6 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; public class TestStoragePathInfo { private static final Logger LOG = LoggerFactory.getLogger(TestStoragePathInfo.class); private static final long LENGTH = 100; + private static final short BLOCK_REPLICATION = 1; + private static final long BLOCK_SIZE = 1000000L; private static final long MODIFICATION_TIME = System.currentTimeMillis(); private static final String PATH1 = "/abc/xyz1"; private static final String PATH2 = "/abc/xyz2"; @@ -49,15 +51,15 @@ public class TestStoragePathInfo { @Test public void testConstructor() { - StoragePathInfo pathInfo = new StoragePathInfo(STORAGE_PATH1, LENGTH, false, MODIFICATION_TIME); + StoragePathInfo pathInfo = new StoragePathInfo(STORAGE_PATH1, LENGTH, false, BLOCK_REPLICATION, BLOCK_SIZE, MODIFICATION_TIME); validateAccessors(pathInfo, PATH1, LENGTH, false, MODIFICATION_TIME); - pathInfo = new StoragePathInfo(STORAGE_PATH2, -1, true, MODIFICATION_TIME + 2L); + pathInfo = new StoragePathInfo(STORAGE_PATH2, -1, true, BLOCK_REPLICATION, BLOCK_SIZE, MODIFICATION_TIME + 2L); validateAccessors(pathInfo, PATH2, -1, true, MODIFICATION_TIME + 2L); } @Test public void testSerializability() throws IOException, ClassNotFoundException { - StoragePathInfo pathInfo = new StoragePathInfo(STORAGE_PATH1, LENGTH, false, MODIFICATION_TIME); + StoragePathInfo pathInfo = new StoragePathInfo(STORAGE_PATH1, LENGTH, false, BLOCK_REPLICATION, BLOCK_SIZE, MODIFICATION_TIME); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) { oos.writeObject(pathInfo); @@ -72,18 +74,18 @@ public class TestStoragePathInfo { @Test public void testEquals() { StoragePathInfo pathInfo1 = new StoragePathInfo( - new StoragePath(PATH1), LENGTH, false, MODIFICATION_TIME); + new StoragePath(PATH1), LENGTH, false, BLOCK_REPLICATION, BLOCK_SIZE, MODIFICATION_TIME); StoragePathInfo pathInfo2 = new StoragePathInfo( - new StoragePath(PATH1), LENGTH + 2, false, MODIFICATION_TIME + 2L); + new StoragePath(PATH1), LENGTH + 2, false, BLOCK_REPLICATION, BLOCK_SIZE, MODIFICATION_TIME + 2L); assertEquals(pathInfo1, pathInfo2); } @Test public void testNotEquals() { StoragePathInfo pathInfo1 = new StoragePathInfo( - STORAGE_PATH1, LENGTH, false, MODIFICATION_TIME); + STORAGE_PATH1, LENGTH, false, BLOCK_REPLICATION, BLOCK_SIZE, MODIFICATION_TIME); StoragePathInfo pathInfo2 = new StoragePathInfo( - STORAGE_PATH2, LENGTH, false, MODIFICATION_TIME + 2L); + STORAGE_PATH2, LENGTH, false, BLOCK_REPLICATION, BLOCK_SIZE, MODIFICATION_TIME + 2L); assertFalse(pathInfo1.equals(pathInfo2)); assertFalse(pathInfo2.equals(pathInfo1)); }
