joao-miranda opened a new issue, #6552:
URL: https://github.com/apache/hudi/issues/6552
**Describe the problem you faced**
We receive Full Load + CDC data from an RDBMS via AWS Database Migration
Service (DMS) into an S3 bucket. A Scala Glue job then uses Hudi to merge
those files into a correct representation of the current state of the
database. DMS adds two columns to the data: `Op` (with values null, I, U, or
D) and `ts` (the timestamp of the operation). We are not using Hive or Avro.
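For reference, a minimal sketch of the shape DMS lands in S3 (the `Op`/`ts` columns are as described above; the table columns `id` and `name` and all sample values are hypothetical):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dms-cdc-sample").getOrCreate()
import spark.implicits._

// Hypothetical DMS output: full-load rows carry Op = null; CDC rows carry
// I (insert), U (update) or D (delete), plus the operation timestamp ts.
val mappedDF = Seq(
  (null.asInstanceOf[String], "2022-08-31 10:00:00", 1, "alice"), // full load
  ("I", "2022-08-31 10:05:00", 2, "bob"),                         // insert
  ("U", "2022-08-31 10:06:00", 1, "alice_v2"),                    // update
  ("D", "2022-08-31 10:07:00", 3, "carol")                        // delete
).toDF("Op", "ts", "id", "name")
```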
This works fine with Hudi 0.9.0 and 0.10.0. As soon as we upgrade to Hudi
0.11.0, 0.11.1, or 0.12.0, `AWSDmsAvroPayload` fails with the following error:
```
33061 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle - Error writing record HoodieRecord{key=HoodieKey { recordKey=id:3 partitionPath=}, currentLocation='null', newLocation='null'}
java.util.NoSuchElementException: No value present in Option
    at org.apache.hudi.common.util.Option.get(Option.java:89)
    at org.apache.hudi.common.model.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:72)
    at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
    at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
Removing the `PAYLOAD_CLASS_OPT_KEY` option from the config lets the job
complete, but delete operations are then not applied. No other payload class
seems to work with the DMS format.
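For what it's worth, the stack trace is consistent with the payload returning an empty `Option` for a delete record (`Op == "D"`) and the 0.11+ insert path calling `get()` on it without an `isPresent` check. A simplified sketch of that interaction (not the actual Hudi source; `String` stands in for the Avro record type):
```scala
import org.apache.hudi.common.util.Option

// Simplified stand-in for AWSDmsAvroPayload.getInsertValue: a DMS delete
// record yields an empty Option instead of a record.
def getInsertValue(op: String, record: String): Option[String] =
  if (op == "D") Option.empty() else Option.of(record)

// Dereferencing the Option unconditionally is where
// "java.util.NoSuchElementException: No value present in Option" comes from:
val value = getInsertValue("D", "row").get() // throws NoSuchElementException
```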
**Steps to reproduce the behavior**
**Dependencies:**
```
"org.apache.hudi" %% "hudi-spark-bundle" % "2.12-0.12.0"
"org.apache.hudi" %% "hudi-utilities-bundle" % "2.12-0.12.0"
```
**Configuration used:**
```
var hudiOptions = scala.collection.mutable.Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> "hudiTableName",
  HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "true",
  DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "primaryKeyField",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "ts",
  DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[AWSDmsAvroPayload].getName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[CustomKeyGenerator].getName,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> ""
)
```
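Side note: the `*_OPT_KEY` constants are deprecated aliases in recent releases. Assuming the current `ConfigProperty` names (a sketch with the same placeholder values), the equivalent map would be:
```scala
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.AWSDmsAvroPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.CustomKeyGenerator

// Same options as above, spelled with the non-deprecated config keys.
var hudiOptions = scala.collection.mutable.Map[String, String](
  HoodieWriteConfig.TBL_NAME.key() -> "hudiTableName",
  HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "true",
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "primaryKeyField",
  DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "ts",
  DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[CustomKeyGenerator].getName,
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> ""
)
```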
**The following options are added if a partition key is defined:**
```
hudiOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionKeyField")
hudiOptions.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
hudiOptions.put(HoodieIndexConfig.INDEX_TYPE.key(), "GLOBAL_BLOOM")
hudiOptions.put(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true")
hudiOptions.put(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), "true")
```
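Put together, the conditional wiring looks roughly like this (a sketch; `partitionKeyOpt` is a hypothetical optional holding the partition column name). One caveat worth noting: `CustomKeyGenerator` expects the partition path field in `column:TYPE` form, e.g. `partitionKeyField:SIMPLE`.
```scala
// Hypothetical: Some("partitionKeyField") when the source table is partitioned.
val partitionKeyOpt: scala.Option[String] = scala.Some("partitionKeyField")

partitionKeyOpt.foreach { field =>
  // CustomKeyGenerator parses "column:TYPE" (SIMPLE or TIMESTAMP).
  hudiOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, s"$field:SIMPLE")
  hudiOptions.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
  hudiOptions.put(HoodieIndexConfig.INDEX_TYPE.key(), "GLOBAL_BLOOM")
  hudiOptions.put(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true")
  hudiOptions.put(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), "true")
}
```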
**Written to the target path:**
```
// Write the DataFrame as a Hudi dataset
mappedDF
.dropDuplicates()
.write
.format("org.apache.hudi")
.options(hudiOptions)
.mode(SaveMode.Append)
.save("targetDirectory")
```
**Expected behavior**
Data read from the Hudi table should reflect the current state of the source
database, with inserts, updates, and deletes all applied.
**Environment Description**
- Hudi version : 0.12.0
- Spark version : 3.1.1
- Scala version: 2.12.15
- AWS Glue version : 3.0.0