[ https://issues.apache.org/jira/browse/HUDI-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Udit Mehrotra updated HUDI-1021:
--------------------------------
    Description: 
Reproduction Steps:

 
{code:java}
import spark.implicits._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.hudi.common.model.HoodieTableType
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit  // needed for lit() below

val sourcePath = "s3://uditme-iad/hudi/tables/events/events_data_partitioned_non_null"
val sourceDf = spark.read.parquet(sourcePath + "/*")
val tableName = "events_data_partitioned_non_null_00"
val tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName

// Initial insert that creates the COW table.
sourceDf.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .mode(SaveMode.Overwrite)
  .save(tablePath)

// Read the rows back through the Hudi datasource and derive an update from them.
val readDf = spark.read.format("org.apache.hudi").load(tablePath + "/*")

val updateDf = readDf.filter($"event_id" === "106")
  .withColumn("event_name", lit("udit_event_106"))

// Upsert the derived row back into the same table; this write fails with the NPE below.
updateDf.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .mode(SaveMode.Append)
  .save(tablePath)
{code}
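
For completeness, the commit timeline can be checked after the initial insert using the already-imported HoodieDataSourceHelpers. A minimal sketch, assuming a spark-shell session; the Hadoop FileSystem lookup below is an assumption about the environment:
{code:java}
// Sketch: confirm the initial insert produced a commit before attempting the upsert.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Path(tablePath).toUri, spark.sparkContext.hadoopConfiguration)

// Latest commit instant on the table, and all commits since the beginning of time ("000").
val latest = HoodieDataSourceHelpers.latestCommit(fs, tablePath)
val commits = HoodieDataSourceHelpers.listCommitsSince(fs, tablePath, "000")
{code}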
 

Full stack trace:
{noformat}
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
 at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:276)
 at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
 at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
 at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
 at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
 at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
 at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:134)
 at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:90)
 at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
 at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:269)
 ... 30 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
 at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
 at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:132)
 ... 33 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
 at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
 ... 34 more
Caused by: java.lang.NullPointerException
 at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:222)
 at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:159)
 at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:149)
 at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
 at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 ... 3 more

{noformat}
 

As you can see, *updateDf* is formed by reading rows back from the *bootstrapped Hudi table* itself. If, however, we form *updateDf* from the source data, the upsert works fine:

{code:java}
val readDf = spark.read.parquet(sourcePath + "/*")
val updateDf = readDf.filter($"event_id" === "106")
  .withColumn("event_name", lit("udit_event_106"))
{code}
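
One visible difference between the two cases is that a DataFrame read through the Hudi datasource carries the {{_hoodie_*}} metadata columns, while a plain parquet read of the source does not. A minimal sketch of stripping them before the upsert; this is only a hypothesis about a workaround, not a confirmed fix for the NPE:
{code:java}
// Hudi metadata columns added to every record written by Hudi.
val hoodieMetaCols = Seq("_hoodie_commit_time", "_hoodie_commit_seqno",
  "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

// Drop them so the upsert input has the same shape as the source-parquet
// case above. Whether this actually avoids the NPE is untested here.
val updateDf = readDf.drop(hoodieMetaCols: _*)
  .filter($"event_id" === "106")
  .withColumn("event_name", lit("udit_event_106"))
{code}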

 

 


> [Bug] Unable to update bootstrapped table using rows from the written 
> bootstrapped table
> ----------------------------------------------------------------------------------------
>
>                 Key: HUDI-1021
>                 URL: https://issues.apache.org/jira/browse/HUDI-1021
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: bootstrap
>            Reporter: Udit Mehrotra
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
