[ https://issues.apache.org/jira/browse/HUDI-5293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640455#comment-17640455 ]

sivabalan narayanan edited comment on HUDI-5293 at 11/29/22 10:58 PM:
----------------------------------------------------------------------

Steps to reproduce:
{code:java}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord{code}
{code:java}
val tableName = "hudi_trips_cow12_1"
val basePath = "file:///tmp/hudi_trips_cow12_1"
val dataGen = new DataGenerator

// spark-shell
// Batch 1: insert 10 records with schema on read enabled.
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.schema.on.read.enable", "true").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// Read back the first commit.
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath){code}
{code:java}
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot"){code}
{code:java}
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()

// spark-shell
// Batch 2: update the same records, drop begin_lat, and enable schema reconciliation.
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.drop("begin_lat").write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.schema.on.read.enable", "true").
  option("hoodie.datasource.write.reconcile.schema", "true").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Read back after the second commit; the write above is what fails.
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() {code}
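
The failure comes from the schema reconciliation step of the second write, not from the merge itself. Going by the stack trace in the issue description (reconcileSchema -> ColumnAddChange.addColumns -> checkColModifyIsLegal), the reconcile path appears to diff the incoming batch schema against the full table schema, Hudi meta columns included, and then tries to add back every column the batch is missing. Since the incoming batch carries no meta columns, the first "missing" column it tries to re-add is _hoodie_commit_time, which is illegal to touch. Below is a minimal sketch of that suspected mechanism; the object, field names, and diff logic are illustrative assumptions, not Hudi's actual code:
{code:java}
// Illustrative sketch only: inferred from the stack trace, not Hudi's implementation.
object ReconcileSketch extends App {
  val metaCols = Seq("_hoodie_commit_time", "_hoodie_commit_seqno",
    "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

  // Table schema after commit 1: meta columns plus all data columns.
  val tableSchema = metaCols ++ Seq("begin_lat", "begin_lon", "fare", "partitionpath", "ts", "uuid")
  // Incoming batch 2: begin_lat dropped, and no meta columns present yet.
  val incomingSchema = Seq("begin_lon", "fare", "partitionpath", "ts", "uuid")

  // Reconciliation re-adds every table column missing from the incoming schema.
  val missing = tableSchema.diff(incomingSchema) // meta columns + begin_lat

  for (col <- missing) {
    // checkColModifyIsLegal-style guard: meta columns must never be modified.
    if (metaCols.contains(col))
      throw new IllegalArgumentException(s"cannot modify hudi meta col: $col")
    println(s"adding back dropped column: $col")
  }
}
{code}
In this framing, re-adding the genuinely dropped begin_lat would be fine; the write dies earlier because the meta columns land in the same "missing" set instead of being filtered out before the diff, which matches the reported "cannot modify hudi meta col: _hoodie_commit_time" exactly.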



> Schema on read + reconcile schema fails w/ 0.12.1
> -------------------------------------------------
>
>                 Key: HUDI-5293
>                 URL: https://issues.apache.org/jira/browse/HUDI-5293
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: writer-core
>            Reporter: sivabalan narayanan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.12.2
>
>
> If I enable schema on read for commit 1, and then enable schema on read plus reconcile schema for the second batch, the write fails with:
> {code:java}
> warning: there was one deprecation warning; re-run with -deprecation for details
> 22/11/28 16:44:26 ERROR BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :2
> java.lang.IllegalArgumentException: cannot modify hudi meta col: _hoodie_commit_time
>       at org.apache.hudi.internal.schema.action.TableChange$BaseColumnChange.checkColModifyIsLegal(TableChange.java:157)
>       at org.apache.hudi.internal.schema.action.TableChanges$ColumnAddChange.addColumns(TableChanges.java:314)
>       at org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.lambda$reconcileSchema$5(AvroSchemaEvolutionUtils.java:92)
>       at java.util.TreeMap$EntrySpliterator.forEachRemaining(TreeMap.java:2969)
>       at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>       at org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchema(AvroSchemaEvolutionUtils.java:80)
>       at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:103)
>       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
>       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
>       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
>       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
>       at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>       at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>       at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>       at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>       at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>       at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>       at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:123)
>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748) {code}


