imonteroq opened a new issue, #10533:
URL: https://github.com/apache/hudi/issues/10533

   **Describe the problem you faced**

   Streaming in Spark from a Hudi table fails with the error below when a `writeStream` process has created / written to the table with the schema evolution settings `hoodie.schema.on.read.enable` and `hoodie.datasource.write.reconcile.schema` enabled. I have not been able to upsert a source schema containing either more or fewer columns than the target schema without these two settings enabled.
   
   `org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column _hoodie_commit_seqno already exists. Consider to choose another name or rename the existing column.`
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   ```
   // Imports needed to make the snippet self-contained
   import org.apache.spark.SparkConf
   import org.apache.spark.sql.{SQLContext, SparkSession}
   import org.apache.spark.sql.execution.streaming.MemoryStream
   import org.apache.spark.sql.streaming.{OutputMode, Trigger}

   // Minimal case class assumed for this repro (the original report omits its
   // definition); the field names match the recordkey ("identifier") and
   // precombine ("date") options below. Define it at top level (not inside a
   // method) so spark.implicits can derive an Encoder for it.
   case class Contract(identifier: String, quantity: Int, name: String, date: Long)

   val sparkConf: SparkConf = new SparkConf()
     .setMaster("local[*]")
     .setAppName("testAppName")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
     .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
     .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
   val spark = SparkSession.builder.config(sparkConf).getOrCreate()

   import spark.implicits._
   implicit val sqlContext: SQLContext = spark.sqlContext
   
   val hudiOptions: Map[String, String] = Map(
     "hoodie.table.name" -> "test_table",
     "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
     "hoodie.datasource.write.operation" -> "upsert",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.datasource.write.recordkey.field" -> "identifier",
     "hoodie.datasource.write.precombine.field" -> "date",
     "hoodie.datasource.insert.dup.policy" -> "none",
     "hoodie.avro.schema.externalTransformation" -> "true",
     "hoodie.schema.on.read.enable" -> "true",
     "hoodie.datasource.write.reconcile.schema" -> "true")
   
   val inMemoryRecords: List[Contract] =
     List(Contract("001", 1, "test1", 100), Contract("002", 2, "test2", 100), Contract("003", 3, "test3", 100))
   
   val contractsInMemory: MemoryStream[Contract] = MemoryStream[Contract]
   contractsInMemory.addData(inMemoryRecords)
   
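   // 1) Write one micro-batch of Contract records to a Hudi table at /tmp/data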
   contractsInMemory
     .toDF()
     .writeStream
     .format("hudi")
     .trigger(Trigger.AvailableNow())
     .queryName("streamingQueryName")
     .option("checkpointLocation", "/tmp/checkpoint")
     .options(hudiOptions)
     .outputMode(OutputMode.Append())
     .start("/tmp/data")
     .processAllAvailable()
   
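   // 2) Stream the Hudi table back out; this read fails with COLUMN_ALREADY_EXISTS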
   spark.readStream
     .format("hudi")
     .load("/tmp/data")
     .writeStream
     .format("memory")
     .queryName("queryName")
     .outputMode("append")
     .start()
     .processAllAvailable()
   ```
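   
   For context, the schema-evolving upsert that these two settings are meant to enable looks roughly like the sketch below (illustrative only; `ContractV2`, its extra `status` column, and the checkpoint path are hypothetical names, not part of the failing repro above):
   
   ```
   // Hypothetical evolved schema: same as Contract plus an extra "status" column
   case class ContractV2(identifier: String, quantity: Int, name: String, status: String, date: Long)
   
   val evolved: MemoryStream[ContractV2] = MemoryStream[ContractV2]
   evolved.addData(List(ContractV2("004", 4, "test4", "active", 200)))
   
   evolved
     .toDF()
     .writeStream
     .format("hudi")
     .trigger(Trigger.AvailableNow())
     .queryName("streamingQueryNameEvolved")
     .option("checkpointLocation", "/tmp/checkpoint-evolved")
     .options(hudiOptions) // same options, including the two schema evolution settings
     .outputMode(OutputMode.Append())
     .start("/tmp/data")
     .processAllAvailable()
   ```
   
   Without `hoodie.schema.on.read.enable` and `hoodie.datasource.write.reconcile.schema`, an upsert like this, whose source schema has more (or fewer) columns than the target table, does not go through.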
   
   **Environment Description**
   
   * OS: Mac OS X
   
   * Hudi version: 0.14.0
   
   * Spark version: 3.4.1
   
   * Storage: S3 (LocalStack)
   
   * Running on Docker?: No
   
   
   **Additional context**
   
   The streaming read works fine with either `hoodie.schema.on.read.enable` or `hoodie.datasource.write.reconcile.schema` disabled.
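   
   A minimal workaround sketch, assuming schema evolution is not needed for the table: keep the writer options from the repro but disable one of the two flags, e.g.
   
   ```
   // Same writer options as in the repro, with schema reconciliation turned off;
   // disabling hoodie.schema.on.read.enable instead also avoids the error.
   val workingOptions: Map[String, String] =
     hudiOptions.updated("hoodie.datasource.write.reconcile.schema", "false")
   ```
   
   The trade-off is that this gives up the schema reconciliation behaviour the settings were enabled for in the first place.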
   
   **Stacktrace**
   
   ```
   org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
        at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
        at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
        at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
        at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
        at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
   [stream execution thread for test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Async log purge executor pool for query test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] has been shutdown
   [ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.SparkContext - SparkContext is stopping with exitCode 0.
   [dispatcher-event-loop-4] INFO org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
   [ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.memory.MemoryStore - MemoryStore cleared
   [ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.BlockManager - BlockManager stopped
   [ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
   [dispatcher-event-loop-2] INFO org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
   [ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
   
   
   [STREAM_FAILED] Query [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] terminated with exception: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
   org.apache.spark.sql.streaming.StreamingQueryException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
   === Streaming Query ===
   Identifier: test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]
   Current Committed Offsets: {}
   Current Available Offsets: {org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5: {"commitTime":"20240118170827012"}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   WriteToMicroBatchDataSource MemorySink, 443bb452-13e7-4d43-916d-bac5fd5d1f2c, [queryName=test_hudi], Append
   +- StreamingExecutionRelation org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5, [_hoodie_commit_time#142, _hoodie_commit_seqno#143, _hoodie_record_key#144, _hoodie_partition_path#145, _hoodie_file_name#146, identifier#147, name#148, quantity#149, status#150, agent#151, metadata#152, contacts#153, date#154]
   
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
   Caused by: org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
        at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
        at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
        at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
        at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
        at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
        ... 1 more
   ```

