simonjobs opened a new issue, #8020:
URL: https://github.com/apache/hudi/issues/8020

   **Describe the problem you faced**
   
   When writing data where I expect the schema to evolve, I am getting errors on subsequent writes when the precision or scale of a decimal field in the schema has increased.
   
   We are facing this issue when running Glue 4.0 with the built-in Hudi connector and writing to the AWS Glue Data Catalog.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   I am showing an example where the scale is 0 and only the precision increases, but the issue applies to both precision and scale.
   
   1. Write data to a Hudi table with a decimal field defined as DecimalType(2, 0), using the write call shown after these steps:
   ```
   value = 99
   schema = T.StructType([
               T.StructField('recordkeyfield', T.IntegerType()),
               T.StructField('precombinefield', T.TimestampType()),
               T.StructField('decimalvalue', T.DecimalType(2, 0)),
               T.StructField('year', T.IntegerType())
   ])
   data = [(
       1,
       datetime.strptime('2023-01-01 12:00:00', '%Y-%m-%d %H:%M:%S'),
       Decimal(value),
       2023)]
   df = spark.createDataFrame(data, schema=schema)
   ``` 
   2. Write to the same table, with the field now defined as DecimalType(4, 0) and data to match.
   
   ```
   value = 9999
   schema = T.StructType([
               T.StructField('recordkeyfield', T.IntegerType()),
               T.StructField('precombinefield', T.TimestampType()),
               T.StructField('decimalvalue', T.DecimalType(4, 0)),
               T.StructField('year', T.IntegerType())
   ])
   data = [(
       2,
       datetime.strptime('2023-01-01 12:00:00', '%Y-%m-%d %H:%M:%S'),
       Decimal(value),
       2023)]
   df = spark.createDataFrame(data, schema=schema)
   ``` 
   3. The second write fails with the exception: "org.apache.avro.AvroTypeException: Cannot encode decimal with precision 4 as max precision 2"
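
   Both writes use the same call (shown here for reference; `additional_options` and the target path are the ones defined in the full script below):

   ```
   # Identical write for both batches; only the DataFrame schema differs
   # (decimalvalue is decimal(2,0) in the first batch, decimal(4,0) in the second).
   df.write.format("hudi") \
       .options(**additional_options) \
       .mode("append") \
       .save("s3://redacted/test_decimals")
   ```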
   
   
   Full script:
   
   ```
   import sys
   
   from awsglue.transforms import *
   from awsglue.utils import getResolvedOptions
   from awsglue.context import GlueContext
   from awsglue.job import Job
   from awsglue.gluetypes import *
   
   from pyspark.context import SparkContext
   from pyspark.sql import types as T
   from pyspark.sql import functions as F
   from decimal import Decimal
   from datetime import datetime
   
   ## @params: [JOB_NAME]
   args = getResolvedOptions(sys.argv, ['JOB_NAME'])
   
   sc = SparkContext()
   glueContext = GlueContext(sc)
   spark = glueContext.spark_session
   job = Job(glueContext)
   job.init(args['JOB_NAME'], args)
   
   value = 9999
   schema = T.StructType([
               T.StructField('recordkeyfield', T.IntegerType()),
               T.StructField('precombinefield', T.TimestampType()),
               T.StructField('decimalvalue', T.DecimalType(4, 0)),
               T.StructField('year', T.IntegerType())
   ])
   data = [(
       2,
       datetime.strptime('2023-01-01 12:00:00', '%Y-%m-%d %H:%M:%S'),
       Decimal(value),
       2023)]
   df = spark.createDataFrame(data, schema=schema)
   
   # Set Hudi options
   # 
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-hudi.html#aws-glue-programming-etl-format-hudi-update-insert
   additional_options={
       "hoodie.table.name": "test_decimals",
       "hoodie.database.name": "redacted",
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.recordkey.field": "recordkeyfield",
       "hoodie.datasource.write.precombine.field": "precombinefield",
       "hoodie.datasource.write.keygenerator.class": 
"org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.write.partitionpath.field": "year",
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.database": "redacted",
       "hoodie.datasource.hive_sync.table": "test_decimals",
       "hoodie.datasource.hive_sync.partition_fields": "year",
       "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.datasource.meta.sync.enable": "true",
       "hoodie.datasource.meta_sync.condition.sync": "true",
       "hoodie.schema.on.read.enable": "true",
       "hoodie.datasource.write.reconcile.schema": "true"
   }
   
   # Write to Data Catalog table via Hudi
   df.write.format("hudi") \
       .options(**additional_options) \
       .mode("append") \
       .save("s3://redacted/test_decimals")
   
   job.commit()
   ```
   
   **Expected behavior**
   
   Schema evolution should support increasing the precision and scale of decimal fields.
   
   E.g. decimal(2,0) should be able to evolve to decimal(4,0) on write to the Data Catalog.
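
   To illustrate why I consider this widening safe (a small sketch, not Hudi API usage): every value representable as decimal(2,0) is also representable as decimal(4,0), so promoting the stored type is lossless.

   ```
   from pyspark.sql import types as T

   old_type = T.DecimalType(2, 0)   # current table schema
   new_type = T.DecimalType(4, 0)   # incoming batch schema

   # Widening is lossless here: the scale does not shrink and the number of
   # integer digits (precision - scale) does not shrink, so existing values
   # such as 99 remain exactly representable in the new type.
   assert new_type.scale >= old_type.scale
   assert (new_type.precision - new_type.scale) >= (old_type.precision - old_type.scale)
   ```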
   
   **Environment Description**
   
   * AWS Glue version: 4.0
   
   * Hudi version :  0.12.1
   
   * Spark version : 3.3.0-amzn-0
   
   * Hive version : 2.3.9-amzn-2
   
   * Hadoop version : 3.2.1-amzn-8
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   We intend to write daily batches of data. If we infer the schema for each batch, schema evolution seems to work fine until a decimal field's type changes.
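
   For clarity, by inferring schemas I mean something along these lines (a sketch using the repro's column names; as I understand it, PySpark infers DecimalType(38, 18) for Python Decimal values, so the decimal column's type then never changes between batches):

   ```
   from decimal import Decimal
   from datetime import datetime

   data = [(2, datetime(2023, 1, 1, 12, 0), Decimal(9999), 2023)]
   # No explicit schema: Spark infers the types, so the decimal column keeps
   # the same inferred type from batch to batch and the writes succeed.
   df = spark.createDataFrame(data, ['recordkeyfield', 'precombinefield', 'decimalvalue', 'year'])
   ```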
   
   
   
   **Stacktrace**
   
   ```
   2023-02-22 14:31:35,737 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last):
     File "/tmp/redacted.py", line 67, in <module>
       .save("s3://redacted/test_decimals")
     File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/readwriter.py", 
line 968, in save
       self._jwrite.save(path)
     File 
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 
1321, in __call__
       return_value = get_return_value(
     File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/utils.py", line 
190, in deco
       return f(*a, **kw)
     File 
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 
326, in get_return_value
       raise Py4JJavaError(
   py4j.protocol.Py4JJavaError: An error occurred while calling o132.save.
   : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for 
commit time 20230222143058547
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:158)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 
in stage 0.0 (TID 6) (172.34.79.143 executor 1): 
org.apache.avro.AvroTypeException: Cannot encode decimal with precision 4 as 
max precision 2
        at 
org.apache.avro.Conversions$DecimalConversion.validate(Conversions.java:140)
        at 
org.apache.avro.Conversions$DecimalConversion.toFixed(Conversions.java:104)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:859)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:820)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:818)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:772)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:734)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1034)
        at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:109)
        at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:117)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
        at scala.Option.foreach(Option.scala:257)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
        at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
        at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at 
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:50)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:33)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
        ... 54 more
   Caused by: org.apache.avro.AvroTypeException: Cannot encode decimal with 
precision 4 as max precision 2
        at 
org.apache.avro.Conversions$DecimalConversion.validate(Conversions.java:140)
        at 
org.apache.avro.Conversions$DecimalConversion.toFixed(Conversions.java:104)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:859)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:820)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:818)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:772)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:734)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1034)
        at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:109)
        at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:117)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   ```

