[ https://issues.apache.org/jira/browse/CRUNCH-560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14904633#comment-14904633 ]
Nithin Asokan commented on CRUNCH-560:
--------------------------------------

If I understood correctly, we need a single Hadoop configuration, and it should carry the proper classloader.

First attempt: copy {{sparkHadoopConfiguration}} into the Crunch pipeline {{conf}} and pass {{conf}} to SparkRuntime. This approach results in the ClassNotFoundException mentioned in the issue description.

{code}
Configuration sparkHadoopConfiguration = sparkContext.hadoopConfiguration();
copyConfiguration(sparkHadoopConfiguration, conf);
setConfiguration(conf);
SparkRuntime runtime = new SparkRuntime(this, sparkContext, conf, outputTargets,
    toMaterialize, cachedCollections, allPipelineCallables);
{code}

Second attempt: copy the Crunch pipeline {{conf}} into {{sparkHadoopConfiguration}} and pass {{sparkHadoopConfiguration}} to SparkRuntime. The pipeline is successful in {{yarn-cluster}} mode from an Oozie Spark action.

{code}
Configuration sparkHadoopConfiguration = sparkContext.hadoopConfiguration();
copyConfiguration(conf, sparkHadoopConfiguration);
setConfiguration(sparkHadoopConfiguration);
SparkRuntime runtime = new SparkRuntime(this, sparkContext, sparkHadoopConfiguration, outputTargets,
    toMaterialize, cachedCollections, allPipelineCallables);
{code}

Please let me know if I'm doing anything differently from what is expected.
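For reference, the comment does not show {{copyConfiguration}}; below is a minimal sketch of what such a helper could look like, assuming it simply copies every key/value pair from one Hadoop {{Configuration}} into another. The class and method names are hypothetical.

{code}
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public final class ConfigurationCopy {

  private ConfigurationCopy() {
  }

  /**
   * Hypothetical helper: copies every property from {@code from} into {@code to}.
   * Configuration is Iterable<Map.Entry<String, String>>, so only string properties
   * are transferred; the classloader attached to {@code from} is not carried over.
   */
  public static void copyConfiguration(Configuration from, Configuration to) {
    for (Map.Entry<String, String> entry : from) {
      to.set(entry.getKey(), entry.getValue());
    }
  }
}
{code}

If that is how the helper behaves, it would be consistent with the classloader point above: the second attempt keeps Spark's own {{Configuration}} (and whatever classloader it carries) and only adds the Crunch properties to it, whereas the first attempt hands SparkRuntime a {{Configuration}} whose classloader may not be able to resolve classes such as {{org.apache.crunch.types.avro.AvroInputFormat}}.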
> SparkPipeline should honor Spark Hadoop configuration
> -----------------------------------------------------
>
>                 Key: CRUNCH-560
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-560
>             Project: Crunch
>          Issue Type: Bug
>          Components: Spark
>            Reporter: Nithin Asokan
>         Attachments: CRUNCH-560-001.patch
>
>
> Executing a SparkPipeline using the {{SparkPipeline(String sparkConnect, String appName, Class<?> jarClass, Configuration conf)}} constructor and {{yarn-cluster}} mode via an Oozie Spark action causes a ClassNotFoundException during job creation. The problem appears to be Spark not being able to read Crunch InputFormats from the Hadoop configuration.
> {code}
> 15/09/18 00:06:39 WARN scheduler.DAGScheduler: Creating new stage failed due to exception - job: 0
> java.lang.RuntimeException: readObject can't find class
> 	at org.apache.crunch.io.FormatBundle.readClass(FormatBundle.java:158)
> 	at org.apache.crunch.io.FormatBundle.readFields(FormatBundle.java:133)
> 	at org.apache.crunch.io.FormatBundle.fromSerialized(FormatBundle.java:62)
> 	at org.apache.crunch.io.CrunchInputs.getFormatNodeMap(CrunchInputs.java:79)
> 	at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:45)
> 	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
> 	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.immutable.List.foreach(List.scala:318)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> 	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
> 	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
> 	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:206)
> 	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:204)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:204)
> 	at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
> 	at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
> 	at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
> 	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.lang.ClassNotFoundException: Class org.apache.crunch.types.avro.AvroInputFormat not found
> 	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2018)
> 	at org.apache.crunch.io.FormatBundle.readClass(FormatBundle.java:156)
> 	... 84 more
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)