
Saving RDD as Kryo (broken in 2.1)

2017-06-21 Thread Alexander Krasheninnikov
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

--
*Alexander Krasheninnikov*
Head of Data Team


Re: JavaDStream to Dataframe: Java

2016-06-10 Thread Alexander Krasheninnikov
Hello!
While operating on the JavaDStream you can use the transform() or foreach()
methods, which give you access to the underlying RDD of each batch.

JavaDStream<Row> dataFrameStream =
    ctx.textFileStream("source").transform(new Function2<JavaRDD<String>, Time, JavaRDD<Row>>() {
        @Override
        public JavaRDD<Row> call(JavaRDD<String> incomingRdd, Time batchTime) throws Exception {
            // Get an API for operating on DataFrames
            HiveContext sqlCtx = new HiveContext(incomingRdd.context());
            // create a schema for the DataFrame (declare columns)
            StructType schema = null;
            // map incoming data into an RDD of DataFrame rows
            JavaRDD<Row> rowsRdd = incomingRdd.map(rddMember -> new GenericRow(100));
            // DataFrame creation
            DataFrame df = sqlCtx.createDataFrame(rowsRdd, schema);

            // here you may perform some operations on df, or return it as a stream
            return df.toJavaRDD();
        }
    });
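For the foreach()/foreachRDD() route mentioned above, a minimal sketch (my own
illustration, not from the original reply; it assumes Spark 1.6+, where foreachRDD()
accepts a VoidFunction, and uses a single hypothetical string column named "line"):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

ctx.textFileStream("source").foreachRDD(incomingRdd -> {
    HiveContext sqlCtx = new HiveContext(incomingRdd.context());
    // placeholder schema: one string column called "line"
    StructType schema = DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("line", DataTypes.StringType, false)});
    // wrap every incoming line into a Row
    JavaRDD<Row> rowsRdd = incomingRdd.map(line -> (Row) new GenericRow(new Object[]{line}));
    DataFrame df = sqlCtx.createDataFrame(rowsRdd, schema);
    // consume the DataFrame here, e.g. print a few rows or register a temp table
    df.show();
});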



On Fri, Jun 3, 2016 at 5:44 PM, Zakaria Hili  wrote:

> Hi,
> I'm a newbie in Spark and I want to ask you a simple question.
> I have a JavaDStream which contains data selected from an SQL database,
> something like (id, user, score, ...),
> and I want to convert the JavaDStream to a DataFrame.
>
> How can I do this with Java?
> Thank you
>


Re: Profiling a spark job

2016-04-11 Thread Alexander Krasheninnikov
If you are profiling in standalone mode, I recommend trying Java Mission Control.
You just need to start the app with these params:

-XX:+UnlockCommercialFeatures -XX:+FlightRecorder
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.port=$YOUR_PORT
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false



Then connect with the profiling agent and perform a recording. My strong suspicion
is that the epoll wait time is spent in one specific thread, not across all spawned
threads.
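For example (my own sketch, not from the original reply; the class name, jar, and
port are placeholders), the same flags can be passed to the driver through
spark-submit:

./bin/spark-submit --class your.Main \
  --conf "spark.driver.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.port=7091 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
  your-app.jar

For executor-side profiling, spark.executor.extraJavaOptions takes the same value.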

On Tue, Apr 5, 2016 at 1:34 PM, Dmitry Olshansky 
wrote:

> Hi list,
>
> I'm curious as to what the best practices are for profiling Spark apps? So
> far I have tried following this guide with hprof and/or YourKit, but the profile
> looks strange:
>
> https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit
>
>  55% of the time is spent in EPollWait. However, I'm using standalone mode with
> a local master without starting a separate daemon (could it be that I should?)
>
> ---
> Dmitry Olshansky
>
>


Re: problem with a very simple word count program

2015-09-16 Thread Alexander Krasheninnikov
Collect all the per-file RDDs into a List, then call
context.union(context.emptyRDD(), YOUR_LIST) once on the whole list (see the sketch
below). Otherwise, with a larger number of chained union() calls, you will get a
stack overflow exception, because the lineage becomes deeply nested.
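A minimal sketch of that approach in the Java API (my own untested illustration; it
assumes Spark 1.x with Java 8 lambdas, that sc is a JavaSparkContext, and the
medline file-name pattern from the quoted program below):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

List<JavaRDD<String>> perFileWords = new ArrayList<>();
for (int i = 1; i <= 28; i++) {
    String file = String.format("medline15n%04d.xml", i);
    // split each file into words; no union() yet
    perFileWords.add(sc.textFile(file).flatMap(x -> Arrays.asList(x.split(" "))));
}
// one flat union over the whole list keeps the lineage shallow
JavaRDD<String> words = sc.union(sc.<String>emptyRDD(), perFileWords);
JavaPairRDD<String, Integer> counts =
        words.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
             .reduceByKey((x, y) -> x + y);
counts.saveAsTextFile("results");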

On Wed, Sep 16, 2015 at 10:17 PM, Shawn Carroll 
wrote:

> Your loop is deciding the files to process and then you are unioning the
> data on each iteration. If you change it to load all the files at the same
> time and let spark sort it out you should be much faster.
>
> Untested:
>
>  val rdd = sc.textFile("medline15n00*.xml")
>  val words = rdd.flatMap( x => x.split(" ") )
>  val counts = words.map( x => (x, 1) ).reduceByKey( (x, y) => x + y )
>  counts.saveAsTextFile("results")
>
>
>
> shawn.c.carr...@gmail.com
> Software Engineer
> Soccer Referee
>
> On Wed, Sep 16, 2015 at 2:07 PM, huajun  wrote:
>
>> Hi.
>> I have a problem with this very simple word count program. The program
>> works fine for thousands of similar files in the dataset but is very slow
>> for these first 28 or so.
>> The files are about 50 to 100 MB each,
>> and the program processes 28 other similar files in about 30 sec. These first
>> 28 files, however, take 30 min.
>> This should not be a problem with the data in these files, because if I combine
>> all the files into one bigger file, it is processed in about 30 sec.
>>
>> I am running Spark in local mode (with > 100 GB memory) and it just uses
>> 100% CPU (one core) most of the time (for this troubled case), and no network
>> traffic is involved.
>>
>> Any obvious (or non-obvious) errors?
>>
>> def process(file : String) : RDD[(String, Int)] = {
>>   val rdd = sc.textFile(file)
>>   val words = rdd.flatMap( x=> x.split(" ") );
>>
>>   words.map( x=> (x,1)).reduceByKey( (x,y) => (x+y) )
>> }
>>
>> val file = "medline15n0001.xml"
>> var keep = process(file)
>>
>> for (i <- 2 to 28) {
>>   val file = if (i < 10) "medline15n000" + i + ".xml"
>>  else "medline15n00" + i + ".xml"
>>
>>   val result = process(file)
>>   keep = result.union(keep);
>> }
>> keep = keep.reduceByKey( (x,y) => (x+y) )
>> keep.saveAsTextFile("results")
>>
>> Thanks.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-a-very-simple-word-count-program-tp24715.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Terminate streaming app on cluster restart

2015-08-06 Thread Alexander Krasheninnikov

Hello, everyone!
I have a case when running a standalone cluster: when stop-all.sh/start-all.sh are
invoked on the master, the streaming app loses all its executors but does not
terminate.
Since it is a streaming app that is expected to deliver its results ASAP, downtime
is undesirable.

Is there any workaround to solve that problem?

Thanks a lot.




Re: How to set log level in spark-submit ?

2015-07-30 Thread Alexander Krasheninnikov

I saw such an example in the docs:
--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file://$path_to_file

but, unfortunately, it does not work for me.
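For reference, a minimal sketch of the conf/log4j.properties route suggested in the
quoted reply below (my assumption, based on the stock log4j.properties.template;
only the root level line changes):

# conf/log4j.properties - copied from conf/log4j.properties.template
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n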

On 30.07.2015 05:12, canan chen wrote:
Yes, that should work. What I mean is: is there any option in the
spark-submit command that I can specify for the log level?



On Thu, Jul 30, 2015 at 10:05 AM, Jonathan Coveney <jcove...@gmail.com> wrote:


Put a log4j.properties file in conf/. You can copy
log4j.properties.template as a good base


On Wednesday, July 29, 2015, canan chen <ccn...@gmail.com> wrote:

Anyone know how to set log level in spark-submit ? Thanks






Re: Count of distinct values in each column

2015-07-29 Thread Alexander Krasheninnikov

I made such a naive implementation:

SparkConf conf = new SparkConf();
conf.setMaster("local[4]").setAppName("Stub");
final JavaSparkContext ctx = new JavaSparkContext(conf);

JavaRDD<String> input = ctx.textFile("path_to_file");

// explode each line into a list of column values
JavaRDD<List<String>> rowValues = input.map(new Function<String, List<String>>() {
    @Override
    public List<String> call(String v1) throws Exception {
        return Lists.newArrayList(v1.split(";"));
    }
});

// transform input to key = (word, colNumber), value = 1
JavaPairRDD<Tuple2<String, Integer>, Integer> positions =
        rowValues.flatMapToPair(new PairFlatMapFunction<List<String>, Tuple2<String, Integer>, Integer>() {
    @Override
    public Iterable<Tuple2<Tuple2<String, Integer>, Integer>> call(List<String> strings) throws Exception {
        List<Tuple2<Tuple2<String, Integer>, Integer>> retval = new ArrayList<>();

        int colNum = -1;
        for (String word : strings) {
            Tuple2<String, Integer> wordPosition = new Tuple2<>(word, ++colNum);
            retval.add(new Tuple2<>(wordPosition, 1));
        }
        return retval;
    }
});

// sum the word counts within each column
JavaPairRDD<Tuple2<String, Integer>, Integer> summ =
        positions.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

// invert the position - make columnNumber the key, and (word, count) the value
JavaPairRDD<Integer, Tuple2<String, Integer>> columnIsKey =
        summ.mapToPair(new PairFunction<Tuple2<Tuple2<String, Integer>, Integer>, Integer, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<Integer, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, Integer>, Integer> tuple2IntegerTuple2) throws Exception {
        return new Tuple2<>(tuple2IntegerTuple2._1()._2(),
                new Tuple2<>(tuple2IntegerTuple2._1()._1(), tuple2IntegerTuple2._2()));
    }
});

// here some optimizations to avoid groupByKey may be implemented
JavaPairRDD<Integer, Iterable<Tuple2<String, Integer>>> groupped = columnIsKey.groupByKey();
// output the results
groupped.foreach(new VoidFunction<Tuple2<Integer, Iterable<Tuple2<String, Integer>>>>() {
    @Override
    public void call(Tuple2<Integer, Iterable<Tuple2<String, Integer>>> integerIterableTuple2) throws Exception {
        String strValues = "";
        Iterable<Tuple2<String, Integer>> values = integerIterableTuple2._2();
        for (Tuple2<String, Integer> distinct : values) {
            strValues += "(" + distinct._1() + "," + distinct._2() + ")";
        }
        System.out.println("Column: " + integerIterableTuple2._1());
        System.out.println("Distincts: " + strValues);
    }
});
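Regarding the "optimizations to avoid groupByKey" comment above: one possibility (my
own untested sketch, assuming Spark 1.1+ with Java 8 lambdas, reusing the columnIsKey
pair RDD from the code above, and additionally needing java.util.HashMap) is to fold
each column's (word, count) pairs into a map with aggregateByKey, so values are
combined map-side instead of being shuffled as whole groups:

JavaPairRDD<Integer, HashMap<String, Integer>> distinctsPerColumn =
        columnIsKey.aggregateByKey(
                new HashMap<String, Integer>(),
                (acc, wordAndCount) -> {
                    // merge one (word, count) pair into the per-partition map
                    acc.put(wordAndCount._1(), wordAndCount._2());
                    return acc;
                },
                (left, right) -> {
                    // merge the partial maps coming from different partitions
                    left.putAll(right);
                    return left;
                });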



On 29.07.2015 16:38, Devi P.V wrote:

Hi All,

I have a 5 GB CSV dataset with 69 columns. I need to find the count
of distinct values in each column. What is the optimized way to compute
this using Spark with Scala?


Example CSV format :

a,b,c,d
a,c,b,a
b,b,c,d
b,b,c,a
c,b,b,a

Output expecting :

(a,2),(b,2),(c,1) #- First column distinct count
(b,4),(c,1)   #- Second column distinct count
(c,3),(b,2)   #- Third column distinct count
(d,2),(a,3)   #- Fourth column distinct count


Thanks in Advance




Re: Override Logging with spark-streaming

2015-06-05 Thread Alexander Krasheninnikov
Have you tried putting this file on the local disk of each executor node?
That worked for me.
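For example (my own illustration, not from the original reply; /etc/spark/log4j.properties
is a hypothetical path that must exist on every worker node), the first variant from the
quoted message then becomes:

.../bin/spark-submit --class  --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/etc/spark/log4j.properties x.jar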

On 05.06.2015 16:56, nib...@free.fr wrote:

Hello,
I want to override the log4j configuration when I start my Spark job.
I tried:
.../bin/spark-submit --class  --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/.../log4j.properties x.jar
or
.../bin/spark-submit --class  --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=/.../log4j.properties x.jar

But it doesn't work, I still have the default configuration.

Any ideas?
Tks
Nicolas








Re: kafka + Spark Streaming with checkPointing fails to start with

2015-05-15 Thread Alexander Krasheninnikov

I had the same problem.
The solution I found was to use:
JavaStreamingContext streamingContext =
JavaStreamingContext.getOrCreate("checkpoint_dir", contextFactory);


ALL configuration should be performed inside the contextFactory. If you try
to configure the streaming context after getOrCreate(), you receive a
"has not been initialized" error.
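A minimal sketch of that pattern (my own illustration, assuming the Spark 1.3 Java API;
createKafkaStream() is a hypothetical helper standing in for the Kafka wiring from the
quoted code below, and "checkpoint_dir" is a placeholder):

final SparkConf conf = new SparkConf().setAppName("RawLogProcessor");

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.checkpoint("checkpoint_dir");
        // build the COMPLETE DStream graph here, before returning
        JavaDStream<String> lines = createKafkaStream(jssc); // hypothetical helper
        lines.count().print();
        return jssc;
    }
};

// on a restart this restores the graph from the checkpoint instead of calling create()
JavaStreamingContext streamingContext =
        JavaStreamingContext.getOrCreate("checkpoint_dir", contextFactory);
streamingContext.start();
streamingContext.awaitTermination();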


On 13.05.2015 00:51, Ankur Chauhan wrote:

Hi,

I have a simple application which fails with the following exception only when
the application is restarted (i.e. the checkpointDir has entries from a
previous execution):

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115)
at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
at com.brightcove.analytics.tacoma.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The relevant source is:

class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {
  // create kafka stream
  val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
  //KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)

  val eventStream = rawlogDStream
    .map({
      case (key, rawlogVal) =>
        val record = rawlogVal.asInstanceOf[GenericData.Record]
        val rlog = RawLog.newBuilder()

Streaming app with windowing and persistence

2015-04-27 Thread Alexander Krasheninnikov

Hello, everyone.
I am developing a streaming application that works with window functions - for each
window it creates a table and performs some SQL operations on the extracted data.
I ran into the following problem: when window operations are combined with
checkpointing, the application does not start the next time.

Here is the code:



final Duration batchDuration = Durations.seconds(10);
final Duration slideDuration = Durations.seconds(10);
final Duration windowDuration = Durations.seconds(600);

final SparkConf conf = new SparkConf();
conf.setAppName("Streaming");
conf.setMaster("local[4]");


JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext streamingContext =
                new JavaStreamingContext(conf, batchDuration);
        streamingContext.checkpoint(CHECKPOINT_DIR);

        return streamingContext;
    }
};

JavaStreamingContext streamingContext =
        JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new Configuration(), contextFactory, true);
JavaDStream<String> lines = streamingContext.textFileStream(SOURCE_DIR);

lines.countByWindow(windowDuration, slideDuration).print();

streamingContext.start();
streamingContext.awaitTermination();



I expect that after an application restart, Spark will merge the old event
counter with the new values (if that is not the case, I am ready to merge the old
data manually).

But after the application restart, I get this error:
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@49db6f23 has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:584)
at my.package.FileAggregations.main(FileAggregations.java:76)

Line FileAggregations.java:76 is:

streamingContext.start();

Spark version is 1.3.0.
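For comparison with the getOrCreate() advice in the checkpointing reply above, here is a
minimal untested sketch of the same program with the whole DStream graph - including the
windowed count - built inside the factory rather than after getOrCreate() (conf, the
durations, CHECKPOINT_DIR and SOURCE_DIR are the placeholders from the code above):

JavaStreamingContextFactory factoryWithGraph = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);
        jssc.checkpoint(CHECKPOINT_DIR);
        // the full DStream graph is declared here, so it can be restored from the checkpoint
        JavaDStream<String> lines = jssc.textFileStream(SOURCE_DIR);
        lines.countByWindow(windowDuration, slideDuration).print();
        return jssc;
    }
};

JavaStreamingContext restoredContext =
        JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new Configuration(), factoryWithGraph, true);
restoredContext.start();
restoredContext.awaitTermination();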

---
wbr, Alexandr Krasheninnikov