This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c32beae1443f889e9e566d91935548cd54ab4bc7
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Apr 26 10:39:30 2024 -0700

    [HUDI-7664] Remove Hadoop dependency from hudi-io module (#11089)
---
 .../table/upgrade/SixToFiveDowngradeHandler.java   | 10 +++---
 .../table/timeline/HoodieActiveTimeline.java       |  3 +-
 hudi-io/pom.xml                                    |  6 ----
 .../org/apache/hudi/common/util/FileIOUtils.java   | 40 ++++++++--------------
 .../org/apache/spark/sql/hudi/DedupeSparkJob.scala |  4 +--
 .../apache/hudi/utilities/HoodieRepairTool.java    | 15 ++++----
 6 files changed, 30 insertions(+), 48 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
index b4c3f902132..68938e895b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
@@ -35,13 +35,11 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -116,9 +114,9 @@ public class SixToFiveDowngradeHandler implements DowngradeHandler {
         .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
     compactionTimeline.getInstantsAsStream().forEach(instant -> {
       String fileName = instant.getFileName();
-      FileIOUtils.copy((FileSystem) metaClient.getStorage().getFileSystem(),
-          new Path(metaClient.getMetaPath(), fileName),
-          new Path(metaClient.getMetaAuxiliaryPath(), fileName));
+      FileIOUtils.copy(metaClient.getStorage(),
+          new StoragePath(metaClient.getMetaPath(), fileName),
+          new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
     });
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 3c8d6aa4306..ab885a8ced1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -33,7 +33,6 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -819,7 +818,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
       HoodieStorage srcStorage = HoodieStorageUtils.getStorage(srcPath, 
metaClient.getHadoopConf());
       HoodieStorage dstStorage = HoodieStorageUtils.getStorage(dstPath, 
metaClient.getHadoopConf());
       dstStorage.createDirectory(dstDir);
-      FileIOUtils.copy(srcStorage, srcPath, dstStorage, dstPath, false, true, 
(Configuration) srcStorage.getConf());
+      FileIOUtils.copy(srcStorage, srcPath, dstStorage, dstPath, false, true);
     } catch (IOException e) {
       throw new HoodieIOException("Could not copy instant from " + srcPath + " 
to " + dstPath, e);
     }
diff --git a/hudi-io/pom.xml b/hudi-io/pom.xml
index c72a2ef263c..e2db7e3b691 100644
--- a/hudi-io/pom.xml
+++ b/hudi-io/pom.xml
@@ -110,12 +110,6 @@
       <artifactId>aircompressor</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-tests-common</artifactId>
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
index fb37ec429ef..6e398e96953 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
@@ -24,9 +24,6 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,18 +109,18 @@ public class FileIOUtils {
   /**
    * Copies the file content from source path to destination path.
    *
-   * @param fileSystem     {@link FileSystem} instance.
+   * @param storage     {@link HoodieStorage} instance.
    * @param sourceFilePath Source file path.
    * @param destFilePath   Destination file path.
    */
-  public static void copy(
-      FileSystem fileSystem, org.apache.hadoop.fs.Path sourceFilePath,
-      org.apache.hadoop.fs.Path destFilePath) {
+  public static void copy(HoodieStorage storage,
+                          StoragePath sourceFilePath,
+                          StoragePath destFilePath) {
     InputStream inputStream = null;
     OutputStream outputStream = null;
     try {
-      inputStream = fileSystem.open(sourceFilePath);
-      outputStream = fileSystem.create(destFilePath, false);
+      inputStream = storage.open(sourceFilePath);
+      outputStream = storage.create(destFilePath, false);
       copy(inputStream, outputStream);
     } catch (IOException e) {
       throw new HoodieIOException(String.format("Cannot copy from %s to %s",
@@ -200,10 +197,9 @@ public class FileIOUtils {
   public static boolean copy(HoodieStorage srcStorage, StoragePath src,
                              HoodieStorage dstStorage, StoragePath dst,
                              boolean deleteSource,
-                             boolean overwrite,
-                             Configuration conf) throws IOException {
+                             boolean overwrite) throws IOException {
     StoragePathInfo pathInfo = srcStorage.getPathInfo(src);
-    return copy(srcStorage, pathInfo, dstStorage, dst, deleteSource, 
overwrite, conf);
+    return copy(srcStorage, pathInfo, dstStorage, dst, deleteSource, 
overwrite);
   }
 
   /**
@@ -212,8 +208,7 @@ public class FileIOUtils {
   public static boolean copy(HoodieStorage srcStorage, StoragePathInfo 
srcPathInfo,
                              HoodieStorage dstStorage, StoragePath dst,
                              boolean deleteSource,
-                             boolean overwrite,
-                             Configuration conf) throws IOException {
+                             boolean overwrite) throws IOException {
     StoragePath src = srcPathInfo.getPath();
     if (srcPathInfo.isDirectory()) {
       if (!dstStorage.createDirectory(dst)) {
@@ -223,19 +218,15 @@ public class FileIOUtils {
       for (StoragePathInfo subPathInfo : contents) {
         copy(srcStorage, subPathInfo, dstStorage,
             new StoragePath(dst, subPathInfo.getPath().getName()),
-            deleteSource, overwrite, conf);
+            deleteSource, overwrite);
       }
     } else {
-      InputStream in = null;
-      OutputStream out = null;
-      try {
-        in = srcStorage.open(src);
-        out = dstStorage.create(dst, overwrite);
-        IOUtils.copyBytes(in, out, conf, true);
+      try (InputStream in = srcStorage.open(src);
+           OutputStream out = dstStorage.create(dst, overwrite)) {
+        copy(in, out);
       } catch (IOException e) {
-        IOUtils.closeStream(out);
-        IOUtils.closeStream(in);
-        throw e;
+        throw new IOException(
+            "Error copying source file " + src + " to the destination file " + 
dst, e);
       }
     }
     if (deleteSource) {
@@ -246,7 +237,6 @@ public class FileIOUtils {
     } else {
       return true;
     }
-
   }
 
   public static Option<byte[]> readDataFromPath(HoodieStorage storage, 
StoragePath detailPath, boolean ignoreIOE) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
index 511f8c7e256..0649d03b499 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -205,7 +205,7 @@ class DedupeSparkJob(basePath: String,
       val dstPath = new 
Path(s"$repairOutputPath/${filePath.getName}$badSuffix")
       LOG.info(s"Copying from $filePath to $dstPath")
       FileIOUtils.copy(storage, new StoragePath(filePath.toUri), storage,
-        new StoragePath(dstPath.toUri), false, true, 
storage.getConf.asInstanceOf[Configuration])
+        new StoragePath(dstPath.toUri), false, true)
     }
 
     // 2. Remove duplicates from the bad files
@@ -250,7 +250,7 @@ class DedupeSparkJob(basePath: String,
         // for real
         LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath")
         FileIOUtils.copy(storage, new StoragePath(srcPath.toUri), storage,
-          new StoragePath(dstPath.toUri), false, true, 
storage.getConf.asInstanceOf[Configuration])
+          new StoragePath(dstPath.toUri), false, true)
       }
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index 3cdb7fda9df..89af9455944 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -33,10 +33,11 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.repair.RepairUtils;
 
 import com.beust.jcommander.JCommander;
@@ -251,14 +252,14 @@ public class HoodieRepairTool {
     List<Boolean> allResults = context.parallelize(relativeFilePaths)
         .mapPartitions(iterator -> {
           List<Boolean> results = new ArrayList<>();
-          FileSystem fs = HadoopFSUtils.getFs(destBasePath, conf.get());
+          HoodieStorage storage = HoodieStorageUtils.getStorage(destBasePath, 
conf.get());
           iterator.forEachRemaining(filePath -> {
             boolean success = false;
-            Path sourcePath = new Path(sourceBasePath, filePath);
-            Path destPath = new Path(destBasePath, filePath);
+            StoragePath sourcePath = new StoragePath(sourceBasePath, filePath);
+            StoragePath destPath = new StoragePath(destBasePath, filePath);
             try {
-              if (!fs.exists(destPath)) {
-                FileIOUtils.copy(fs, sourcePath, destPath);
+              if (!storage.exists(destPath)) {
+                FileIOUtils.copy(storage, sourcePath, destPath);
                 success = true;
               }
             } catch (IOException e) {

Reply via email to