kapjoshi-cisco opened a new issue, #8160:
URL: https://github.com/apache/hudi/issues/8160

   **Describe the problem you faced**
   Writing to a Hudi table after an allowed data type upgrade, such as `long` to `double`, fails with the following error:
   ```
   org.apache.avro.AvroRuntimeException: cannot support rewrite value for 
schema type: "long" since the old schema type is: "double"
   ```
   Schema evolution with respect to data type upgrades, from `int` to `long` or `long` to `double`, is failing. We are using Glue 4, which has native support for the Hudi libraries, so we are not adding any external Hudi jars. Without schema evolution we can write to and read from the Hudi table successfully. With schema evolution, only the addition of a column was tested successfully; data type upgrades fail. By "data type upgrades" we mean that a few column data types in the data source are promoted in ways the Avro specification allows, say from `int` to `long` or `long` to `double`. According to the documentation this is supported, but it does not work in practice.
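   For context, the Avro specification's schema-resolution rules permit promoting `int` to `long`, `float`, or `double`, and `long` to `float` or `double`. Below is a minimal sketch of the schema change applied between the two writes (the field list is illustrative, not our full schema):
   ```python
   from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

   # Schema of the first write: col_1 is a long.
   schema_v1 = StructType([
       StructField("id", StringType(), False),
       StructField("Time", LongType(), False),
       StructField("col_1", LongType(), True),
   ])

   # Schema of the second write: col_1 is promoted to double, which the
   # Avro spec treats as a legal, backward-compatible promotion.
   schema_v2 = StructType([
       StructField("id", StringType(), False),
       StructField("Time", LongType(), False),
       StructField("col_1", DoubleType(), True),
   ])
   ```
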
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a Glue job using Glue version 4.
   2. Update the Hudi script with the configs below:
   ```python
   common_config = {
           'className': ORG_APACHE_HUDI,
           'hoodie.datasource.hive_sync.use_jdbc': 'false',
           'hoodie.datasource.write.precombine.field': 'Time',
           'hoodie.datasource.write.recordkey.field': 'id',
           'hoodie.table.name': hudi_table,
           'hoodie.datasource.hive_sync.mode': 'hms',
           'hoodie.consistency.check.enabled': 'false',
           'hoodie.schema.on.read.enable': 'true',
           'hoodie.datasource.hive_sync.database': hudidb,
           'hoodie.datasource.write.reconcile.schema': 'true',
           'hoodie.datasource.hive_sync.table': hudi_table,
           # 'hoodie.datasource.hive_sync.table': 
f"{'_'.join(prefix.split('/')[1:])}",
           'hoodie.datasource.hive_sync.enable': 'true',
           'path': 's3://' + args['some_bucket'] + '/demo/full/demo_data',
           'hoodie.parquet.small.file.limit': '134217728',
           'hoodie.datasource.hive_sync.partition_extractor_class': 
'org.apache.hudi.hive.NonPartitionedExtractor',
           'hoodie.datasource.write.keygenerator.class': 
'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
           'hoodie.upsert.shuffle.parallelism': 68,
           "hoodie.datasource.write.operation": 'upsert'
       }
   ```
   The `'hoodie.datasource.write.reconcile.schema': 'false'` setting seems to be the issue, as per #7283.
   But #7283 states this is fixed, which is not clear to us. Since we are using Glue 4, we do not know which Hudi version is used behind the scenes.
   3. Write a dataframe with one `long` column, say `col_1`.
   4. Write the same data again with a schema update such that the `col_1` data type is changed to `double` (see the sketch after these steps).
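
   A minimal PySpark sketch of steps 3 and 4, assuming an existing `spark` session and reusing the `common_config` dict above as the write options (the sample record values are hypothetical):
   ```python
   # Step 3: first write -- col_1 is inferred as long.
   df_v1 = spark.createDataFrame([("1", 1678600000000, 10)], ["id", "Time", "col_1"])
   df_v1.write.format("hudi").options(**common_config).mode("append").save()

   # Step 4: same record, but col_1 is now a double. Per the Avro spec this
   # promotion should be accepted, yet the upsert fails with the
   # AvroRuntimeException shown in the stacktrace below.
   df_v2 = spark.createDataFrame([("1", 1678600000000, 10.0)], ["id", "Time", "col_1"])
   df_v2.write.format("hudi").options(**common_config).mode("append").save()
   ```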
   
   **Expected behavior**
   
   The data type of column `col_1` should have been upgraded from `long` to `double`, and this should have been reflected in the Glue catalog table schema.
   
   **Environment Description**
   
   * Hudi version : unknown; we are using Glue 4, which natively bundles Hudi.
   
   * Spark version : 3.3
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   
   **Stacktrace**
   
   ```bash
   2023-03-12 19:26:28,932 ERROR [main] glue.ProcessLauncher 
(Logging.scala:logError(77)): Error from Python:Traceback (most recent call 
last):
     File "/tmp/second-delete-upsert.py", line 175, in <module>
       main()
     File "/tmp/second-delete-upsert.py", line 159, in main
       start_merging()
     File "/tmp/second-delete-upsert.py", line 136, in start_merging
       hudi_merge(input_df, combined_conf)
     File "/tmp/second-delete-upsert.py", line 70, in hudi_merge
       .save()
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", 
line 966, in save
       self._jwrite.save()
     File 
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 
1321, in __call__
       return_value = get_return_value(
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
190, in deco
       return f(*a, **kw)
     File 
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 
326, in get_return_value
       raise Py4JJavaError(
   py4j.protocol.Py4JJavaError: An error occurred while calling o162.save.
   : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for 
commit time 20230312192624096
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:158)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 3 in stage 3.0 failed 4 times, most recent failure: Lost task 3.3 
in stage 3.0 (TID 11) (172.34.158.152 executor 1): 
org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema 
type: "long" since the old schema type is: "double"
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:955)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:869)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:820)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:818)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:772)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:734)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1034)
        at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:109)
        at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:117)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
        at scala.Option.foreach(Option.scala:257)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
        at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
        at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at 
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:50)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:33)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
        ... 54 more
   Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite 
value for schema type: "long" since the old schema type is: "double"
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:955)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:869)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:820)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:818)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:772)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:734)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1034)
        at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:109)
        at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:117)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   ```
   
   
   

