Hans-Raintree opened a new issue, #8278:
URL: https://github.com/apache/hudi/issues/8278
**Describe the problem you faced**
Deltastreamer ingest fails when `--payload-class org.apache.hudi.common.model.AWSDmsAvroPayload` is set, but succeeds with an otherwise identical configuration when that option is omitted.
**To Reproduce**
Steps to reproduce the behavior:
1. Pass `--payload-class org.apache.hudi.common.model.AWSDmsAvroPayload` to Deltastreamer.
2. Run Deltastreamer on an AWS EMR cluster.
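For context on what the payload class changes: `AWSDmsAvroPayload` interprets the `Op` column that AWS DMS adds to each row, treating rows marked `"D"` as deletes by yielding an empty `Option` instead of a record. The sketch below is a simplified, hypothetical illustration of that delete-marker semantics (it is not Hudi's actual source; `DmsPayloadSketch` and its method are illustrative names, and `java.util.Optional` stands in for Hudi's `Option`):

```java
import java.util.Optional;

// Simplified sketch of DMS-style delete handling: rows whose "Op" column
// is "D" produce no record to insert, so the payload returns an empty
// Optional rather than an Avro record.
class DmsPayloadSketch {
    static final String OP_FIELD = "Op"; // column added by AWS DMS

    // Returns the record to write, or empty when the DMS row is a delete.
    static Optional<String> getInsertValue(String op, String record) {
        if ("D".equals(op)) {
            return Optional.empty(); // delete marker: nothing to insert
        }
        return Optional.of(record);
    }
}
```

Downstream code that unconditionally unwraps this value would fail on delete rows, which may be relevant to the stacktrace below.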
**Expected behavior**
Deltastreamer creates the Hudi dataset successfully.
**Environment Description**
* Hudi version : 0.13.0
* Spark version : 3.3.1
* Hive version : 3.1.3
* Hadoop version : Amazon 3.3.3
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Additional context**
Bootstrapped 0.13.0 jars:
hudi-spark3.3-bundle_2.12-0.13.0.jar
hudi-utilities-slim-bundle_2.12-0.13.0.jar
hudi-aws-bundle-0.13.0.jar
```
spark-submit \
  --master yarn \
  --jars /mnt1/hudi-jars/hudi-spark-bundle.jar,/mnt1/hudi-jars/hudi-utilities-slim-bundle.jar,/mnt1/hudi-jars/hudi-aws-bundle.jar \
  --deploy-mode cluster \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /mnt1/hudi-jars/hudi-utilities-slim-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-ordering-field replicadmstimestamp \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://<s3path>/<table> \
  --target-table table \
  --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
  --hoodie-conf hoodie.datasource.write.recordkey.field=_id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=Loc \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://<differents3path>/<table>
```
**Stacktrace**
```
23/03/23 16:46:38 ERROR HoodieCreateHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=463607 partitionPath=SGILB}, currentLocation='null', newLocation='null'}
java.util.NoSuchElementException: No value present in Option
    at org.apache.hudi.common.util.Option.get(Option.java:89) ~[__app__.jar:0.13.0]
    at org.apache.hudi.common.model.HoodieAvroRecord.prependMetaFields(HoodieAvroRecord.java:132) ~[__app__.jar:0.13.0]
    at org.apache.hudi.io.HoodieCreateHandle.doWrite(HoodieCreateHandle.java:142) ~[__app__.jar:0.13.0]
    at org.apache.hudi.io.HoodieWriteHandle.write(HoodieWriteHandle.java:175) ~[__app__.jar:0.13.0]
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:98) ~[__app__.jar:0.13.0]
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:42) ~[__app__.jar:0.13.0]
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67) ~[__app__.jar:0.13.0]
    at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:80) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
    at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:39) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119) ~[__app__.jar:0.13.0]
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46) ~[scala-library-2.12.15.jar:?]
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1509) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1332) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_362]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_362]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
```
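The failure at the top of the trace is `Option.get` called on an empty `Option` inside `HoodieAvroRecord.prependMetaFields`. The same failure mode can be reproduced in isolation; this minimal sketch uses `java.util.Optional` as a stand-in for Hudi's `org.apache.hudi.common.util.Option`, which throws the same exception class (the exact message text differs between the two):

```java
import java.util.NoSuchElementException;
import java.util.Optional;

// Minimal reproduction of the exception class seen in the stacktrace:
// calling get() on an empty Option/Optional throws NoSuchElementException.
class EmptyOptionGet {
    static String message() {
        try {
            // Same failure mode as Option.get(Option.java:89) in the trace.
            Optional.<String>empty().get();
            return "no exception";
        } catch (NoSuchElementException e) {
            return e.getMessage(); // java.util.Optional reports "No value present"
        }
    }
}
```

This is consistent with the writer receiving a record whose payload produced no value (e.g. a delete marker) and unwrapping it without checking for presence first.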