This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c32beae1443f889e9e566d91935548cd54ab4bc7 Author: Y Ethan Guo <[email protected]> AuthorDate: Fri Apr 26 10:39:30 2024 -0700 [HUDI-7664] Remove Hadoop dependency from hudi-io module (#11089) --- .../table/upgrade/SixToFiveDowngradeHandler.java | 10 +++--- .../table/timeline/HoodieActiveTimeline.java | 3 +- hudi-io/pom.xml | 6 ---- .../org/apache/hudi/common/util/FileIOUtils.java | 40 ++++++++-------------- .../org/apache/spark/sql/hudi/DedupeSparkJob.scala | 4 +-- .../apache/hudi/utilities/HoodieRepairTool.java | 15 ++++---- 6 files changed, 30 insertions(+), 48 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java index b4c3f902132..68938e895b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java @@ -35,13 +35,11 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - import java.util.HashMap; import java.util.Map; @@ -116,9 +114,9 @@ public class SixToFiveDowngradeHandler implements DowngradeHandler { .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); compactionTimeline.getInstantsAsStream().forEach(instant -> { String fileName = instant.getFileName(); - FileIOUtils.copy((FileSystem) metaClient.getStorage().getFileSystem(), - new Path(metaClient.getMetaPath(), fileName), - new Path(metaClient.getMetaAuxiliaryPath(), fileName)); + FileIOUtils.copy(metaClient.getStorage(), + new StoragePath(metaClient.getMetaPath(), fileName), + new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName)); }); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 3c8d6aa4306..ab885a8ced1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -33,7 +33,6 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StoragePath; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -819,7 +818,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { HoodieStorage srcStorage = HoodieStorageUtils.getStorage(srcPath, metaClient.getHadoopConf()); HoodieStorage dstStorage = HoodieStorageUtils.getStorage(dstPath, metaClient.getHadoopConf()); dstStorage.createDirectory(dstDir); - FileIOUtils.copy(srcStorage, srcPath, dstStorage, dstPath, false, true, (Configuration) srcStorage.getConf()); + FileIOUtils.copy(srcStorage, srcPath, dstStorage, dstPath, false, true); } catch (IOException e) { throw new HoodieIOException("Could not copy instant from " + srcPath + " to " + dstPath, e); } diff --git a/hudi-io/pom.xml b/hudi-io/pom.xml index c72a2ef263c..e2db7e3b691 100644 --- a/hudi-io/pom.xml +++ b/hudi-io/pom.xml @@ -110,12 +110,6 @@ <artifactId>aircompressor</artifactId> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-tests-common</artifactId> diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java index fb37ec429ef..6e398e96953 100644 --- a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java +++ b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java @@ -24,9 +24,6 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,18 +109,18 @@ public class FileIOUtils { /** * Copies the file content from source path to destination path. * - * @param fileSystem {@link FileSystem} instance. + * @param storage {@link HoodieStorage} instance. * @param sourceFilePath Source file path. * @param destFilePath Destination file path. */ - public static void copy( - FileSystem fileSystem, org.apache.hadoop.fs.Path sourceFilePath, - org.apache.hadoop.fs.Path destFilePath) { + public static void copy(HoodieStorage storage, + StoragePath sourceFilePath, + StoragePath destFilePath) { InputStream inputStream = null; OutputStream outputStream = null; try { - inputStream = fileSystem.open(sourceFilePath); - outputStream = fileSystem.create(destFilePath, false); + inputStream = storage.open(sourceFilePath); + outputStream = storage.create(destFilePath, false); copy(inputStream, outputStream); } catch (IOException e) { throw new HoodieIOException(String.format("Cannot copy from %s to %s", @@ -200,10 +197,9 @@ public class FileIOUtils { public static boolean copy(HoodieStorage srcStorage, StoragePath src, HoodieStorage dstStorage, StoragePath dst, boolean deleteSource, - boolean overwrite, - Configuration conf) throws IOException { + boolean overwrite) throws IOException { StoragePathInfo pathInfo = srcStorage.getPathInfo(src); - return copy(srcStorage, pathInfo, dstStorage, dst, deleteSource, overwrite, conf); + return copy(srcStorage, pathInfo, dstStorage, dst, deleteSource, overwrite); } /** @@ -212,8 +208,7 @@ public class FileIOUtils { public static boolean copy(HoodieStorage srcStorage, StoragePathInfo srcPathInfo, HoodieStorage dstStorage, StoragePath dst, boolean deleteSource, - boolean overwrite, - Configuration conf) throws IOException { + boolean overwrite) throws IOException { StoragePath src = srcPathInfo.getPath(); if (srcPathInfo.isDirectory()) { if (!dstStorage.createDirectory(dst)) { @@ -223,19 +218,15 @@ public class FileIOUtils { for (StoragePathInfo subPathInfo : contents) { copy(srcStorage, subPathInfo, dstStorage, new StoragePath(dst, subPathInfo.getPath().getName()), - deleteSource, overwrite, conf); + deleteSource, overwrite); } } else { - InputStream in = null; - OutputStream out = null; - try { - in = srcStorage.open(src); - out = dstStorage.create(dst, overwrite); - IOUtils.copyBytes(in, out, conf, true); + try (InputStream in = srcStorage.open(src); + OutputStream out = dstStorage.create(dst, overwrite)) { + copy(in, out); } catch (IOException e) { - IOUtils.closeStream(out); - IOUtils.closeStream(in); - throw e; + throw new IOException( + "Error copying source file " + src + " to the destination file " + dst, e); } } if (deleteSource) { @@ -246,7 +237,6 @@ public class FileIOUtils { } else { return true; } - } public static Option<byte[]> readDataFromPath(HoodieStorage storage, StoragePath detailPath, boolean ignoreIOE) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala index 511f8c7e256..0649d03b499 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala @@ -205,7 +205,7 @@ class DedupeSparkJob(basePath: String, val dstPath = new Path(s"$repairOutputPath/${filePath.getName}$badSuffix") LOG.info(s"Copying from $filePath to $dstPath") FileIOUtils.copy(storage, new StoragePath(filePath.toUri), storage, - new StoragePath(dstPath.toUri), false, true, storage.getConf.asInstanceOf[Configuration]) + new StoragePath(dstPath.toUri), false, true) } // 2. Remove duplicates from the bad files @@ -250,7 +250,7 @@ class DedupeSparkJob(basePath: String, // for real LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath") FileIOUtils.copy(storage, new StoragePath(srcPath.toUri), storage, - new StoragePath(dstPath.toUri), false, true, storage.getConf.asInstanceOf[Configuration]) + new StoragePath(dstPath.toUri), false, true) } } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java index 3cdb7fda9df..89af9455944 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java @@ -33,10 +33,11 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.repair.RepairUtils; import com.beust.jcommander.JCommander; @@ -251,14 +252,14 @@ public class HoodieRepairTool { List<Boolean> allResults = context.parallelize(relativeFilePaths) .mapPartitions(iterator -> { List<Boolean> results = new ArrayList<>(); - FileSystem fs = HadoopFSUtils.getFs(destBasePath, conf.get()); + HoodieStorage storage = HoodieStorageUtils.getStorage(destBasePath, conf.get()); iterator.forEachRemaining(filePath -> { boolean success = false; - Path sourcePath = new Path(sourceBasePath, filePath); - Path destPath = new Path(destBasePath, filePath); + StoragePath sourcePath = new StoragePath(sourceBasePath, filePath); + StoragePath destPath = new StoragePath(destBasePath, filePath); try { - if (!fs.exists(destPath)) { - FileIOUtils.copy(fs, sourcePath, destPath); + if (!storage.exists(destPath)) { + FileIOUtils.copy(storage, sourcePath, destPath); success = true; } } catch (IOException e) {
