imonteroq opened a new issue, #10533: URL: https://github.com/apache/hudi/issues/10533
**Describe the problem you faced**

Streaming in Spark from a Hudi table fails with the error below when a `writeStream` process has created or written to the table with the schema evolution settings `hoodie.schema.on.read.enable` and `hoodie.datasource.write.reconcile.schema` enabled. I have not been able to upsert a source schema containing more and/or fewer columns than the target schema without these two settings enabled.

`org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column _hoodie_commit_seqno already exists. Consider to choose another name or rename the existing column.`

**To Reproduce**

Steps to reproduce the behavior:

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Hypothetical definition of the Contract case class (not included in the
// original report); fields chosen to match the record key ("identifier")
// and precombine field ("date") configured below.
case class Contract(identifier: String, quantity: Int, name: String, date: Long)

val sparkConf: SparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("testAppName")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
  .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")

val spark = SparkSession.builder.config(sparkConf).getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

val hudiOptions: Map[String, String] = Map(
  "hoodie.table.name" -> "test_table",
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.recordkey.field" -> "identifier",
  "hoodie.datasource.write.precombine.field" -> "date",
  "hoodie.datasource.insert.dup.policy" -> "none",
  "hoodie.avro.schema.externalTransformation" -> "true",
  "hoodie.schema.on.read.enable" -> "true",
  "hoodie.datasource.write.reconcile.schema" -> "true")

val inMemoryRecords: List[Contract] = List(
  Contract("001", 1, "test1", 100),
  Contract("002", 2, "test2", 100),
  Contract("003", 3, "test3", 100))

val contractsInMemory: MemoryStream[Contract] = MemoryStream[Contract]
contractsInMemory.addData(inMemoryRecords)

// Write the in-memory stream to a Hudi table with both schema evolution settings on.
contractsInMemory
  .toDF()
  .writeStream
  .format("hudi")
  .trigger(Trigger.AvailableNow())
  .queryName("streamingQueryName")
  .option("checkpointLocation", "/tmp/checkpoint")
  .options(hudiOptions)
  .outputMode(OutputMode.Append())
  .start("/tmp/data")
  .processAllAvailable()

// Streaming back from the same table fails with COLUMN_ALREADY_EXISTS.
spark.readStream
  .format("hudi")
  .load("/tmp/data")
  .writeStream
  .format("memory")
  .queryName("queryName")
  .outputMode("append")
  .start()
  .processAllAvailable()
```

**Environment Description**

* OS: Mac OS X
* Hudi version: 0.14.0
* Spark version: 3.4.1
* Storage: S3 (LocalStack)
* Running on Docker?: No

**Additional context**

This works fine with either `hoodie.schema.on.read.enable` or `hoodie.datasource.write.reconcile.schema` disabled (a sketch of that workaround follows the stacktrace).

**Stacktrace**

```
org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
    at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
    at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
[stream execution thread for test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Async log purge executor pool for query test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] has been shutdown
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.SparkContext - SparkContext is stopping with exitCode 0.
[dispatcher-event-loop-4] INFO org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.memory.MemoryStore - MemoryStore cleared
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.BlockManager - BlockManager stopped
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
[dispatcher-event-loop-2] INFO org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
[STREAM_FAILED] Query [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] terminated with exception: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
org.apache.spark.sql.streaming.StreamingQueryException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
=== Streaming Query ===
Identifier: test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]
Current Committed Offsets: {}
Current Available Offsets: {org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5: {"commitTime":"20240118170827012"}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource MemorySink, 443bb452-13e7-4d43-916d-bac5fd5d1f2c, [queryName=test_hudi], Append
+- StreamingExecutionRelation org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5, [_hoodie_commit_time#142, _hoodie_commit_seqno#143, _hoodie_record_key#144, _hoodie_partition_path#145, _hoodie_file_name#146, identifier#147, name#148, quantity#149, status#150, agent#151, metadata#152, contacts#153, date#154]
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
    at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
    at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    ... 1 more
```
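For reference, here is a minimal sketch of the workaround noted under **Additional context**: the same pipeline as the reproduction, but with `hoodie.datasource.write.reconcile.schema` turned off (disabling `hoodie.schema.on.read.enable` instead should work equally well, per the report). It reuses `spark`, `hudiOptions`, and `contractsInMemory` from the reproduction above; the `-workaround` checkpoint and table paths are illustrative so the run starts from a fresh table.

```
// Sketch of the workaround: identical write/read pipeline, but with only one
// of the two schema evolution settings enabled.
val workingOptions: Map[String, String] =
  hudiOptions + ("hoodie.datasource.write.reconcile.schema" -> "false")

contractsInMemory
  .toDF()
  .writeStream
  .format("hudi")
  .trigger(Trigger.AvailableNow())
  .queryName("streamingQueryName")
  .option("checkpointLocation", "/tmp/checkpoint-workaround") // hypothetical fresh paths
  .options(workingOptions)
  .outputMode(OutputMode.Append())
  .start("/tmp/data-workaround")
  .processAllAvailable()

// The streaming read that previously failed with [COLUMN_ALREADY_EXISTS]
// completes against a table written this way.
spark.readStream
  .format("hudi")
  .load("/tmp/data-workaround")
  .writeStream
  .format("memory")
  .queryName("queryName")
  .outputMode("append")
  .start()
  .processAllAvailable()
```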