+spark user mailing list

Hi there,

I have exactly the same problem as described below. My current workaround
is to add the jar containing my UDP (User Defined Predicate) class to one of
the system classpath directories listed in "Classpath Entries" of the Spark
executors (for example, put it under the same path as
/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/parquet-hadoop-bundle-1.5.0-cdh5.4.2.jar).

Obviously, the downside is that you have to copy the jar to every node of
the cluster, and that is hard to maintain when the cluster's setup gets
updated.

I'd like to hear if anyone has a better solution for this. Thanks a lot!


>
>
> ---------- Forwarded message ----------
> From: Vladimir Vladimirov <smartk...@gmail.com>
> To: d...@spark.apache.org
> Cc:
> Date: Mon, 19 Oct 2015 19:38:07 -0400
> Subject: Problem using User Defined Predicate pushdown with core RDD and
> parquet - UDP class not found
> Hi all
>
> I feel like this question is more Spark dev related than Spark user
> related. Please correct me if I'm wrong.
>
> My project's data flow involves sampling records from a dataset stored as
> Parquet.
> I've checked the DataFrames API and it doesn't support user defined
> predicate pushdown - only simple filter expressions.
> I want to use the custom filter (user defined predicate) pushdown feature
> of Parquet while loading data with newAPIHadoopFile.
> Simple filters constructed with the org.apache.parquet.filter2 API work
> fine, but a User Defined Predicate works only in `--master local` mode.
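>
> For reference, the simple-filter case is roughly the following (sketch
> only; the threshold value is just a placeholder for my real one):
>
> import org.apache.parquet.filter2.predicate.FilterApi
> import org.apache.parquet.filter2.predicate.FilterPredicate
>
> // A plain comparison predicate built with FilterApi; this kind of filter
> // is pushed down without problems in both local and yarn-client mode.
> val impressions = FilterApi.intColumn("impression")
> val simplePred: FilterPredicate = FilterApi.gt(impressions, Integer.valueOf(100))
> // ParquetInputFormat.setFilterPredicate(job.getConfiguration, simplePred)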
>
> When I run my test program in yarn-client mode, where the UDP class is
> used by parquet-mr, I get a ClassNotFoundException.
>
> I suspect that the issue could be related to the way the class loader
> works inside Parquet, or to the fact that Spark executor processes load my
> jar from an HTTP server and some security policy gets in the way (the
> classpath shows that the jar URI is actually an HTTP URL and not a local
> file).
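>
> To check whether the executors can see the class at all, I can run a small
> probe from inside a task (sketch only; `sc` here is the SparkContext from
> the app below and the class name is my UDP class):
>
> val probe = sc.parallelize(Seq(1), 1).map { _ =>
>   // Resolve the UDP class with the task's context class loader, which
>   // should include the jars shipped via --jars.
>   try {
>     Class.forName("my.app.parquet.filters.udp.SampleIntColumn",
>       false, Thread.currentThread().getContextClassLoader)
>     "loaded"
>   } catch {
>     case e: ClassNotFoundException => "not found: " + e.getMessage
>   }
> }.collect()
> println(probe.mkString(", "))
>
> If this prints "loaded" while the Parquet read still fails, the jar is
> being shipped to the executors, but the class loader used when Parquet
> deserializes the predicate is a different one.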
>
> I've tried to create an uber jar with all dependencies and ship it with
> the Spark app - no success.
>
> PS: I'm using Spark 1.5.1.
>
> Here is the command line I'm using to submit the application:
>
> SPARK_CLASSPATH=./lib/my-jar-with-dependencies.jar spark-submit \
>     --master yarn-client \
>     --num-executors 3 --driver-memory 3G --executor-memory 2G \
>     --executor-cores 1 \
>     --jars ./lib/my-jar-with-dependencies.jar,./lib/snappy-java-1.1.2.jar,./lib/parquet-hadoop-1.7.0.jar,./lib/parquet-avro-1.7.0.jar,./lib/parquet-column-1.7.0.jar,/opt/cloudera/parcels/CDH/jars/avro-1.7.6-cdh5.4.0.jar,/opt/cloudera/parcels/CDH/jars/avro-mapred-1.7.6-cdh5.4.0-hadoop2.jar \
>     --class my.app.parquet.filters.tools.TestSparkApp \
>     ./lib/my-jar-with-dependencies.jar \
>     yarn-client \
>     "/user/vvlad/2015/*/*/*/EVENTS"
>
> Here is the code of my UDP class:
>
> package my.app.parquet.filters.udp
>
> import org.apache.parquet.filter2.predicate.Statistics
> import org.apache.parquet.filter2.predicate.UserDefinedPredicate
>
>
> import java.lang.{Integer => JInt}
>
> import scala.util.Random
>
> class SampleIntColumn(threshold: Double)
>   extends UserDefinedPredicate[JInt] with Serializable {
>   lazy val random = { new Random() }
>   val myThreshold = threshold
>   override def keep(value: JInt): Boolean = {
>     random.nextFloat() < myThreshold
>   }
>
>   override def canDrop(statistics: Statistics[JInt]): Boolean = false
>
>   override def inverseCanDrop(statistics: Statistics[JInt]): Boolean = false
>
>   override def toString: String = {
>     "%s(%f)".format(getClass.getName, myThreshold)
>   }
> }
>
> Spark app:
>
> package my.app.parquet.filters.tools
>
> import my.app.parquet.filters.udp.SampleIntColumn
> import org.apache.avro.generic.GenericRecord
> import org.apache.hadoop.mapreduce.Job
> import org.apache.parquet.avro.AvroReadSupport
> import org.apache.parquet.filter2.dsl.Dsl.IntColumn
> import org.apache.parquet.hadoop.ParquetInputFormat
> import org.apache.spark.{SparkContext, SparkConf}
>
> import org.apache.parquet.filter2.dsl.Dsl._
> import org.apache.parquet.filter2.predicate.FilterPredicate
>
>
> object TestSparkApp {
>   def main (args: Array[String]) {
>     val conf = new SparkConf()
>       //"local[2]" or yarn-client etc
>       .setMaster(args(0))
>       .setAppName("Spark Scala App")
>       .set("spark.executor.memory", "1g")
>       .set("spark.rdd.compress", "true")
>       .set("spark.storage.memoryFraction", "1")
>
>     val sc = new SparkContext(conf)
>
>     val job = new Job(sc.hadoopConfiguration)
>     ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[GenericRecord]])
>
>     val sampler = new SampleIntColumn(0.05)
>     val impField = IntColumn("impression")
>
>     val pred: FilterPredicate = impField.filterBy(sampler)
>
>     ParquetInputFormat.setFilterPredicate(job.getConfiguration, pred)
>
>     println(job.getConfiguration.get("parquet.private.read.filter.predicate"))
>     println(job.getConfiguration.get("parquet.private.read.filter.predicate.human.readable"))
>
>     val records1 = sc.newAPIHadoopFile(
>       args(1),  // <path to parquet>
>       classOf[ParquetInputFormat[GenericRecord]],
>       classOf[Void],
>       classOf[GenericRecord],
>       job.getConfiguration
>     ).map(_._2).cache()
>
>     println("result count " + records1.count().toString)
>
>     sc.stop()
>   }
> }
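>
> As a sanity check in the driver JVM (sketch only, using the same `job` as
> above), the predicate can be read back right after it is set; this mirrors
> what ParquetInputFormat.getFilter does on each executor, which is where
> the ClassNotFoundException below occurs:
>
> // Deserializes parquet.private.read.filter.predicate from the
> // configuration; in the driver (and in --master local mode) this works
> // because the UDP class is on the classpath there.
> val roundTripped = ParquetInputFormat.getFilterPredicate(job.getConfiguration)
> println("driver sees predicate: " + roundTripped)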
>
>
>
> Here are the logs with the exception I'm getting:
>
>
> 15/10/19 11:14:43 INFO TaskSetManager: Starting task 21.0 in stage 0.0
> (TID 0, hdp010........, NODE_LOCAL, 2815 bytes)
> 15/10/19 11:14:43 INFO TaskSetManager: Starting task 14.0 in stage 0.0
> (TID 1, hdp042........, NODE_LOCAL, 2816 bytes)
> 15/10/19 11:14:43 INFO YarnClientSchedulerBackend: Registered executor:
> AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@hdp027........:43593/user/Executor#-832887318])
> with ID 3
> 15/10/19 11:14:43 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 2, hdp027........, NODE_LOCAL, 2814 bytes)
> 15/10/19 11:14:44 INFO BlockManagerMasterEndpoint: Registering block
> manager hdp027........:64266 with 883.8 MB RAM, BlockManagerId(3,
> hdp027........, 64266)
> 15/10/19 11:14:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on hdp010........:23967 (size: 1516.0 B, free: 883.8 MB)
> 15/10/19 11:14:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on hdp042........:63034 (size: 1516.0 B, free: 883.8 MB)
> 15/10/19 11:14:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on hdp010........:23967 (size: 25.1 KB, free: 883.8 MB)
> 15/10/19 11:14:45 INFO TaskSetManager: Starting task 48.0 in stage 0.0
> (TID 3, hdp010........, NODE_LOCAL, 2816 bytes)
> 15/10/19 11:14:45 WARN TaskSetManager: Lost task 21.0 in stage 0.0 (TID 0,
> hdp010........): java.lang.RuntimeException: java.io.IOException: Could not
> read object from config with key parquet.private.read.filter.predicate
> at
> org.apache.parquet.hadoop.ParquetInputFormat.getFilterPredicate(ParquetInputFormat.java:196)
> at
> org.apache.parquet.hadoop.ParquetInputFormat.getFilter(ParquetInputFormat.java:205)
> at
> org.apache.parquet.hadoop.ParquetInputFormat.createRecordReader(ParquetInputFormat.java:241)
> at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:151)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:124)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Could not read object from config with key
> parquet.private.read.filter.predicate
> at
> org.apache.parquet.hadoop.util.SerializationUtil.readObjectFromConfAsBase64(SerializationUtil.java:102)
> at
> org.apache.parquet.hadoop.ParquetInputFormat.getFilterPredicate(ParquetInputFormat.java:194)
> ... 17 more
> Caused by: java.lang.ClassNotFoundException:
> my.app.parquet.filters.udp.SampleIntColumn
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:274)
> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.parquet.hadoop.util.SerializationUtil.readObjectFromConfAsBase64(SerializationUtil.java:100)
> ... 18 more
>
>
> Best Regards
> Vladimir Vladimirov
>
>


-- 
ChuChao
