soumilshah1995 opened a new issue, #10140:
URL: https://github.com/apache/hudi/issues/10140
Subject: Seeking Guidance on Running Hudi Delta Streamer Locally with Spark 3.4
Hey everyone,
I'm currently trying to run Hudi Delta Streamer locally using Spark 3.4,
Java 11, and Hadoop 3.7. I've successfully created Hudi tables locally using
PySpark, but I'm encountering challenges with running Delta Streamer. I'd
appreciate your expertise in identifying any potential issues. Here are the
steps I've taken:
Step 1: Downloaded sample Parquet files from Google Drive.
Step 2: Downloaded JAR files from Maven Repository.
However, I'm uncertain about two things:
1. Does the downloaded JAR file contain the Hudi Delta Streamer classes? If so, is it compatible with Spark 3.4?
2. If the provided Maven links are incorrect, where can I find the correct JAR files to use with Delta Streamer locally?
Here is the command I use to submit jobs:
```
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--properties-file spark-config.properties \
--master 'local[*]' \
--executor-memory 1g \
jar/hudi-utilities-bundle_2.12-0.14.0.jar \
--table-type COPY_ON_WRITE \
--op UPSERT \
--enable-sync \
--source-ordering-field replicadmstimestamp \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-base-path file:///Users/soumilnitinshah/Downloads/hudidb/ \
--target-table invoice \
--payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.datasource.write.recordkey.field=invoiceid \
--hoodie-conf hoodie.datasource.write.partitionpath.field=destinationstate \
--hoodie-conf hoodie.streamer.source.dfs.root=file:///Users/soumilnitinshah/Downloads/sampledata \
--hoodie-conf hoodie.datasource.write.precombine.field=replicadmstimestamp
```
Additionally, here is the content of spark-config.properties:
```
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.hive.convertMetastoreParquet=false
spark.executor.memory=1g
```
Error Message
```
Exception in thread "main" org.apache.hudi.exception.HoodieException: Unable to load class
    at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:58)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:68)
    at org.apache.spark.sql.hudi.analysis.HoodieAnalysis$.customOptimizerRules(HoodieAnalysis.scala:166)
    at org.apache.spark.sql.hudi.HoodieSparkSessionExtension.apply(HoodieSparkSessionExtension.scala:43)
    at org.apache.spark.sql.hudi.HoodieSparkSessionExtension.apply(HoodieSparkSessionExtension.scala:28)
    at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1297)
    at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1292)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1292)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:1033)
    at org.apache.spark.sql.SQLContext$.getOrCreate(SQLContext.scala:1023)
    at org.apache.spark.sql.SQLContext.getOrCreate(SQLContext.scala)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.<init>(HoodieSparkEngineContext.java:72)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.<init>(HoodieStreamer.java:157)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.<init>(HoodieStreamer.java:131)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.Spark34NestedSchemaPruning
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:315)
    at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:55)
    ... 29 more
```
My goal is to perform these tasks locally for learning and experimentation
purposes, since spinning up an EMR cluster can be costly. If you notice any
mistakes in my approach, I'm eager to learn from the experts here. Your
guidance is highly appreciated!
Thanks in advance for your help.
Best regards
Shah