This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 59786113fae [HUDI-5998] Speed up reads from bootstrapped tables in 
spark (#8303)
59786113fae is described below

commit 59786113fae88382f03412c7c79f7a827bf9393f
Author: Jon Vexler <[email protected]>
AuthorDate: Fri May 26 02:36:32 2023 -0400

    [HUDI-5998] Speed up reads from bootstrapped tables in spark (#8303)
    
    Introduces new config `hoodie.bootstrap.data.queries.only` that is disabled
    by default. To read the Hudi metadata fields, it needs to be set to false.
---
 .../apache/hudi/config/HoodieBootstrapConfig.java  |  7 ++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  5 ++++
 .../main/scala/org/apache/hudi/DefaultSource.scala | 24 +++++++++++++++++--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  5 +++-
 .../org/apache/hudi/HoodieBootstrapRelation.scala  | 13 ++++++++--
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 28 ++++++++++++++++++----
 .../apache/hudi/SparkHoodieTableFileIndex.scala    | 23 ++++++++++++------
 .../apache/hudi/functional/TestBootstrapRead.java  |  5 ++++
 .../functional/TestDataSourceForBootstrap.scala    |  9 ++++---
 9 files changed, 99 insertions(+), 20 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index 1de36a26a5f..d88f0bb2e6f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -67,6 +67,13 @@ public class HoodieBootstrapConfig extends HoodieConfig {
       .sinceVersion("0.6.0")
       .withDocumentation("Selects the mode in which each file/partition in the 
bootstrapped dataset gets bootstrapped");
 
+  public static final ConfigProperty<String> DATA_QUERIES_ONLY = ConfigProperty
+      .key("hoodie.bootstrap.data.queries.only")
+      .defaultValue("false")
+      .markAdvanced()
+      .sinceVersion("0.14.0")
+      .withDocumentation("Improves query performance, but queries cannot use 
hudi metadata fields");
+
   public static final ConfigProperty<String> 
FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME = ConfigProperty
       .key("hoodie.bootstrap.full.input.provider")
       
.defaultValue("org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider")
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 878d185af06..50f264cd7e6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2855,6 +2855,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withHiveStylePartitioningEnabled(boolean enabled) {
+      writeConfig.setValue(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE, 
String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withExternalSchemaTrasformation(boolean enabled) {
       writeConfig.setValue(AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE, 
String.valueOf(enabled));
       return this;
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index e4be1a136e2..6655093fc85 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.util.PathUtils
@@ -101,7 +102,8 @@ class DefaultSource extends RelationProvider
       )
     } else {
       Map()
-    }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
+    }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams +
+      (DATA_QUERIES_ONLY.key() -> sqlContext.getConf(DATA_QUERIES_ONLY.key(), 
optParams.getOrElse(DATA_QUERIES_ONLY.key(), 
DATA_QUERIES_ONLY.defaultValue()))))
 
     // Get the table base path
     val tablePath = if (globPaths.nonEmpty) {
@@ -262,7 +264,7 @@ object DefaultSource {
           new MergeOnReadIncrementalRelation(sqlContext, parameters, 
metaClient, userSchema)
 
         case (_, _, true) =>
-          new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, 
metaClient, parameters)
+          resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
 
         case (_, _, _) =>
           throw new HoodieException(s"Invalid query type : $queryType for 
tableType: $tableType," +
@@ -271,6 +273,24 @@ object DefaultSource {
     }
   }
 
+  private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
+                                             globPaths: Seq[Path],
+                                             userSchema: Option[StructType],
+                                             metaClient: HoodieTableMetaClient,
+                                             parameters: Map[String, String]): 
BaseRelation = {
+    val enableFileIndex = HoodieSparkConfUtils.getConfigValue(parameters, 
sqlContext.sparkSession.sessionState.conf,
+      ENABLE_HOODIE_FILE_INDEX.key, 
ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
+    val isSchemaEvolutionEnabledOnRead = 
HoodieSparkConfUtils.getConfigValue(parameters,
+      sqlContext.sparkSession.sessionState.conf, 
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+      
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+    if (!enableFileIndex || isSchemaEvolutionEnabledOnRead
+      || globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key, 
DATA_QUERIES_ONLY.defaultValue).toBoolean) {
+      HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, 
parameters + (DATA_QUERIES_ONLY.key() -> "false"))
+    } else {
+      HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, 
parameters).toHadoopFsRelation
+    }
+  }
+
   private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
                                           globPaths: Seq[Path],
                                           userSchema: Option[StructType],
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index d7588335822..de67504d73d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, 
HoodieTableMetaClient, T
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.CachingPath
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -225,7 +226,9 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     val shouldExtractPartitionValueFromPath =
       
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
         
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
-    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
+    val shouldUseBootstrapFastRead = 
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
+
+    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || 
shouldUseBootstrapFastRead
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 5e15c0d1d68..34db2850501 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
PartitionedFile}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
@@ -200,6 +200,16 @@ case class HoodieBootstrapRelation(override val 
sqlContext: SQLContext,
   override def updatePrunedDataSchema(prunedSchema: StructType): 
HoodieBootstrapRelation =
     this.copy(prunedDataSchema = Some(prunedSchema))
 
+  def toHadoopFsRelation: HadoopFsRelation = {
+      HadoopFsRelation(
+        location = fileIndex,
+        partitionSchema = fileIndex.partitionSchema,
+        dataSchema = fileIndex.dataSchema,
+        bucketSpec = None,
+        fileFormat = fileFormat,
+        optParams)(sparkSession)
+  }
+
   //TODO: This should be unnecessary with spark 3.4 [SPARK-41970]
   private def encodePartitionPath(file: FileStatus): String = {
     val tablePathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(bootstrapBasePath)
@@ -212,7 +222,6 @@ case class HoodieBootstrapRelation(override val sqlContext: 
SQLContext,
 
 
 object HoodieBootstrapRelation {
-
   private def validate(requiredDataSchema: HoodieTableSchema, 
requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType): 
Unit = {
     val requiredDataColumns: Seq[String] = 
requiredDataSchema.structTypeSchema.fieldNames.toSeq
     val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ 
requiredDataFileSchema.fieldNames).toSeq
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 3131c81abee..3767b65a8ce 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -22,6 +22,7 @@ import 
org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferenc
 import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
 import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.HoodieBaseFile
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
@@ -41,6 +42,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import java.text.SimpleDateFormat
 import javax.annotation.concurrent.NotThreadSafe
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
@@ -145,11 +147,10 @@ case class HoodieFileIndex(spark: SparkSession,
     val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
     val listedPartitions = getInputFileSlices(prunedPartitions: 
_*).asScala.toSeq.map {
       case (partition, fileSlices) =>
-        val baseFileStatuses: Seq[FileStatus] =
-          fileSlices.asScala
-            .map(fs => fs.getBaseFile.orElse(null))
-            .filter(_ != null)
-            .map(_.getFileStatus)
+        val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
+          .asScala
+          .map(fs => fs.getBaseFile.orElse(null))
+          .filter(_ != null))
 
         // Filter in candidate files based on the col-stats index lookup
         val candidateFiles = baseFileStatuses.filter(fs =>
@@ -179,6 +180,23 @@ case class HoodieFileIndex(spark: SparkSession,
     }
   }
 
+  /**
+   * In the fast bootstrap read code path, it gets the file status for the 
bootstrap base files instead of
+   * skeleton files.
+   */
+  private def getBaseFileStatus(baseFiles: mutable.Buffer[HoodieBaseFile]): 
mutable.Buffer[FileStatus] = {
+    if (shouldFastBootstrap) {
+     baseFiles.map(f =>
+        if (f.getBootstrapBaseFile.isPresent) {
+         f.getBootstrapBaseFile.get().getFileStatus
+        } else {
+          f.getFileStatus
+        })
+    } else {
+      baseFiles.map(_.getFileStatus)
+    }
+  }
+
   private def lookupFileNamesMissingFromIndex(allIndexedFileNames: 
Set[String]) = {
     val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
     allBaseFileNames -- allIndexedFileNames
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index be9c8cdb6bb..c76af7b39ce 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -23,11 +23,12 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.SparkHoodieTableFileIndex._
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.bootstrap.index.BootstrapIndex
 import org.apache.hudi.common.config.TypedProperties
+import 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.hadoop.CachingPath
 import org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe
 import org.apache.hudi.keygen.{StringPartitionPathFormatter, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.{InternalRow, expressions}
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType, 
LongType, ShortType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import javax.annotation.concurrent.NotThreadSafe
@@ -83,10 +84,18 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   /**
    * Get the schema of the table.
    */
-  lazy val schema: StructType = schemaSpec.getOrElse({
-    val schemaUtil = new TableSchemaResolver(metaClient)
-    
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
-  })
+  lazy val schema: StructType = if (shouldFastBootstrap) {
+      StructType(rawSchema.fields.filterNot(f => 
HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name)))
+    } else {
+      rawSchema
+    }
+
+  private lazy val rawSchema: StructType = schemaSpec.getOrElse({
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
+    })
+
+  protected lazy val shouldFastBootstrap = 
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
 
   private lazy val sparkParsePartitionUtil = 
sparkAdapter.getSparkParsePartitionUtil
 
@@ -110,7 +119,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
           .map(column => nameFieldMap.apply(column))
 
         if (partitionFields.size != partitionColumns.get().size) {
-          val isBootstrapTable = 
BootstrapIndex.getBootstrapIndex(metaClient).useIndex()
+          val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
           if (isBootstrapTable) {
             // For bootstrapped tables its possible the schema does not 
contain partition field when source table
             // is hive style partitioned. In this case we would like to treat 
the table as non-partitioned
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index 874d6fe3062..f6ec64ce16b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -54,6 +54,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -228,12 +229,16 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     }
     Dataset<Row> hudiDf = 
sparkSession.read().options(readOpts).format("hudi").load(hudiBasePath);
     Dataset<Row> bootstrapDf = 
sparkSession.read().format("hudi").load(bootstrapTargetPath);
+    Dataset<Row> fastBootstrapDf = 
sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(), 
"true").load(bootstrapTargetPath);
     if (nPartitions == 0) {
+      compareDf(fastBootstrapDf.drop("city_to_state"), 
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
       compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
       return;
     }
     compareDf(hudiDf.drop(dropColumns).drop(partitionCols), 
bootstrapDf.drop(dropColumns).drop(partitionCols));
+    compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols), 
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
     compareDf(hudiDf.select("_row_key",partitionCols), 
bootstrapDf.select("_row_key",partitionCols));
+    compareDf(fastBootstrapDf.select("_row_key",partitionCols), 
bootstrapDf.select("_row_key",partitionCols));
   }
 
   protected void compareDf(Dataset<Row> df1, Dataset<Row> df2) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 81c9233eb34..12974d133a8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -583,10 +583,12 @@ class TestDataSourceForBootstrap {
     assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
 
     // Read bootstrapped table and verify count
-    val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+    val hoodieROViewDF1 = spark.read.format("hudi")
+      .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), 
"true").load(basePath + "/*")
     assertEquals(sort(sourceDF).collectAsList(), 
sort(dropMetaCols(hoodieROViewDF1)).collectAsList())
 
-    val hoodieROViewDFWithBasePath = spark.read.format("hudi").load(basePath)
+    val hoodieROViewDFWithBasePath = spark.read.format("hudi")
+      .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), 
"true").load(basePath)
     assertEquals(sort(sourceDF).collectAsList(), 
sort(dropMetaCols(hoodieROViewDFWithBasePath)).collectAsList())
 
     // Perform upsert
@@ -606,7 +608,8 @@ class TestDataSourceForBootstrap {
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
 
     // Read table after upsert and verify count
-    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
+    val hoodieROViewDF2 = spark.read.format("hudi")
+      .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), 
"true").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == 
$updateTimestamp").count())
 

Reply via email to