This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a487ea  [HUDI-2494] Fixing glob pattern to skip all hoodie meta paths (#3768)
8a487ea is described below

commit 8a487eafa7b85a8936953a9595b16a2079dc7e7e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Oct 12 14:06:40 2021 -0400

    [HUDI-2494] Fixing glob pattern to skip all hoodie meta paths (#3768)
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   | 29 ++++++++++++++++------
 .../org/apache/hudi/TestHoodieSparkUtils.scala     | 17 +++++++++----
 .../apache/hudi/functional/TestCOWDataSource.scala |  6 ++---
 3 files changed, 37 insertions(+), 15 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index ed0ab97..172bbc4 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -19,13 +19,13 @@
 package org.apache.hudi
 
 import java.util.Properties
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, 
CustomKeyGenerator, KeyGenerator}
@@ -60,12 +60,28 @@ object HoodieSparkUtils extends SparkAdapterSupport {
   }
 
   /**
-   * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
-   * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 
3.0.0 and hence we had to copy it locally.
+   * This method is inspired from [[org.apache.spark.deploy.SparkHadoopUtil]] 
with some modifications like
+   * skipping meta paths.
    */
   def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
-    Option(fs.globStatus(pattern)).map { statuses =>
-      statuses.map(_.getPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)).toSeq
+    // find base path to assist in skipping meta paths
+    var basePath = pattern.getParent
+    while (basePath.getName.equals("*")) {
+      basePath = basePath.getParent
+    }
+
+    Option(fs.globStatus(pattern)).map { statuses => {
+      val nonMetaStatuses = statuses.filterNot(entry => {
+        // skip all entries in meta path
+        var leafPath = entry.getPath
+        // walk through every parent until we reach base path. if .hoodie is 
found anywhere, path needs to be skipped
+        while (!leafPath.equals(basePath) && 
!leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+            leafPath = leafPath.getParent
+        }
+        leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)
+      })
+      nonMetaStatuses.map(_.getPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)).toSeq
+    }
     }.getOrElse(Seq.empty[Path])
   }
 
@@ -88,8 +104,7 @@ object HoodieSparkUtils extends SparkAdapterSupport {
   def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): 
Seq[Path] = {
     paths.flatMap(path => {
       val qualified = new Path(path).makeQualified(fs.getUri, 
fs.getWorkingDirectory)
-      val globPaths = globPathIfNecessary(fs, qualified)
-      globPaths
+      globPathIfNecessary(fs, qualified)
     })
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index b86eade..1b756b5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -41,14 +41,18 @@ class TestHoodieSparkUtils {
   def testGlobPaths(@TempDir tempDir: File): Unit = {
     val folders: Seq[Path] = Seq(
       new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
-      new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri)
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie", "metadata").toUri)
     )
 
     val files: Seq[Path] = Seq(
       new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
       new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
       new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
-      new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri)
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder2","file4").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie","metadata", 
"file5").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie","metadata", 
"file6").toUri)
     )
 
     folders.foreach(folder => new File(folder.toUri).mkdir())
@@ -57,12 +61,14 @@ class TestHoodieSparkUtils {
     var paths = Seq(tempDir.getAbsolutePath + "/*")
     var globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
       new Path(paths.head).getFileSystem(new Configuration()))
-    assertEquals(folders.sortWith(_.toString < _.toString), 
globbedPaths.sortWith(_.toString < _.toString))
+    assertEquals(folders.filterNot(entry => entry.toString.contains(".hoodie"))
+      .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < 
_.toString))
 
     paths = Seq(tempDir.getAbsolutePath + "/*/*")
     globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
       new Path(paths.head).getFileSystem(new Configuration()))
-    assertEquals(files.sortWith(_.toString < _.toString), 
globbedPaths.sortWith(_.toString < _.toString))
+    assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie"))
+      .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < 
_.toString))
 
     paths = Seq(tempDir.getAbsolutePath + "/folder1/*")
     globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
@@ -79,7 +85,8 @@ class TestHoodieSparkUtils {
     paths = Seq(tempDir.getAbsolutePath + "/folder1/*", 
tempDir.getAbsolutePath + "/folder2/*")
     globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
       new Path(paths.head).getFileSystem(new Configuration()))
-    assertEquals(files.sortWith(_.toString < _.toString), 
globbedPaths.sortWith(_.toString < _.toString))
+    assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie"))
+      .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < 
_.toString))
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index ffe2b4e..6634934 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -419,7 +419,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
   @Test def testSparkPartitonByWithCustomKeyGenerator(): Unit = {
     // Without fieldType, the default is SIMPLE
-    var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false)
+    var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
     writer.partitionBy("current_ts")
       .mode(SaveMode.Overwrite)
       .save(basePath)
@@ -428,7 +428,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("current_ts").cast("string")).count() == 0)
 
     // Specify fieldType as TIMESTAMP
-    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false)
+    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
     writer.partitionBy("current_ts:TIMESTAMP")
       .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
       .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
@@ -504,7 +504,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
   }
 
   @Test def testSparkPartitonByWithTimestampBasedKeyGenerator() {
-    val writer = 
getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName, false)
+    val writer = 
getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName)
     writer.partitionBy("current_ts")
       .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
       .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")

Reply via email to