This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8a487ea [HUDI-2494] Fixing glob pattern to skip all hoodie meta paths
(#3768)
8a487ea is described below
commit 8a487eafa7b85a8936953a9595b16a2079dc7e7e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Oct 12 14:06:40 2021 -0400
[HUDI-2494] Fixing glob pattern to skip all hoodie meta paths (#3768)
---
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 29 ++++++++++++++++------
.../org/apache/hudi/TestHoodieSparkUtils.scala | 17 +++++++++----
.../apache/hudi/functional/TestCOWDataSource.scala | 6 ++---
3 files changed, 37 insertions(+), 15 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index ed0ab97..172bbc4 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -19,13 +19,13 @@
package org.apache.hudi
import java.util.Properties
-
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator,
CustomKeyGenerator, KeyGenerator}
@@ -60,12 +60,28 @@ object HoodieSparkUtils extends SparkAdapterSupport {
}
/**
- * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
- * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark
3.0.0 and hence we had to copy it locally.
+ * This method is inspired by [[org.apache.spark.deploy.SparkHadoopUtil]],
with some modifications such as
+ * skipping meta paths.
*/
def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
- Option(fs.globStatus(pattern)).map { statuses =>
- statuses.map(_.getPath.makeQualified(fs.getUri,
fs.getWorkingDirectory)).toSeq
+ // find base path to assist in skipping meta paths
+ var basePath = pattern.getParent
+ while (basePath.getName.equals("*")) {
+ basePath = basePath.getParent
+ }
+
+ Option(fs.globStatus(pattern)).map { statuses => {
+ val nonMetaStatuses = statuses.filterNot(entry => {
+ // skip all entries in meta path
+ var leafPath = entry.getPath
+ // walk up through every parent until we reach the base path; if .hoodie is
found anywhere along the way, the path needs to be skipped
+ while (!leafPath.equals(basePath) &&
!leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+ leafPath = leafPath.getParent
+ }
+ leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)
+ })
+ nonMetaStatuses.map(_.getPath.makeQualified(fs.getUri,
fs.getWorkingDirectory)).toSeq
+ }
}.getOrElse(Seq.empty[Path])
}
@@ -88,8 +104,7 @@ object HoodieSparkUtils extends SparkAdapterSupport {
def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem):
Seq[Path] = {
paths.flatMap(path => {
val qualified = new Path(path).makeQualified(fs.getUri,
fs.getWorkingDirectory)
- val globPaths = globPathIfNecessary(fs, qualified)
- globPaths
+ globPathIfNecessary(fs, qualified)
})
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index b86eade..1b756b5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -41,14 +41,18 @@ class TestHoodieSparkUtils {
def testGlobPaths(@TempDir tempDir: File): Unit = {
val folders: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
- new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri)
+ new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri),
+ new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie").toUri),
+ new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie", "metadata").toUri)
)
val files: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
- new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri)
+ new Path(Paths.get(tempDir.getAbsolutePath, "folder2","file4").toUri),
+ new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie","metadata",
"file5").toUri),
+ new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie","metadata",
"file6").toUri)
)
folders.foreach(folder => new File(folder.toUri).mkdir())
@@ -57,12 +61,14 @@ class TestHoodieSparkUtils {
var paths = Seq(tempDir.getAbsolutePath + "/*")
var globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
- assertEquals(folders.sortWith(_.toString < _.toString),
globbedPaths.sortWith(_.toString < _.toString))
+ assertEquals(folders.filterNot(entry => entry.toString.contains(".hoodie"))
+ .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString <
_.toString))
paths = Seq(tempDir.getAbsolutePath + "/*/*")
globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
- assertEquals(files.sortWith(_.toString < _.toString),
globbedPaths.sortWith(_.toString < _.toString))
+ assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie"))
+ .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString <
_.toString))
paths = Seq(tempDir.getAbsolutePath + "/folder1/*")
globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
@@ -79,7 +85,8 @@ class TestHoodieSparkUtils {
paths = Seq(tempDir.getAbsolutePath + "/folder1/*",
tempDir.getAbsolutePath + "/folder2/*")
globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
- assertEquals(files.sortWith(_.toString < _.toString),
globbedPaths.sortWith(_.toString < _.toString))
+ assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie"))
+ .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString <
_.toString))
}
@Test
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index ffe2b4e..6634934 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -419,7 +419,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
@Test def testSparkPartitonByWithCustomKeyGenerator(): Unit = {
// Without fieldType, the default is SIMPLE
- var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false)
+ var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
writer.partitionBy("current_ts")
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -428,7 +428,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("current_ts").cast("string")).count() == 0)
// Specify fieldType as TIMESTAMP
- writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false)
+ writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
writer.partitionBy("current_ts:TIMESTAMP")
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
.option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
@@ -504,7 +504,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
}
@Test def testSparkPartitonByWithTimestampBasedKeyGenerator() {
- val writer =
getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName, false)
+ val writer =
getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName)
writer.partitionBy("current_ts")
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
.option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")