Saving RDD as Kryo (broken in 2.1)
bjectArraySerializer.read(DefaultArraySerializers.java:396)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
        at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
        at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

--
Alexander Krasheninnikov
Head of Data Team
Re: JavaDStream to Dataframe: Java
Hello! When working with a JavaDStream you can use the transform() or foreachRDD() methods, which give you access to the RDD behind each batch. For example (a sketch of the foreachRDD() variant follows the quoted message below):

JavaDStream<Row> dataFrameStream = ctx.textFileStream("source").transform(
        new Function2<JavaRDD<String>, Time, JavaRDD<Row>>() {
            @Override
            public JavaRDD<Row> call(JavaRDD<String> incomingRdd, Time batchTime) throws Exception {
                // Get an API for operating on DataFrames
                HiveContext hiveCtx = new HiveContext(incomingRdd.context());
                // Create a schema for the DataFrame (declare columns)
                StructType schema = null;
                // Map the incoming data into an RDD of DataFrame rows
                JavaRDD<Row> rowsRdd = incomingRdd.map(rddMember -> new GenericRow(100));
                // DataFrame creation
                DataFrame df = hiveCtx.createDataFrame(rowsRdd, schema);
                // Here you may perform some operations on df, or return it as a stream
                return df.toJavaRDD();
            }
        });

On Fri, Jun 3, 2016 at 5:44 PM, Zakaria Hili wrote:
> Hi,
> I'm a newbie in Spark and I want to ask you a simple question.
> I have a JavaDStream which contains data selected from a SQL database,
> something like (id, user, score ...), and I want to convert the JavaDStream
> to a DataFrame.
>
> How can I do this with Java?
> Thank you
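For completeness, here is a rough sketch of the foreachRDD() variant mentioned above. It is not from the original reply: it assumes Spark 1.6's Java streaming and SQL APIs (with the corresponding imports), reuses the streaming context `ctx` from the snippet above, and "events" is a hypothetical table name. It builds a one-column DataFrame per batch and queries it directly instead of converting it back to an RDD:

ctx.textFileStream("source").foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> incomingRdd) throws Exception {
        HiveContext hiveCtx = new HiveContext(incomingRdd.context());
        // One string column named "line" - adjust to your (id, user, score, ...) layout
        StructType schema = DataTypes.createStructType(Collections.singletonList(
                DataTypes.createStructField("line", DataTypes.StringType, false)));
        JavaRDD<Row> rowsRdd = incomingRdd.map(line -> RowFactory.create(line));
        DataFrame df = hiveCtx.createDataFrame(rowsRdd, schema);
        // Operate on the DataFrame directly for this batch
        df.registerTempTable("events");
        hiveCtx.sql("SELECT COUNT(*) FROM events").show();
    }
});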
Re: Profiling a spark job
If you are profiling in standalone mode, I recommend trying Java Mission Control. You just need to start the app with these params:

-XX:+UnlockCommercialFeatures -XX:+FlightRecorder
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.port=$YOUR_PORT
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false

and then connect with the profiling agent and perform a recording (a sketch of passing these flags through SparkConf follows the quoted message below). My strong suspicion is that the epoll wait time is spent in one particular thread, not across all spawned threads.

On Tue, Apr 5, 2016 at 1:34 PM, Dmitry Olshansky wrote:
> Hi list,
>
> I'm curious as to what are the best practices for profiling Spark apps? So far
> I tried following this guide with hprof and/or YourKit, but the profile looks
> strange:
>
> https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit
>
> 55% of the time is spent in EPollWait. However, I'm using standalone mode with
> a local master without starting a separate daemon (could it be that I should?)
>
> ---
> Dmitry Olshansky
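For reference, a minimal sketch (not from the original reply) of attaching the flags above to standalone-mode executors through SparkConf. The app name and port 9010 are arbitrary placeholders, and it assumes one executor per node (otherwise the fixed JMX port will clash). Note that the driver JVM is already running by the time SparkConf is read, so to profile the driver itself (or a local[*] run) the same flags have to go on the launching command line instead.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ProfiledApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ProfiledApp")
                // Flags are applied to every executor JVM launched for this app
                .set("spark.executor.extraJavaOptions",
                        "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
                                + " -Dcom.sun.management.jmxremote=true"
                                + " -Dcom.sun.management.jmxremote.port=9010"
                                + " -Dcom.sun.management.jmxremote.authenticate=false"
                                + " -Dcom.sun.management.jmxremote.ssl=false");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... the job to be profiled goes here ...
        sc.stop();
    }
}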
Re: problem with a very simple word count program
Collect all your RDDs from the single files into a List, then call context.union(context.emptyRDD(), YOUR_LIST). Otherwise, with a larger number of elements to union, you will get a stack overflow exception. (A minimal Java sketch of this pattern follows the quoted thread below.)

On Wed, Sep 16, 2015 at 10:17 PM, Shawn Carroll wrote:
> Your loop is deciding the files to process and then you are unioning the
> data on each iteration. If you change it to load all the files at the same
> time and let Spark sort it out, you should be much faster.
>
> Untested:
>
> val rdd = sc.textFile("medline15n00*.xml")
> val words = rdd.flatMap( x => x.split(" ") )
> words.map( x => (x,1)).reduceByKey( (x,y) => (x+y) )
> words.saveAsTextFile("results")
>
> shawn.c.carr...@gmail.com
> Software Engineer
> Soccer Referee
>
> On Wed, Sep 16, 2015 at 2:07 PM, huajun wrote:
>
>> Hi.
>> I have a problem with this very simple word count program. The program
>> works fine for thousands of similar files in the dataset but is very slow
>> for these first 28 or so. The files are about 50 to 100 MB each, and the
>> program processes 28 other similar files in about 30 sec. These first 28
>> files, however, take 30 min. This should not be a problem with the data in
>> these files, as if I combine all the files into one bigger file, it is
>> processed in about 30 sec.
>>
>> I am running Spark in local mode (with > 100 GB memory) and it just uses
>> 100% CPU (one core) most of the time (for this troubled case), and no
>> network traffic is involved.
>>
>> Any obvious (or non-obvious) errors?
>>
>> def process(file : String) : RDD[(String, Int)] = {
>>   val rdd = sc.textFile(file)
>>   val words = rdd.flatMap( x => x.split(" ") )
>>   words.map( x => (x,1)).reduceByKey( (x,y) => (x+y) )
>> }
>>
>> val file = "medline15n0001.xml"
>> var keep = process(file)
>>
>> for (i <- 2 to 28) {
>>   val file = if (i < 10) "medline15n000" + i + ".xml"
>>              else "medline15n00" + i + ".xml"
>>   val result = process(file)
>>   keep = result.union(keep)
>> }
>> keep = keep.reduceByKey( (x,y) => (x+y) )
>> keep.saveAsTextFile("results")
>>
>> Thanks.
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-a-very-simple-word-count-program-tp24715.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
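For illustration, a minimal Java sketch of that pattern (not from the thread, which used Scala; file names and the master are placeholders, and the Spark 1.x Java API is assumed, where JavaSparkContext has union(first, list) and flatMap expects an Iterable):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class UnionOnce {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[4]", "UnionOnce");

        // Collect the per-file RDDs into a list instead of chaining union() in a loop
        List<JavaRDD<String>> rdds = new ArrayList<>();
        for (int i = 1; i <= 28; i++) {
            rdds.add(sc.textFile(String.format("medline15n%04d.xml", i)));
        }

        // A single union over the whole list keeps the lineage shallow and avoids
        // the stack overflow that a 28-deep union chain can cause
        JavaRDD<String> empty = sc.emptyRDD();
        JavaRDD<String> all = sc.union(empty, rdds);

        JavaRDD<String> words = all.flatMap(line -> Arrays.asList(line.split(" ")));
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                .reduceByKey((a, b) -> a + b);
        counts.saveAsTextFile("results");

        sc.stop();
    }
}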
Terminate streaming app on cluster restart
Hello, everyone! I have a case when running a standalone cluster: when stop-all.sh/start-all.sh are invoked on the master, the streaming app loses all its executors but does not terminate. Since a streaming app is expected to deliver its results ASAP, any downtime is undesirable. Is there any workaround to solve that problem? Thanks a lot.
Re: How to set log level in spark-submit ?
I saw such an example in the docs:

--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file://$path_to_file

but, unfortunately, it does not work for me.

On 30.07.2015 05:12, canan chen wrote:

Yes, that should work. What I mean is: is there any option in the spark-submit command that I can specify for the log level?

On Thu, Jul 30, 2015 at 10:05 AM, Jonathan Coveney <jcove...@gmail.com> wrote:

Put a log4j.properties file in conf/. You can copy log4j.properties.template as a good base.

On Wednesday, July 29, 2015, canan chen <ccn...@gmail.com> wrote:

Anyone know how to set the log level in spark-submit? Thanks
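Not from the thread, but a programmatic fallback is sometimes used when the configuration-file route misbehaves. A minimal sketch, assuming the log4j 1.x API that Spark 1.x ships with; it only affects the JVM it runs in (the driver), so executors still need the log4j.properties / extraJavaOptions route discussed above:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class QuietDriver {
    public static void main(String[] args) {
        // Raise the threshold for Spark's own loggers before the SparkContext is created
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR);
        // ... build SparkConf / JavaSparkContext and run the job as usual ...
    }
}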
Re: Count of distinct values in each column
I made such a naive implementation:

SparkConf conf = new SparkConf();
conf.setMaster("local[4]").setAppName("Stub");
final JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> input = ctx.textFile("path_to_file");

// explode each line into a list of column values
JavaRDD<List<String>> rowValues = input.map(new Function<String, List<String>>() {
    @Override
    public List<String> call(String v1) throws Exception {
        return Lists.newArrayList(v1.split(";"));
    }
});

// transform input to key = (word, colNumber), value = 1
JavaPairRDD<Tuple2<String, Integer>, Integer> positions = rowValues.flatMapToPair(
        new PairFlatMapFunction<List<String>, Tuple2<String, Integer>, Integer>() {
            @Override
            public Iterable<Tuple2<Tuple2<String, Integer>, Integer>> call(List<String> strings) throws Exception {
                List<Tuple2<Tuple2<String, Integer>, Integer>> retval = new ArrayList<>();
                int colNum = -1;
                for (String word : strings) {
                    Tuple2<String, Integer> wordPosition = new Tuple2<>(word, ++colNum);
                    retval.add(new Tuple2<>(wordPosition, 1));
                }
                return retval;
            }
        });

// sum word counts within a column
JavaPairRDD<Tuple2<String, Integer>, Integer> summ = positions.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

// invert the position - make the column number the key, and (word, count) the value
JavaPairRDD<Integer, Tuple2<String, Integer>> columnIsKey = summ.mapToPair(
        new PairFunction<Tuple2<Tuple2<String, Integer>, Integer>, Integer, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<Integer, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, Integer>, Integer> tuple2IntegerTuple2) throws Exception {
                return new Tuple2<>(tuple2IntegerTuple2._1()._2(),
                        new Tuple2<>(tuple2IntegerTuple2._1()._1(), tuple2IntegerTuple2._2()));
            }
        });

// here some optimizations to avoid groupByKey may be implemented
JavaPairRDD<Integer, Iterable<Tuple2<String, Integer>>> groupped = columnIsKey.groupByKey();

// output results
groupped.foreach(new VoidFunction<Tuple2<Integer, Iterable<Tuple2<String, Integer>>>>() {
    @Override
    public void call(Tuple2<Integer, Iterable<Tuple2<String, Integer>>> integerIterableTuple2) throws Exception {
        String strValues = "";
        Iterable<Tuple2<String, Integer>> values = integerIterableTuple2._2();
        for (Tuple2<String, Integer> distinct : values) {
            strValues += "(" + distinct._1() + "," + distinct._2() + ")";
        }
        System.out.println("Column: " + integerIterableTuple2._1());
        System.out.println("Distincts: " + strValues);
    }
});

On 29.07.2015 16:38, Devi P.V wrote:

Hi All,
I have a 5 GB CSV dataset having 69 columns. I need to find the count of distinct values in each column. What is the optimized way to find the same using Spark Scala?

Example CSV format:

a,b,c,d
a,c,b,a
b,b,c,d
b,b,c,a
c,b,b,a

Output expecting:

(a,2),(b,2),(c,1) #- First column distinct count
(b,4),(c,1) #- Second column distinct count
(c,3),(b,2) #- Third column distinct count
(d,2),(a,3) #- Fourth column distinct count

Thanks in Advance
Re: Override Logging with spark-streaming
Have you tried putting this file on local disk on each of the executor nodes? That worked for me.

On 05.06.2015 16:56, nib...@free.fr wrote:

Hello,
I want to override the log4j configuration when I start my Spark job. I tried:

.../bin/spark-submit --class --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/.../log4j.properties x.jar

or

.../bin/spark-submit --class --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=/.../log4j.properties x.jar

but it doesn't work, I still have the default configuration.
Any ideas?
Tks
Nicolas
Re: kafka + Spark Streaming with checkPointing fails to start with
I had the same problem. The solution I found was to use:

JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate("checkpoint_dir", contextFactory);

ALL configuration should be performed inside contextFactory. If you try to configure the streaming context after getOrCreate(), you receive the "has not been initialized" error. (A minimal factory sketch is included after the quoted message below.)

On 13.05.2015 00:51, Ankur Chauhan wrote:

Hi,

I have a simple application which fails with the following exception only when the application is restarted (i.e. the checkpointDir has entries from a previous execution):

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
        at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115)
        at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
        at scala.App$class.main(App.scala:71)
        at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
        at com.brightcove.analytics.tacoma.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The relevant source is:

class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {
  // create kafka stream
  val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
  // KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)

  val eventStream = rawlogDStream
    .map({
      case (key, rawlogVal) =>
        val record = rawlogVal.asInstanceOf[GenericData.Record]
        val rlog = RawLog.newBuilder()
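To make that concrete, here is a minimal sketch of the pattern described in the reply (not Ankur's code; the app name, durations and paths are placeholders, and the Spark 1.x Java API is assumed). The whole DStream graph is defined inside the factory, and nothing but start()/awaitTermination() happens after getOrCreate():

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class CheckpointedApp {
    private static final String CHECKPOINT_DIR = "checkpoint_dir";

    public static void main(String[] args) throws Exception {
        JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                SparkConf conf = new SparkConf().setAppName("CheckpointedApp").setMaster("local[4]");
                JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
                jssc.checkpoint(CHECKPOINT_DIR);
                // The whole DStream graph lives here, inside the factory
                JavaDStream<String> lines = jssc.textFileStream("source_dir");
                lines.count().print();
                return jssc;
            }
        };

        // On a clean start the factory runs; on restart the graph is restored from the checkpoint
        JavaStreamingContext streamingContext =
                JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, contextFactory);

        // Do NOT define new DStreams here - only start and wait
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}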
Streaming app with windowing and persistence
Hello, everyone. I am developing a streaming application working with window functions - each window creates a table and performs some SQL operations on the extracted data. I ran into the following problem: when window operations are used together with checkpointing, the application does not start the next time. Here is the code:

final Duration batchDuration = Durations.seconds(10);
final Duration slideDuration = Durations.seconds(10);
final Duration windowDuration = Durations.seconds(600);

final SparkConf conf = new SparkConf();
conf.setAppName("Streaming");
conf.setMaster("local[4]");

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, batchDuration);
        streamingContext.checkpoint(CHECKPOINT_DIR);
        return streamingContext;
    }
};

JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new Configuration(), contextFactory, true);

JavaDStream<String> lines = streamingContext.textFileStream(SOURCE_DIR);
lines.countByWindow(windowDuration, slideDuration).print();

streamingContext.start();
streamingContext.awaitTermination();

I expect that after the application restarts, Spark will merge the old event counter with the new values (if that is not the case, I am ready to merge the old data manually). But after the application restart, I get this error:

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@49db6f23 has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
        at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:584)
        at my.package.FileAggregations.main(FileAggregations.java:76)

FileAggregations.java:76 is streamingContext.start(). Spark version is 1.3.0.

---
wbr, Alexandr Krasheninnikov