umehrot2 opened a new issue #869: Hudi Spark error when spark bundle jar is 
added to spark's classpath
URL: https://github.com/apache/incubator-hudi/issues/869
 
 
   We ran into a runtime error while packaging Hudi to work on AWS EMR. The issue 
is reproducible on the latest Spark 2.4.3, as well as on previous versions such as 
2.3.0, 2.2.0, and 2.1.0.
   
   ## Reproduction Steps
   
   The following steps, performed on AWS EMR, reproduce the issue:
   - Launch an AWS EMR 5.26.0 cluster
   - SSH into the cluster and copy the `hudi-spark-bundle` and `databricks-avro` 
jars into `/usr/lib/spark/jars/`
   - Start the Spark shell with `spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"`
   - Run sample code that reads existing Parquet data, converts it, and saves it 
in Hudi format:
   ```
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.HoodieDataSourceHelpers
   import org.apache.hudi.common.model.HoodieTableType
   import org.apache.spark.sql.SaveMode
   import java.util.UUID;
   
   var tableName = "xxx"
   var tablePath = "s3://<path-to-save-hudi-table>/" + tableName
   val generateUUID = udf(() => UUID.randomUUID().toString)
   val df0 = 
spark.read.format("parquet").load("s3://<existing-parquet-data-path>/")
   val df1 = df0.withColumn("record_key", generateUUID())
   df1.write.format("org.apache.hudi")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "record_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"<partition_key>") 
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "<precombine_key>")
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, 
"<partition_key>")
    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
    .mode(SaveMode.Overwrite)
    .save(tablePath)
   ```
   
   ## Stack Trace
   
   ```
   Driver stacktrace:
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
     at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
     at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
     at scala.Option.foreach(Option.scala:257)
     at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
     at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
     at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
     at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
     at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
     at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
     at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
     at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1472)
     at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
     at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
     at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
     at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1471)
     at 
org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
     at 
org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
     at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:136)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
     at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
     at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
     at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
     at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
     at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
     at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
     ... 49 elided
   Caused by: java.lang.ClassCastException: cannot assign instance of 
scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type 
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
     at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2292)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
     at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
     at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
     at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
     at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2177)
     at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
     at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
     at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
     at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
     at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
     at org.apache.spark.scheduler.Task.run(Task.scala:121)
     at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   ```
   
   ## Findings on EMRs side
   
   - Currently on EMR, we can make Hudi work with Spark in only two ways:
      - Pass the `hudi-spark-bundle` and `databricks-avro` jars using the `--jars` 
option when starting spark-shell: `spark-shell --conf 
"spark.serializer=org.apache.spark.serializer.KryoSerializer" --jars 
s3://<path-to-jar>/hudi-spark-bundle-0.5.0-SNAPSHOT.jar`
      - Set the `spark.jars` property to the jars 
`/../hudi-spark-bundle-0.5.0-SNAPSHOT.jar,/../spark-avro_2.11-4.0.0.jar`
      - However, it does not work if we add these jars to Spark's classpath as 
described above, and this happens regardless of which Spark or Scala version is 
used.
   - We tried shading `databricks-avro` inside the bundle jar and also matching 
Scala versions, but it still does not work.
   - Several online threads describe people running into this error, for example 
https://stackoverflow.com/questions/39953245/how-to-fix-java-lang-classcastexception-cannot-assign-instance-of-scala-collect/39953805#39953805
 It appears to be a class-loading issue that happens when `HoodieRecord` 
is converted into a `JavaRDD`.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to