This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1509e25f5c [HUDI-5891] Fix clustering on bootstrapped tables (#8206)
a1509e25f5c is described below

commit a1509e25f5c9bece470075272515e17890f30736
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Mar 27 09:12:23 2023 +0530

    [HUDI-5891] Fix clustering on bootstrapped tables (#8206)
    
    - Fixes clustering on bootstrapped tables. There are two issues:
    
    Clustering strategy added bootstrap as well as source paths to the glob 
paths. This was causing an error, as mentioned in HUDI-5891, while building 
file index. There is no need to add source path as that is handled in 
HoodieBootstrapRDD.
    Sometimes the path will have scheme and sometimes it will be without 
scheme, so getting the relative partition path based was buggy. Corrected that 
in HoodieBootstrapRelation.
---
 .../MultipleSparkJobExecutionStrategy.java         |  7 +--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  9 ++-
 .../functional/TestDataSourceForBootstrap.scala    | 66 +++++++++++++++++++++-
 3 files changed, 72 insertions(+), 10 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 623e0b18349..d9cebe6d0c6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -35,12 +35,12 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.FutureUtils;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -368,9 +368,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         .stream()
         .map(op -> {
           ArrayList<String> readPaths = new ArrayList<>();
-          if (op.getBootstrapFilePath() != null) {
-            readPaths.add(op.getBootstrapFilePath());
-          }
           if (op.getDataFilePath() != null) {
             readPaths.add(op.getDataFilePath());
           }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index eb92a2f6732..8d9a5e2fc85 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -23,8 +23,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation._
 import org.apache.hudi.AvroConversionUtils.getAvroSchemaWithDefaults
+import org.apache.hudi.HoodieBaseRelation._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.CachingPath
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
@@ -57,7 +58,7 @@ import 
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, 
ParquetFileFormat}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -480,7 +481,9 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     try {
       val tableConfig = metaClient.getTableConfig
       if (extractPartitionValuesFromPartitionPath) {
-        val relativePath = new URI(metaClient.getBasePath).relativize(new 
URI(file.getPath.getParent.toString)).toString
+        val tablePathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(metaClient.getBasePathV2)
+        val partitionPathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(file.getPath.getParent)
+        val relativePath = new 
URI(tablePathWithoutScheme.toString).relativize(new 
URI(partitionPathWithoutScheme.toString)).toString
         val hiveStylePartitioningEnabled = 
tableConfig.getHiveStylePartitioningEnable.toBoolean
         if (hiveStylePartitioningEnabled) {
           val partitionSpec = PartitioningUtils.parsePathFragment(relativePath)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index e3d235591d4..86172b64f90 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.timeline.HoodieTimeline
-import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, 
HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieClusteringConfig, 
HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, 
sort}
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestUtils
@@ -37,7 +37,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
 
 import java.time.Instant
 import java.util.Collections
@@ -328,6 +328,68 @@ class TestDataSourceForBootstrap {
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, 
isPartitioned = true, isHiveStylePartitioned = true)
   }
 
+  @Test
+  def testMetadataBootstrapMORPartitionedInlineClustering(): Unit = {
+    val timestamp = Instant.now.toEpochMilli
+    val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
+    // Prepare source data
+    val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc, spark.sqlContext)
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(HoodieRecordType.AVRO) ++ 
Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
+
+    // Perform bootstrap
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      writeOpts,
+      classOf[SimpleKeyGenerator].getName)
+
+    // Read bootstrapped table and verify count
+    val hoodieROViewDF1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF1.count())
+
+    // Perform upsert with clustering
+    val updateTimestamp = Instant.now.toEpochMilli
+    val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate, partitionPaths.asJava, jsc, spark.sqlContext)
+
+    updateDF.write
+      .format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .option(HoodieClusteringConfig.INLINE_CLUSTERING.key, "true")
+      .option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key, "1")
+      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "datestr")
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // Expect 2 new commits since meta bootstrap - delta commit and 
replacecommit (due to inline clustering)
+    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
+
+    // Read table after upsert and verify count.
+    val hoodieROViewDF2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF2.count())
+
+    // Test query without "*" for MOR READ_OPTIMIZED
+    val hoodieROViewDFWithBasePath = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+    assertEquals(numRecords, hoodieROViewDFWithBasePath.count())
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType],
     // TODO(HUDI-5807) enable for spark native records

Reply via email to