BCriswell opened a new issue, #8616:
URL: https://github.com/apache/hudi/issues/8616
I've noticed an issue where the `data_before_after` CDC mode does not serialize Spark `DecimalType` values correctly. Decimal columns are converted to an array of numbers in the `before` and `after` JSON strings when the CDC data is saved, which then produces null values when converting back to a Row with `F.from_json()` and the original schema, because Spark can't cast the array to a valid `DecimalType`. Example:
Querying the Hudi table normally:
`gljeln=Decimal('208.000000000000000000')`
Querying with the `cdc` format + incremental options:
`Row(op='i', ts_ms='20230425193451991', before='null', after='{"gljeln": [0, 0, 0, 0, 0, 0, 0, 11, 70, -108, 113, -8, 1, 64, 0, 0]...`
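The array appears to be the decimal's unscaled value as big-endian two's-complement bytes (the fixed-width layout Avro uses for decimals), so it can be decoded by hand. A minimal sketch, assuming the column's scale of 18; `bytes_to_decimal` is a hypothetical helper, not something Hudi provides:
```
from decimal import Decimal

def bytes_to_decimal(byte_list, scale=18):
    # Reinterpret the signed JSON integers as raw bytes, rebuild the
    # unscaled integer, then shift the decimal point left by `scale` digits.
    raw = bytes(b & 0xFF for b in byte_list)
    unscaled = int.from_bytes(raw, byteorder='big', signed=True)
    return Decimal(unscaled).scaleb(-scale)

print(bytes_to_decimal([0, 0, 0, 0, 0, 0, 0, 11, 70, -108, 113, -8, 1, 64, 0, 0]))
# Decimal('208.000000000000000000')
```
That round-trips the value above, but users obviously shouldn't have to do this.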
**Steps to reproduce the behavior:**
1. Create a Spark dataframe that contains a decimal column (precision=38, scale=18) with a valid decimal value.
2. Write the dataframe to a new Hudi table using write options that enable CDC.
3. Create a second dataframe that contains a decimal column (precision=38, scale=18) with a valid decimal value.
4. Write the dataframe to the same Hudi table using the same write options, which will create the first `.cdc` file.
5. Read the CDC data using the `cdc` format for an incremental query starting at the first commit time (which will only read the newly created CDC data).
6. The result is a single insert entry, and the decimal value in the `after` JSON string will be an array of numbers instead of the string representation of the decimal.
**Expected behavior**
The decimal value should be serialized to an appropriate type (probably a
string) that can be deserialized without corrupting the data.
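For example (hypothetical output, reusing the row from above), the payload would ideally read something like:
```
Row(op='i', ts_ms='20230425193451991', before='null', after='{"gljeln": "208.000000000000000000"}')
```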
**Environment Description**
* Hudi version : hudi-spark3.3-bundle_2.12-0.13.0.jar
* Spark version : 3.3.1
* Hive version : N/A
* Hadoop version : N/A
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : Yes
**Additional context**
Example script to reproduce, and results:
```
from datetime import datetime
from decimal import Decimal
from pyspark.sql import SparkSession, types as T

HUDI_TARGET = 's3://some-bucket'


def decimal_test():
    spark = (
        SparkSession
        .builder
        .appName('decimal_test')
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.sql.hive.convertMetastoreParquet', 'false')
        .config('spark.hadoop.mapreduce.input.pathFilter.class',
                'org.apache.hudi.hadoop.HoodieROTablePathFilter')
        .config('spark.sql.parquet.mergeSchema', 'true')
        .config('spark.sql.files.ignoreMissingFiles', 'true')
        .config('spark.sql.adaptive.enabled', 'true')
        .config('spark.sql.sources.partitionOverwriteMode', 'dynamic')
        .config('spark.sql.sources.partitionColumnTypeInference.enabled', 'false')
        .getOrCreate()
    )
    write_options = {
        'hoodie.bloom.index.bucketized.checking': False,
        'hoodie.bloom.index.input.storage.level': 'MEMORY_AND_DISK',
        'hoodie.bloom.index.prune.by.ranges': False,
        'hoodie.bulkinsert.shuffle.parallelism': 50,
        'hoodie.bulkinsert.sort.mode': 'PARTITION_SORT',
        'hoodie.combine.before.insert': False,
        'hoodie.combine.before.upsert': False,
        'hoodie.datasource.write.insert.drop.duplicates': False,
        'hoodie.datasource.write.operation': 'UPSERT',
        'hoodie.datasource.write.payload.class':
            'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.datasource.write.recordkey.field': 'id',
        'hoodie.datasource.write.row.writer.enable': True,
        'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
        'hoodie.finalize.write.parallelism': 50,
        'hoodie.index.type': 'SIMPLE',
        'hoodie.insert.shuffle.parallelism': 50,
        'hoodie.metadata.enable': True,
        'hoodie.payload.ordering.field': 'ts',
        'hoodie.simple.index.input.storage.level': 'MEMORY_AND_DISK',
        'hoodie.table.name': 'some_table',
        'hoodie.upsert.shuffle.parallelism': 50,
        'hoodie.write.status.storage.level': 'MEMORY_AND_DISK',
        # CDC-related options:
        'hoodie.table.cdc.enabled': True,
        'hoodie.datasource.query.incremental.format': 'data_before_after',
        'hoodie.datasource.write.partitionpath.field': '',
        'hoodie.datasource.write.keygenerator.class':
            'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
    }
    schema = T.StructType([
        T.StructField('id', T.IntegerType()),
        T.StructField('amount', T.DecimalType(38, 18)),
        T.StructField('ts', T.TimestampType())
    ])
    df1 = spark.createDataFrame(
        [(1, Decimal('42.000000000000000000'), datetime.now())], schema)
    df2 = spark.createDataFrame(
        [(2, Decimal('2319.000000000000000000'), datetime.now())], schema)
    # First write creates the table; confirm the decimal survives a normal read.
    df1.write.format('hudi').options(**write_options).save(HUDI_TARGET, mode='append')
    df1 = spark.read.format('hudi').load(HUDI_TARGET)
    assert df1.first().amount == Decimal('42.000000000000000000')
    # Second write produces the first .cdc file.
    df2.write.format('hudi').options(**write_options).save(HUDI_TARGET, mode='append')


if __name__ == '__main__':
    decimal_test()
```
Incremental CDC query:
```
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': commit_time,
    'hoodie.datasource.query.incremental.format': 'cdc'
}
latest = spark.read.format('hudi').options(**incremental_read_options).load(HUDI_TARGET)
```
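Here `commit_time` is the instant time of the first commit. One way to obtain it (a sketch that takes the earliest `_hoodie_commit_time` meta column from the table, so the incremental query starts at the first commit and returns only the second write's CDC records):
```
commit_time = (
    spark.read.format('hudi').load(HUDI_TARGET)
    .selectExpr('min(_hoodie_commit_time) AS commit_time')
    .first()
    .commit_time
)
```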
And the result:
```
Row(op='i', ts_ms='20230428175640557', before='null', after='{"id": 2, "amount": [0, 0, 0, 0, 0, 0, 0, 125, -74, -105, 5, 105, 82, -36, 0, 0], "ts": 1682704564405033}')
```
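Parsing that payload back, as described at the top, silently nulls out the decimal. A sketch of the failure, reusing `schema` from the write script and `latest` from the incremental query above:
```
from pyspark.sql import functions as F

# from_json with the original schema cannot cast the byte array to
# DecimalType(38, 18), so the parsed 'amount' comes back null.
parsed = latest.select(F.from_json('after', schema).alias('row'))
parsed.select('row.amount').show()
# +------+
# |amount|
# +------+
# |  null|
# +------+
```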