Hans-Raintree opened a new issue, #8278:
URL: https://github.com/apache/hudi/issues/8278

   **Describe the problem you faced**
   
   Deltastreamer ingest fails when `--payload-class org.apache.hudi.common.model.AWSDmsAvroPayload` is set, but succeeds with an otherwise identical configuration that omits the payload class.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Enable `--payload-class org.apache.hudi.common.model.AWSDmsAvroPayload`.
   2. Run Deltastreamer on an AWS EMR cluster.
   
   **Expected behavior**
   
   Deltastreamer creates the Hudi dataset.
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   
   * Spark version : 3.3.1
   
   * Hive version : 3.1.3
   
   * Hadoop version : Amazon 3.3.3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Bootstrapped 0.13.0 jars:
    hudi-spark3.3-bundle_2.12-0.13.0.jar 
    hudi-utilities-slim-bundle_2.12-0.13.0.jar
    hudi-aws-bundle-0.13.0.jar
   
   ```
   spark-submit \
     --master yarn \
     --jars /mnt1/hudi-jars/hudi-spark-bundle.jar,/mnt1/hudi-jars/hudi-utilities-slim-bundle.jar,/mnt1/hudi-jars/hudi-aws-bundle.jar \
     --deploy-mode cluster \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     /mnt1/hudi-jars/hudi-utilities-slim-bundle.jar \
     --table-type COPY_ON_WRITE \
     --source-ordering-field replicadmstimestamp \
     --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
     --target-base-path s3://<s3path>/<table> \
     --target-table table \
     --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
     --hoodie-conf hoodie.datasource.write.recordkey.field=_id \
     --hoodie-conf hoodie.datasource.write.partitionpath.field=Loc \
     --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://<differents3path>/<table>
   ```
   
   **Stacktrace**
   
   ```
   23/03/23 16:46:38 ERROR HoodieCreateHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=463607 partitionPath=SGILB}, currentLocation='null', newLocation='null'}
   java.util.NoSuchElementException: No value present in Option
        at org.apache.hudi.common.util.Option.get(Option.java:89) ~[__app__.jar:0.13.0]
        at org.apache.hudi.common.model.HoodieAvroRecord.prependMetaFields(HoodieAvroRecord.java:132) ~[__app__.jar:0.13.0]
        at org.apache.hudi.io.HoodieCreateHandle.doWrite(HoodieCreateHandle.java:142) ~[__app__.jar:0.13.0]
        at org.apache.hudi.io.HoodieWriteHandle.write(HoodieWriteHandle.java:175) ~[__app__.jar:0.13.0]
        at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:98) ~[__app__.jar:0.13.0]
        at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:42) ~[__app__.jar:0.13.0]
        at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67) ~[__app__.jar:0.13.0]
        at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:80) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
        at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:39) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
        at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119) ~[__app__.jar:0.13.0]
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46) ~[scala-library-2.12.15.jar:?]
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1509) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1332) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_362]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_362]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
   ```
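
   For context, my understanding (a simplified sketch, not Hudi's actual source; the class and method names below are illustrative) is that `AWSDmsAvroPayload` inspects the `Op` column that AWS DMS adds to each record (`I`/`U`/`D`) and returns an empty `Option` for `D` (delete) rows. If a write path then calls `.get()` on that empty `Option` unconditionally, it would throw exactly the `NoSuchElementException` seen above:

   ```java
   import java.util.Map;
   import java.util.NoSuchElementException;
   import java.util.Optional;

   // Hypothetical, self-contained model of AWSDmsAvroPayload's delete handling,
   // using java.util.Optional in place of Hudi's Option. Illustrative only.
   class DmsPayloadSketch {
       static final String OP_FIELD = "Op"; // column added by AWS DMS: I, U, or D

       // Returns the record for inserts/updates, or empty for DMS delete markers.
       static Optional<Map<String, Object>> getInsertValue(Map<String, Object> record) {
           if ("D".equals(record.get(OP_FIELD))) {
               return Optional.empty(); // delete: there is no value to write
           }
           return Optional.of(record);
       }

       public static void main(String[] args) {
           Map<String, Object> insert = Map.of("_id", 463607, "Op", "I");
           Map<String, Object> delete = Map.of("_id", 463607, "Op", "D");

           System.out.println(getInsertValue(insert).isPresent()); // true
           System.out.println(getInsertValue(delete).isPresent()); // false

           // A writer that calls .get() unconditionally on the delete record's
           // empty Optional fails like the stacktrace in this issue.
           try {
               getInsertValue(delete).get();
           } catch (NoSuchElementException e) {
               System.out.println("NoSuchElementException");
           }
       }
   }
   ```

   If that is the mechanism, the failure would only surface once the source data contains delete (`Op = 'D'`) rows, which may explain why the same configuration works without the payload class.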
   
   

