[ 
https://issues.apache.org/jira/browse/HUDI-1489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenning Ding updated HUDI-1489:
-------------------------------
    Description: 
After upserting records into a bootstrapped Hudi table, reading the table 
afterwards fails.
h3. Reproduction steps
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieBootstrapConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

    val bucketName = "wenningd-dev"
    val tableName = "hudi_bootstrap_test_cow_5c1a5147_888e_4b638bef8"
    val recordKeyName = "event_id"
    val partitionKeyName = "event_type"
    val precombineKeyName = "event_time"
    val verificationRecordKey = "4"
    val verificationColumn = "event_name"
    val originalVerificationValue = "event_d"
    val updatedVerificationValue = "event_test"
    val sourceTableLocation = "s3://wenningd-dev/hudi/test-data/source_table/"

    val tableType = HoodieTableType.COPY_ON_WRITE.name()
    val verificationSqlQuery = "select " + verificationColumn + " from " + 
tableName + " where " + recordKeyName + " = '" + verificationRecordKey + "'"
    val tablePath = "s3://" + bucketName + "/hudi/tables/" + tableName
    val loadTablePath = tablePath + "/*/*"

    // Create table and sync with hive

        val df = spark.emptyDataFrame
        val tableType = HoodieTableType.COPY_ON_WRITE.name

        df.write
          .format("hudi")
          .option(HoodieWriteConfig.TABLE_NAME, tableName)
          .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, 
sourceTableLocation)
          .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
"org.apache.hudi.keygen.SimpleKeyGenerator")
          .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
          .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
          .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
          .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
          .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, 
partitionKeyName)
          
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
          .mode(SaveMode.Overwrite)
          .save(tablePath)

    // Verify create with spark sql query
    val result0 = spark.sql(verificationSqlQuery)
    if (!(result0.count == 1) || 
!result0.collect.mkString.contains(originalVerificationValue)) {
      throw new TestFailureException("Create table verification failed!")
    }


    val df3 = spark.read.format("org.apache.hudi").load(loadTablePath)
val df4 = df3.filter(col(recordKeyName) === verificationRecordKey)
val df5 = df4.withColumn(verificationColumn, lit(updatedVerificationValue))
df5.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKeyName)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, precombineKeyName)
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, 
partitionKeyName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Append)
  .save(tablePath)

  val result1 = spark.sql(verificationSqlQuery)

  val df6 = spark.read.format("org.apache.hudi").load(loadTablePath)

df6.show
{code}
{{df6.show}} fails with the following error:
{code:java}
Driver stacktrace:
  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
  at scala.Option.foreach(Option.scala:257)
  at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:407)
  at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3395)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
  at 
org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
  at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
  at 
org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
  at 
org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
  at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
  at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2552)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2766)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:712)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:721)
  ... 49 elided
Caused by: java.lang.NullPointerException
  at 
org.apache.spark.sql.execution.vectorized.WritableColumnVector.arrayData(WritableColumnVector.java:637)
  at 
org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:378)
  at 
org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getUTF8String(MutableColumnarRow.java:135)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
  at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:297)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:289)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
{code}
The root cause is that the {{requiredColumns}} passed to 
[buildScan()|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L72]
 do not follow the same column order as the schema file.

For example, when selecting all columns, printing the {{requiredColumns}} produced:

 

{{20/10/13 22:57:59 WARN HoodieBootstrapRelation: wenningd = > required 
columns: _hoodie_commit_time _hoodie_record_key _hoodie_partition_path 
event_type event_id event_guests event_time _hoodie_commit_seqno 
_hoodie_file_name event_name}}

Note that the metadata columns are not all at the front. The problem arises in 
[regularReadFunction|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L127],
 which uses {{requiredSkeletonSchema.fields ++ requiredDataSchema.fields}} as its 
read schema, i.e. the skeleton (metadata) fields first, followed by the data 
fields. Because the required columns do not follow that order, there is a schema 
mismatch between 
[requiredSchema|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L132]
 and {{requiredColumns}}, which leads to the {{NullPointerException}} shown above.

  was:
After updating Hudi table with the written bootstrap table, it would fail to 
read the latest bootstrap table.
h3. Reproduction steps
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieBootstrapConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

    val bucketName = "wenningd-emr-dev"
    val tableName = "hudi_bootstrap_test_cow_5c1a5147_888e_4b638bef8"
    val recordKeyName = "event_id"
    val partitionKeyName = "event_type"
    val precombineKeyName = "event_time"
    val verificationRecordKey = "4"
    val verificationColumn = "event_name"
    val originalVerificationValue = "event_d"
    val updatedVerificationValue = "event_test"
    // val sourceTableWithoutHiveStylePartition = 
"s3://wenningd-emr-dev/hudi/test-data/source_table/"

    // new parameters
    val sourceTableLocation = 
"s3://wenningd-emr-dev/hudi/test-data/source_table/"

    val tableType = HoodieTableType.COPY_ON_WRITE.name()
    val verificationSqlQuery = "select " + verificationColumn + " from " + 
tableName + " where " + recordKeyName + " = '" + verificationRecordKey + "'"
    val tablePath = "s3://" + bucketName + "/hudi/tables/" + tableName
    val loadTablePath = tablePath + "/*/*"

    // Create table and sync with hive

        val df = spark.emptyDataFrame
        val tableType = HoodieTableType.COPY_ON_WRITE.name
    // val tableType = HoodieTableType.MERGE_ON_READ.name

        df.write
          .format("hudi")
          .option(HoodieWriteConfig.TABLE_NAME, tableName)
          .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, 
sourceTableLocation)
          .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
"org.apache.hudi.keygen.SimpleKeyGenerator")
          .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
          .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
          .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
          .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
          .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, 
partitionKeyName)
          
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
          .mode(SaveMode.Overwrite)
          .save(tablePath)

    // Verify create with spark sql query
    val result0 = spark.sql(verificationSqlQuery)
    if (!(result0.count == 1) || 
!result0.collect.mkString.contains(originalVerificationValue)) {
      throw new TestFailureException("Create table verification failed!")
    }


    val df3 = spark.read.format("org.apache.hudi").load(loadTablePath)
val df4 = df3.filter(col(recordKeyName) === verificationRecordKey)
val df5 = df4.withColumn(verificationColumn, lit(updatedVerificationValue))
df5.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKeyName)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, precombineKeyName)
  // .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, 
partitionKeyName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Append)
  .save(tablePath)

  val result1 = spark.sql(verificationSqlQuery)

  val df6 = spark.read.format("org.apache.hudi").load(loadTablePath)

df6.show
{code}
df6.show would return:
{code:java}
Driver stacktrace:
  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
  at scala.Option.foreach(Option.scala:257)
  at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:407)
  at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3395)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
  at 
org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
  at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
  at 
org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
  at 
org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
  at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
  at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2552)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2766)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:712)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:721)
  ... 49 elided
Caused by: java.lang.NullPointerException
  at 
org.apache.spark.sql.execution.vectorized.WritableColumnVector.arrayData(WritableColumnVector.java:637)
  at 
org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:378)
  at 
org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getUTF8String(MutableColumnarRow.java:135)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
  at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:297)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:289)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
{code}
The root cause is:

 the {{requiredColumns}} in the 
[buildScan()|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L72]
 does not follow the same order as the schema file.

For example, when I selected all the columns, I printed the {{requiredColumns}}:

 

{{20/10/13 22:57:59 WARN HoodieBootstrapRelation: wenningd = > required 
columns: _hoodie_commit_time _hoodie_record_key _hoodie_partition_path 
event_type event_id event_guests event_time _hoodie_commit_seqno 
_hoodie_file_name event_name}}

You can see not all the metadata columns are in the front. So the problem here 
is when we try to use 
[regularReadFunction|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L127],
 we use this as the schema: {{requiredSkeletonSchema.fields ++ 
requiredDataSchema.fields}}. But since the required columns do not follow the 
same order as schema file, there's a schema mismatch between 
[requiredSchema|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L132]
 and {{requiredColumns}}


> Not able to read after updating bootstrap table with written table
> ------------------------------------------------------------------
>
>                 Key: HUDI-1489
>                 URL: https://issues.apache.org/jira/browse/HUDI-1489
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Wenning Ding
>            Assignee: Wenning Ding
>            Priority: Major
>
> After updating Hudi table with the written bootstrap table, it would fail to 
> read the latest bootstrap table.
> h3. Reproduction steps
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.common.model.HoodieTableType
> import org.apache.hudi.config.HoodieBootstrapConfig
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.SparkSession
>     val bucketName = "wenningd-dev"
>     val tableName = "hudi_bootstrap_test_cow_5c1a5147_888e_4b638bef8"
>     val recordKeyName = "event_id"
>     val partitionKeyName = "event_type"
>     val precombineKeyName = "event_time"
>     val verificationRecordKey = "4"
>     val verificationColumn = "event_name"
>     val originalVerificationValue = "event_d"
>     val updatedVerificationValue = "event_test"
>     val sourceTableLocation = "s3://wenningd-dev/hudi/test-data/source_table/"
>     val tableType = HoodieTableType.COPY_ON_WRITE.name()
>     val verificationSqlQuery = "select " + verificationColumn + " from " + 
> tableName + " where " + recordKeyName + " = '" + verificationRecordKey + "'"
>     val tablePath = "s3://" + bucketName + "/hudi/tables/" + tableName
>     val loadTablePath = tablePath + "/*/*"
>     // Create table and sync with hive
>         val df = spark.emptyDataFrame
>         val tableType = HoodieTableType.COPY_ON_WRITE.name
>         df.write
>           .format("hudi")
>           .option(HoodieWriteConfig.TABLE_NAME, tableName)
>           .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, 
> sourceTableLocation)
>           .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
> "org.apache.hudi.keygen.SimpleKeyGenerator")
>           .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
> DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
>           .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
>           .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, 
> recordKeyName)
>           .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>           .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>           .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, 
> partitionKeyName)
>           
> .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>           .mode(SaveMode.Overwrite)
>           .save(tablePath)
>     // Verify create with spark sql query
>     val result0 = spark.sql(verificationSqlQuery)
>     if (!(result0.count == 1) || 
> !result0.collect.mkString.contains(originalVerificationValue)) {
>       throw new TestFailureException("Create table verification failed!")
>     }
>     val df3 = spark.read.format("org.apache.hudi").load(loadTablePath)
> val df4 = df3.filter(col(recordKeyName) === verificationRecordKey)
> val df5 = df4.withColumn(verificationColumn, lit(updatedVerificationValue))
> df5.write.format("org.apache.hudi")
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
> partitionKeyName)
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, precombineKeyName)
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, 
> partitionKeyName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Append)
>   .save(tablePath)
>   val result1 = spark.sql(verificationSqlQuery)
>   val df6 = spark.read.format("org.apache.hudi").load(loadTablePath)
> df6.show
> {code}
> df6.show would return:
> {code:java}
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
>   at scala.Option.foreach(Option.scala:257)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:407)
>   at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
>   at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3395)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
>   at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
>   at 
> org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
>   at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2552)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2766)
>   at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:712)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:721)
>   ... 49 elided
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.spark.sql.execution.vectorized.WritableColumnVector.arrayData(WritableColumnVector.java:637)
>   at 
> org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:378)
>   at 
> org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getUTF8String(MutableColumnarRow.java:135)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:297)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:289)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> {code}
> The root cause is:
>  the {{requiredColumns}} in the 
> [buildScan()|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L72]
>  does not follow the same order as the schema file.
> For example, when I selected all the columns, I printed the 
> {{requiredColumns}}:
>  
> {{20/10/13 22:57:59 WARN HoodieBootstrapRelation: wenningd = > required 
> columns: _hoodie_commit_time _hoodie_record_key _hoodie_partition_path 
> event_type event_id event_guests event_time _hoodie_commit_seqno 
> _hoodie_file_name event_name}}
> You can see not all the metadata columns are in the front. So the problem 
> here is when we try to use 
> [regularReadFunction|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L127],
>  we use this as the schema: {{requiredSkeletonSchema.fields ++ 
> requiredDataSchema.fields}}. But since the required columns do not follow the 
> same order as schema file, there's a schema mismatch between 
> [requiredSchema|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L132]
>  and {{requiredColumns}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to