simonjobs opened a new issue, #8020:
URL: https://github.com/apache/hudi/issues/8020
**Describe the problem you faced**
When writing data where I expect the schema to evolve, subsequent writes fail
whenever the precision or scale of a decimal field in the schema has increased.
We are facing this issue when running Glue 4.0 with the built-in Hudi connector
and writing to the AWS Glue Data Catalog.
**To Reproduce**
Steps to reproduce the behavior:
The example below uses a scale of 0 and only increases the precision, but the
issue applies to changes in both precision and scale.
1. Write data to Hudi table with decimal field defined as DecimalType(2,0)
```
value = 99
schema = T.StructType([
    T.StructField('recordkeyfield', T.IntegerType()),
    T.StructField('precombinefield', T.TimestampType()),
    T.StructField('decimalvalue', T.DecimalType(2, 0)),
    T.StructField('year', T.IntegerType())
])
data = [(
    1,
    datetime.strptime('2023-01-01 12:00:00', '%Y-%m-%d %H:%M:%S'),
    Decimal(value),
    2023)]
df = spark.createDataFrame(data, schema=schema)
```
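The write for this step uses the same call as in the full script further down (a minimal sketch; `additional_options` refers to the Hudi options dictionary defined there, and the S3 path is redacted):
```
# Initial write of the DecimalType(2, 0) batch. additional_options is the
# same Hudi options dictionary shown in the full script below.
df.write.format("hudi") \
    .options(**additional_options) \
    .mode("append") \
    .save("s3://redacted/test_decimals")
```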
2. Write to the same table, this time with the field defined as DecimalType(4,0)
and data to match.
```
value = 9999
schema = T.StructType([
    T.StructField('recordkeyfield', T.IntegerType()),
    T.StructField('precombinefield', T.TimestampType()),
    T.StructField('decimalvalue', T.DecimalType(4, 0)),
    T.StructField('year', T.IntegerType())
])
data = [(
    2,
    datetime.strptime('2023-01-01 12:00:00', '%Y-%m-%d %H:%M:%S'),
    Decimal(value),
    2023)]
df = spark.createDataFrame(data, schema=schema)
```
3. The write fails with the exception: "org.apache.avro.AvroTypeException:
Cannot encode decimal with precision 4 as max precision 2"
Full script:
```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.gluetypes import *
from pyspark.context import SparkContext
from pyspark.sql import types as T
from pyspark.sql import functions as F
from decimal import Decimal
from datetime import datetime
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
value = 9999
schema = T.StructType([
    T.StructField('recordkeyfield', T.IntegerType()),
    T.StructField('precombinefield', T.TimestampType()),
    T.StructField('decimalvalue', T.DecimalType(4, 0)),
    T.StructField('year', T.IntegerType())
])
data = [(
    2,
    datetime.strptime('2023-01-01 12:00:00', '%Y-%m-%d %H:%M:%S'),
    Decimal(value),
    2023)]
df = spark.createDataFrame(data, schema=schema)
# Set Hudi options
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-hudi.html#aws-glue-programming-etl-format-hudi-update-insert
additional_options = {
    "hoodie.table.name": "test_decimals",
    "hoodie.database.name": "redacted",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "recordkeyfield",
    "hoodie.datasource.write.precombine.field": "precombinefield",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": "year",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "redacted",
    "hoodie.datasource.hive_sync.table": "test_decimals",
    "hoodie.datasource.hive_sync.partition_fields": "year",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.meta.sync.enable": "true",
    "hoodie.datasource.meta_sync.condition.sync": "true",
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true"
}
# Write to Data Catalog table via Hudi
df.write.format("hudi") \
    .options(**additional_options) \
    .mode("append") \
    .save("s3://redacted/test_decimals")
job.commit()
```
**Expected behavior**
Schema evolution should work when the precision or scale of a decimal field increases.
For example, it should be possible to change decimal(2,0) to decimal(4,0) on write
to the Data Catalog.
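For comparison, a minimal sketch of how I would expect to widen the column explicitly through Hudi's schema-on-read evolution via Spark SQL (assuming the table is registered as `redacted.test_decimals`, `hoodie.schema.on.read.enable` is set, and this particular decimal widening is allowed by Hudi's evolution rules):
```
# Hypothetical explicit widening of the decimal column; I would expect the
# implicit evolution on write to achieve the same end result.
spark.sql("ALTER TABLE redacted.test_decimals ALTER COLUMN decimalvalue TYPE DECIMAL(4, 0)")
```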
**Environment Description**
* AWS Glue version: 4.0
* Hudi version : 0.12.1
* Spark version : 3.3.0-amzn-0
* Hive version : 2.3.9-amzn-2
* Hadoop version : 3.2.1-amzn-8
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No
**Additional context**
We intend to write daily batches of data. If we infer the schema for each batch,
schema evolution seems to work fine until a field of the decimal type changes.
A possible mitigation we are considering is sketched below.
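A minimal sketch of that mitigation, assuming we can pick a sufficiently wide precision up front (the DecimalType(18, 0) here is illustrative only, not part of our actual job):
```
# Cast the decimal column to a fixed, wide type before every write so the
# Hudi/Avro schema never has to widen between batches (hypothetical).
wide_df = df.withColumn('decimalvalue', F.col('decimalvalue').cast(T.DecimalType(18, 0)))
```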
**Stacktrace**
```
2023-02-22 14:31:35,737 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:
Traceback (most recent call last):
  File "/tmp/redacted.py", line 67, in <module>
    .save("s3://redacted/test_decimals")
  File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 968, in save
    self._jwrite.save(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o132.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for
commit time 20230222143058547
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
at
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
at
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:158)
at
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
at
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3
in stage 0.0 (TID 6) (172.34.79.143 executor 1):
org.apache.avro.AvroTypeException: Cannot encode decimal with precision 4 as
max precision 2
at
org.apache.avro.Conversions$DecimalConversion.validate(Conversions.java:140)
at
org.apache.avro.Conversions$DecimalConversion.toFixed(Conversions.java:104)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:859)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:820)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:818)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:772)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:734)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1034)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:109)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:117)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
at
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155)
at
org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
at
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
at
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
at
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:50)
at
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:33)
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
... 54 more
Caused by: org.apache.avro.AvroTypeException: Cannot encode decimal with
precision 4 as max precision 2
at
org.apache.avro.Conversions$DecimalConversion.validate(Conversions.java:140)
at
org.apache.avro.Conversions$DecimalConversion.toFixed(Conversions.java:104)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:859)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:820)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:818)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:772)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:734)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1034)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:109)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:117)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
```