[ https://issues.apache.org/jira/browse/HUDI-5293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640455#comment-17640455 ]
sivabalan narayanan edited comment on HUDI-5293 at 11/29/22 10:58 PM: ---------------------------------------------------------------------- steps to reproduce {code:java} import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord{code} {code:java} val tableName = "hudi_trips_cow12_1" val basePath = "file:///tmp/hudi_trips_cow12_1" val dataGen = new DataGenerator// spark-shell val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option("hoodie.schema.on.read.enable","true"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)val tripsSnapshotDF = spark. read. format("hudi"). load(basePath){code} {code:java} tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot"){code} {code:java} spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.drop("begin_lat").write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option("hoodie.schema.on.read.enable","true"). option("hoodie.datasource.write.reconcile.schema","true"). option(TABLE_NAME, tableName). mode(Append). save(basePath)val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() {code} was (Author: shivnarayan): steps to reproduce {code:java} import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord{code} {code:java} val tableName = "hudi_trips_cow12_1" val basePath = "file:///tmp/hudi_trips_cow12_1" val dataGen = new DataGenerator// spark-shell val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option("hoodie.schema.on.read.enable","true"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.drop("begin_lat").write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option("hoodie.schema.on.read.enable","true"). option("hoodie.datasource.write.reconcile.schema","true"). option(TABLE_NAME, tableName). mode(Append). save(basePath)val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() {code} > Schema on read + reconcile schema fails w/ 0.12.1 > ------------------------------------------------- > > Key: HUDI-5293 > URL: https://issues.apache.org/jira/browse/HUDI-5293 > Project: Apache Hudi > Issue Type: Improvement > Components: writer-core > Reporter: sivabalan narayanan > Priority: Major > Labels: pull-request-available > Fix For: 0.12.2 > > > if I do schema on read on commit1 and then schema on read + reconcile schema > for 2nd batch, it fails w/ > {code:java} > warning: there was one deprecation warning; re-run with -deprecation for > details > 22/11/28 16:44:26 ERROR BaseSparkCommitActionExecutor: Error upserting > bucketType UPDATE for partition :2 > java.lang.IllegalArgumentException: cannot modify hudi meta col: > _hoodie_commit_time > at > org.apache.hudi.internal.schema.action.TableChange$BaseColumnChange.checkColModifyIsLegal(TableChange.java:157) > at > org.apache.hudi.internal.schema.action.TableChanges$ColumnAddChange.addColumns(TableChanges.java:314) > at > org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.lambda$reconcileSchema$5(AvroSchemaEvolutionUtils.java:92) > at > java.util.TreeMap$EntrySpliterator.forEachRemaining(TreeMap.java:2969) > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) > at > org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchema(AvroSchemaEvolutionUtils.java:80) > at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:103) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) > at > org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102) > at > org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) > at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) > at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) > at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:123) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) {code} > > > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)