kapjoshi-cisco opened a new issue, #8160:
URL: https://github.com/apache/hudi/issues/8160
**Describe the problem you faced**
Writing to hudi table post allowed datatype upgrades like long to double
isnt alllowed and throws backs error.
```
org.apache.avro.AvroRuntimeException: cannot support rewrite value for
schema type: "long" since the old schema type is: "double"
```
A clear and concise description of the problem.
Schema evolution wrt to data types upgrades from int to long or long to
double is failing. We are using Glue 4, which has native support for hudi
libraries. Hence we are not using any external dependent hudi jars. Without
scheam evolution we are able to write and read from hudi table successfully.
But with schema evolution only addition of column was successfully tested.
While testing schema evolution wrt to datatype upgrades, its failing. When we
say data type upgrades, we meant in the data source few column data types are
upgraded as allowed wrt Avro specification. Say from int to long or long to
double. This as per documentation is supported but not working in actual.
**To Reproduce**
Steps to reproduce the behavior:
1. Create a glue job, with Glue 4
2. Update hudi script with below configs:
```
common_config = {
'className': ORG_APACHE_HUDI,
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.write.precombine.field': 'Time',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.table.name': hudi_table,
'hoodie.datasource.hive_sync.mode': 'hms',
'hoodie.consistency.check.enabled': 'false',
'hoodie.schema.on.read.enable': 'true',
'hoodie.datasource.hive_sync.database': hudidb,
'hoodie.datasource.write.reconcile.schema': 'true',
'hoodie.datasource.hive_sync.table': hudi_table,
# 'hoodie.datasource.hive_sync.table':
f"{'_'.join(prefix.split('/')[1:])}",
'hoodie.datasource.hive_sync.enable': 'true',
'path': 's3://' + args['some_bucket'] + '/demo/full/demo_data',
'hoodie.parquet.small.file.limit': '134217728',
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.NonPartitionedExtractor',
'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
'hoodie.upsert.shuffle.parallelism': 68,
"hoodie.datasource.write.operation": 'upsert'
}
```
This property 'hoodie.datasource.write.reconcile.schema': 'false', setting
seems to be the issue as per #7283.
But this #7283 issue states its fixed, which is not clear. Since we are
using Glue 4 we dont know which Hudi version is used in the background.
3. Write dataframe with one `long` column say col_1.
4. Write again the same data with schema updates, such that col_1 datatype
is updated to `double`
**Expected behavior**
A clear and concise description of what you expected to happen.
The data type of the column `col_1` should had been upgraded from `long` to
`double` and this should had been reflecting in the Glue catalog table schema.
**Environment Description**
* Hudi version : unknown, we are using Glue 4 which natively supports hudi .
* Spark version : 3.3
* Hive version :
* Hadoop version :
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Additional context**
Add any other context about the problem here.
**Stacktrace**
```Add the stacktrace of the error.```
```bash
2023-03-12 19:26:28,932 ERROR [main] glue.ProcessLauncher
(Logging.scala:logError(77)): Error from Python:Traceback (most recent call
last):
File "/tmp/second-delete-upsert.py", line 175, in <module>
main()
File "/tmp/second-delete-upsert.py", line 159, in main
start_merging()
File "/tmp/second-delete-upsert.py", line 136, in start_merging
hudi_merge(input_df, combined_conf)
File "/tmp/second-delete-upsert.py", line 70, in hudi_merge
.save()
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
line 966, in save
self._jwrite.save()
File
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line
1321, in __call__
return_value = get_return_value(
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
190, in deco
return f(*a, **kw)
File
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line
326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o162.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for
commit time 20230312192624096
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
at
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
at
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:158)
at
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
at
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 3 in stage 3.0 failed 4 times, most recent failure: Lost task 3.3
in stage 3.0 (TID 11) (172.34.158.152 executor 1):
org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema
type: "long" since the old schema type is: "double"
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:955)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:869)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:820)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:818)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:772)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:734)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1034)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:109)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:117)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
at
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155)
at
org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
at
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
at
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
at
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:50)
at
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:33)
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
... 54 more
Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite
value for schema type: "long" since the old schema type is: "double"
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:955)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:869)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:820)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:818)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:772)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:734)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1034)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:109)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:117)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]