Setting log level to DEBUG while keeping httpclient.wire on WARN

2018-06-29 Thread Daniel Haviv
Hi,
I'm trying to debug an issue with Spark, so I've set the log level to DEBUG, but
at the same time I'd like to avoid httpclient.wire's verbose output by
setting it to WARN.

I tried the following log4j.properties config but I'm still getting DEBUG
output for httpclient.wire:

log4j.rootCategory=DEBUG,console

...

log4j.logger.org.apache=WARN
log4j.logger.httpclient.wire.header=WARN
log4j.logger.httpclient.wire.content=WARN
log4j.logger.org.apache.commons.httpclient=WARN
log4j.logger.httpclient=WARN

Thank you.
Daniel
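
For reference, a hedged way to check whether the properties file is being picked up at all: forcing the same levels programmatically from the spark-shell with the log4j 1.x API. If this silences the output, the log4j.properties above simply isn't the one being loaded.

import org.apache.log4j.{Level, LogManager}

// Force the wire loggers down to WARN at runtime while the root stays at DEBUG.
LogManager.getLogger("httpclient.wire").setLevel(Level.WARN)
LogManager.getLogger("org.apache.commons.httpclient").setLevel(Level.WARN)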


Thrift server not exposing temp tables (spark.sql.hive.thriftServer.singleSession=true)

2018-05-30 Thread Daniel Haviv
Hi,
I would like to expose a DF through the Thrift server, but even though I
enable spark.sql.hive.thriftServer.singleSession I still can't see the temp
table.

I'm using Spark 2.2.0:


spark-shell --conf spark.sql.hive.thriftServer.singleSession=true

import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.hive.thriftserver._
val df = spark.read.orc("s3://sparkdemoday/crimes_orc")
df.createOrReplaceTempView("cached_df")
df.cache
df.count
val sql = spark.sqlContext
HiveThriftServer2.startWithContext(sql)


Thank you.
Daniel


Writing a UDF that works with an Interval in PySpark

2017-12-11 Thread Daniel Haviv
Hi,
I'm trying to write a variant of date_add that accepts an interval as a
second parameter so that I could use the following syntax with SparkSQL:
select date_add(cast('1970-01-01' as date), interval 1 day)

but I'm getting the following error:
ValueError: (ValueError(u'Could not parse datatype: calendarinterval',),
,
(u'{"type":"struct","fields":[{"name":"","type":"date","nullable":true,"metadata":{}},{"name":"","type":"calendarinterval","nullable":true,"metadata":{}}]}',))

Any ideas how can I achieve this (or even better, has someone already done
this)?

Thank you.
Daniel


Re: UDF issues with spark

2017-12-10 Thread Daniel Haviv
Some code would help to debug the issue

On Fri, 8 Dec 2017 at 21:54 Afshin, Bardia <
bardia.afs...@changehealthcare.com> wrote:

> Using pyspark cli on spark 2.1.1 I’m getting out of memory issues when
> running the udf function on a recordset count of 10 with a mapping of the
> same value (arbitrary for testing purposes). This is on Amazon EMR release
> label 5.6.0 with the following hardware specs
>
>
>
> *m4.4xlarge*
>
> 32 vCPU, 64 GiB memory, EBS only storage
>
> EBS Storage:100 GiB
>
>
>
> Help?
>
>
> This message is confidential, intended only for the named recipient(s) and
> may contain information that is privileged or exempt from disclosure under
> applicable law. If you are not the intended recipient(s), you are notified
> that the dissemination, distribution, or copying of this message is
> strictly prohibited. If you receive this message in error or are not the
> named recipient(s), please notify the sender by return email and delete
> this message. Thank you.
>


Writing custom Structured Streaming receiver

2017-11-01 Thread Daniel Haviv
Hi,
Is there a guide to writing a custom Structured Streaming receiver?

Thank you.
Daniel


Optimizing dataset joins

2017-05-18 Thread Daniel Haviv
Hi,
With RDDs it was possible to define a partitioner for two RDDs, and given
that both RDDs shared the same partitioner, a join would be performed
locally within each partition without shuffling.

Can Dataset joins be optimized in the same way?
Is it enough to repartition both Datasets on the same column?

Thank you.
Daniel
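
For reference, a minimal hedged sketch of the second question (names and data are illustrative): repartitioning both Datasets on the join column with the same partition count should let the planner reuse that hash partitioning rather than add another exchange before the join; explain() shows whether an extra shuffle remains.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("join-partitioning-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val dsA = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "va")
val dsB = Seq((1, "x"), (2, "y"), (4, "z")).toDF("id", "vb")

// Repartition both sides on the join column with the same number of partitions.
val a = dsA.repartition(16, $"id")
val b = dsB.repartition(16, $"id")

val joined = a.join(b, "id")
joined.explain()   // inspect the physical plan for Exchange nodes before the join
joined.show()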


[Spark-SQL] Hive support is required to select over the following tables

2017-02-08 Thread Daniel Haviv
Hi,
I'm using Spark 2.1.0 on Zeppelin.

I can successfully create a table but when I try to select from it I fail:
spark.sql("create table foo (name string)")
res0: org.apache.spark.sql.DataFrame = []

spark.sql("select * from foo")

org.apache.spark.sql.AnalysisException:
Hive support is required to select over the following tables:
`default`.`zibi`
;;
'Project [*]
+- 'SubqueryAlias foo
+- 'SimpleCatalogRelation default, CatalogTable(
Table: `default`.`foo`
Created: Wed Feb 08 12:52:08 UTC 2017
Last Access: Wed Dec 31 23:59:59 UTC 1969
Type: MANAGED
Schema: [StructField(name,StringType,true)]
Provider: hive
Storage(Location: hdfs:/user/spark/warehouse/foo, InputFormat:
org.apache.hadoop.mapred.TextInputFormat, OutputFormat:
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat))


This is a change in behavior from 2.0.2, any idea why ?

Thank you,
Daniel
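
For future readers, a hedged sketch (not from the thread): outside Zeppelin, the usual prerequisite for selecting from a Hive-provider table is building the session with Hive support explicitly enabled.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-select-sketch")
  .enableHiveSupport()   // without this, Hive-provider tables can be created but not selected from
  .getOrCreate()

spark.sql("create table foo (name string)")
spark.sql("select * from foo").show()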


Re: Not per-key state in spark streaming

2016-12-08 Thread Daniel Haviv
There's no need to extend Spark's API, look at mapWithState for examples.
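
For reference, a minimal hedged sketch of a mapWithState function that keeps only a per-key "seen" flag rather than the full record (names are illustrative; this is still per-key state, not an actual bloom filter):

import org.apache.spark.streaming.{State, StateSpec}

// Remember only whether a key has been seen before; emit (key, seenBefore).
def trackSeen(key: String, value: Option[String], state: State[Boolean]): (String, Boolean) = {
  val seenBefore = state.getOption.getOrElse(false)
  if (!seenBefore) state.update(true)
  (key, seenBefore)
}

val spec = StateSpec.function(trackSeen _).numPartitions(16)
// Given keyedStream: DStream[(String, String)], e.g. from Kafka:
// val flagged = keyedStream.mapWithState(spec)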

On Thu, Dec 8, 2016 at 4:49 AM, Anty Rao  wrote:

>
>
> On Wed, Dec 7, 2016 at 7:42 PM, Anty Rao  wrote:
>
>> Hi
>> I'm new to Spark. I'm doing some research to see if Spark Streaming can
>> solve my problem. I don't want to keep per-key state, because my data set is
>> very large and the state is kept for a fairly long time, so it is not viable to keep
>> all per-key state in memory. Instead, I want to have a bloom-filter-based state. Is it
>> possible to achieve this in Spark Streaming?
>>
>> Is it possible to achieve this by extending Spark API?
>
>> --
>> Anty Rao
>>
>
>
>
> --
> Anty Rao
>


Re: Not per-key state in spark streaming

2016-12-07 Thread Daniel Haviv
Hi Anty,
What you could do is keep only the existence of a key in the state and, when
necessary, pull the full record from a secondary store like HDFS or HBase.

Daniel

On Wed, Dec 7, 2016 at 1:42 PM, Anty Rao  wrote:

> Hi
> I'm new to Spark. I'm doing some research to see if Spark Streaming can
> solve my problem. I don't want to keep per-key state, because my data set is
> very large and the state is kept for a fairly long time, so it is not viable to keep
> all per-key state in memory. Instead, I want to have a bloom-filter-based state. Is it
> possible to achieve this in Spark Streaming?
>
> --
> Anty Rao
>


Re: How to load only the data of the last partition

2016-11-17 Thread Daniel Haviv
Hi Samy,
If you're working with Hive you could create a partitioned table and update
its partitions' locations to point at the latest version, so that when you
query it through Spark you always get the latest data.

Daniel
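
A related hedged sketch (assuming the data is in a format like Parquet or ORC under /data): listing the chosen version directories explicitly while supplying basePath keeps the year/month partition columns available.

// basePath tells partition discovery where the partitioned layout starts,
// so year and month remain as columns even though only two paths are read.
val df = spark.read
  .option("basePath", "/data")
  .parquet(
    "/data/year=2016/month=10/version=3",
    "/data/year=2016/month=11/version=1")

df.groupBy("year", "month").count().show()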

On Thu, Nov 17, 2016 at 9:05 PM, Samy Dindane  wrote:

> Hi,
>
> I have some data partitioned this way:
>
> /data/year=2016/month=9/version=0
> /data/year=2016/month=10/version=0
> /data/year=2016/month=10/version=1
> /data/year=2016/month=10/version=2
> /data/year=2016/month=10/version=3
> /data/year=2016/month=11/version=0
> /data/year=2016/month=11/version=1
>
> When using this data, I'd like to load the last version only of each month.
>
> A simple way to do this is to do `load("/data/year=2016/month=11/version=3")`
> instead of doing `load("/data")`.
> The drawback of this solution is the loss of partitioning information such
> as `year` and `month`, which means it would not be possible to apply
> operations based on the year or the month anymore.
>
> Is it possible to ask Spark to load the last version only of each month?
> How would you go about this?
>
> Thank you,
>
> Samy
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Using mapWithState without a checkpoint

2016-11-17 Thread Daniel Haviv
Hi,
Is it possible to use mapWithState without checkpointing at all ?
I'd rather have the whole application fail, restart and reload an
initialState RDD than pay for checkpointing every 10 batches.

Thank you,
Daniel


mapWithState job slows down & exceeds yarn's memory limits

2016-11-14 Thread Daniel Haviv
Hi,
I have a fairly simple stateful streaming job that suffers from high GC, and
its executors are killed as they exceed the size of the requested
container.
My current executor-memory is 10G, the Spark overhead is 2G, and it's running
with one core.

At first the job begins running at a rate that is below the batch time
(45s) and after a few batches it starts running at much slower rates (2.1
minutes at times).

This is the relevant code:

import sparkSession.implicits._

val x: RDD[(String, Record)] = ss.sql(
    s"""select * from data_table
        where partition_ts >= ${DateTime.now.minusHours(10).toString("MMddHH")}""")
  .as[Record]
  .map(x => (x.iid, x))
  .rdd

val stateSpec = StateSpec.function(trackStateFunc _)
  .numPartitions(16)
  .timeout(Durations.minutes(60 * 48))
  .initialState(x)

val ssc = new StreamingContext(sc, Seconds(45))
val sqlContext = ss.sqlContext
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

val stateStream = kafkaStream
  .transform(_.map(_._2))
  .transform { rdd =>
    val sparkSession = ss
    import sparkSession.implicits._
    sqlContext.read.schema(schema).json(rdd).as[Record].map(r => (r.iid, r)).rdd
  }
  .mapWithState(stateSpec)

stateStream.foreachRDD { rdd =>
  rdd.coalesce(16).toDF().write.mode(SaveMode.Append).insertInto("joineddata")
}


Right now, after playing with the parameters a bit, I'm running with
spark.memory.storageFraction=0 and spark.memory.fraction=0.2  .

Any help would be appreciated.

Thank you,

Daniel


Re: mapWithState with Datasets

2016-11-08 Thread Daniel Haviv
Scratch that, it's working fine.

Thank you.

On Tue, Nov 8, 2016 at 8:19 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I should have used transform instead of map
>
> val x: DStream[(String, Record)] = 
> kafkaStream.transform(x=>{x.map(_._2)}).transform(x=>{sqlContext.read.json(x).as[Record].map(r=>(r.iid,r))}.rdd)
>
> but I'm still unable to call mapWithState on x.
>
> any idea why ?
>
> Thank you,
> Daniel
>
>
>
> On Tue, Nov 8, 2016 at 7:46 PM, Daniel Haviv <daniel.haviv@veracity-group.com> wrote:
>
>> Hi,
>> I'm trying to make a stateful stream of Tuple2[String, Dataset[Record]] :
>>
>> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
>> val stateStream: DStream[RDD[(String, Record)]] = kafkaStream.map(x=> {  
>> sqlContext.read.json(x._2).as[Record]}).map(x=>{x.map(r=>(r.iid,r)).rdd})
>>
>>
>> Because stateStream is a DStream[RDD[(String, Record)]] I can't call 
>> mapWithState on it.
>> How can I map it to a DStream[(String,Record)] ?
>>
>> Thank you,
>> Daniel
>>
>>
>


Re: mapWithState with Datasets

2016-11-08 Thread Daniel Haviv
Hi,
I should have used transform instead of map

val x: DStream[(String, Record)] = kafkaStream
  .transform(_.map(_._2))
  .transform(rdd => sqlContext.read.json(rdd).as[Record].map(r => (r.iid, r)).rdd)

but I'm still unable to call mapWithState on x.

any idea why ?

Thank you,
Daniel



On Tue, Nov 8, 2016 at 7:46 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm trying to make a stateful stream of Tuple2[String, Dataset[Record]] :
>
> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
> val stateStream: DStream[RDD[(String, Record)]] = kafkaStream.map(x=> {  
> sqlContext.read.json(x._2).as[Record]}).map(x=>{x.map(r=>(r.iid,r)).rdd})
>
>
> Because stateStream is a DStream[RDD[(String, Record)]] I can't call 
> mapWithState on it.
> How can I map it to a DStream[(String,Record)] ?
>
> Thank you,
> Daniel
>
>


mapWithState with Datasets

2016-11-08 Thread Daniel Haviv
Hi,
I'm trying to make a stateful stream of Tuple2[String, Dataset[Record]] :

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
val stateStream: DStream[RDD[(String, Record)]] = kafkaStream
  .map(x => sqlContext.read.json(x._2).as[Record])
  .map(ds => ds.map(r => (r.iid, r)).rdd)


Because stateStream is a DStream[RDD[(String, Record)]] I can't call
mapWithState on it.
How can I map it to a DStream[(String,Record)] ?

Thank you,
Daniel


mapWithState with a big initial RDD gets OOM'ed

2016-11-07 Thread Daniel Haviv
Hi,
I have a stateful streaming app where I pass a rather large initialState
RDD at the beginning.
No matter to how many partitions I divide the stateful stream I keep
failing on OOM or Java heap space.

Is there a way to make it more resilient?
How can I control its storage level?

This is basically my code:

val x = ss.sql("select * From myTable where partition_ts >=
2016110600").toJSON.rdd.mapPartitions(extractIidfromJson)
val stateSpec = StateSpec.function(trackStateFunc _).numPartitions(128)
  .timeout(Durations.minutes(60 * 48)).initialState(x)
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = ss.sqlContext


val stateStream = kafkaStream.mapPartitions(r =>
jsonToJsonNode(r)).mapWithState(stateSpec)

stateStream.foreachRDD(r=>{  if (!r.isEmpty())
{ss.read.json(r).write.format("orc").mode(SaveMode.Append).saveAsTable("joinedData")}}
)


Thank you,

Daniel


mapWithState and DataFrames

2016-11-06 Thread Daniel Haviv
Hi,
How can I utilize mapWithState and DataFrames?
Right now I stream json messages from Kafka, update their state, output the
updated state as json and compose a dataframe from it.
It seems inefficient both in terms of processing and storage (a long string
instead of a compact DF).

Is there a way to manage state for DataFrame?

Thank you,
Daniel


Re: Stream compressed data from KafkaUtils.createDirectStream

2016-11-03 Thread Daniel Haviv
Hi Baki,
It's enough for the producer to write the messages compressed. See here:
https://cwiki.apache.org/confluence/display/KAFKA/Compression

Thank you.
Daniel
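
For reference, a hedged producer-side sketch (broker and topic names are made up): compression is configured on the Kafka producer, and the Spark/Kafka consumer side decompresses transparently.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("compression.type", "snappy")   // or "gzip" / "lz4"

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("myTopic", "some-key", "some-value"))
producer.close()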

> On 3 Nov 2016, at 21:27, Daniel Haviv <daniel.ha...@veracity-group.com> wrote:
> 
> Hi,
> Kafka can compress/uncompress your messages for you seamlessly, adding 
> compression on top of that will be redundant.
> 
> Thank you.
> Daniel
> 
>> On 3 Nov 2016, at 20:53, bhayat <baki...@gmail.com> wrote:
>> 
>> Hello,
>> 
>> I really wonder that whether i can stream compressed data with using
>> KafkaUtils.createDirectStream(...) or not.
>> 
>> This is formal code that i use ;
>> 
>> JavaPairInputDStream<String, String> messages =
>> KafkaUtils.createStream(javaStreamingContext, zookeeperConfiguration,
>> groupName, topicMap, StorageLevel.MEMORY_AND_DISK_SER());
>> 
>> final JavaDStream<String> lines = messages.map(new
>> Function<Tuple2<String, String>, String>() {
>>@Override
>>public String call(Tuple2<String, String> tuple2) {
>>System.out.println("Stream Received: " + tuple2._2());
>>return tuple2._2();
>>}
>>});
>> 
>> In this code I am consuming string messages, but in my case I need to consume
>> compressed data from the stream, then uncompress it, and finally read the string
>> message.
>> 
>> Could you please assist me with the right way to go? I am a bit confused
>> about whether Spark Streaming has this ability or not.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Stream-compressed-data-from-KafkaUtils-createDirectStream-tp28010.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


Re: Stream compressed data from KafkaUtils.createDirectStream

2016-11-03 Thread Daniel Haviv
Hi,
Kafka can compress/uncompress your messages for you seamlessly, adding 
compression on top of that will be redundant.

Thank you.
Daniel

> On 3 Nov 2016, at 20:53, bhayat  wrote:
> 
> Hello,
> 
> I really wonder that whether i can stream compressed data with using
> KafkaUtils.createDirectStream(...) or not.
> 
> This is formal code that i use ;
> 
> JavaPairInputDStream<String, String> messages =
> KafkaUtils.createStream(javaStreamingContext, zookeeperConfiguration,
> groupName, topicMap, StorageLevel.MEMORY_AND_DISK_SER());
> 
> final JavaDStream<String> lines = messages.map(new
> Function<Tuple2<String, String>, String>() {
>@Override
>public String call(Tuple2<String, String> tuple2) {
>System.out.println("Stream Received: " + tuple2._2());
>return tuple2._2();
>}
>});
> 
> In this code I am consuming string messages, but in my case I need to consume
> compressed data from the stream, then uncompress it, and finally read the string
> message.
> 
> Could you please assist me with the right way to go? I am a bit confused
> about whether Spark Streaming has this ability or not.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Stream-compressed-data-from-KafkaUtils-createDirectStream-tp28010.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


error: Unable to find encoder for type stored in a Dataset. when trying to map through a DataFrame

2016-11-02 Thread Daniel Haviv
Hi,
I have the following scenario:

scala> val df = spark.sql("select * from danieltest3")
df: org.apache.spark.sql.DataFrame = [iid: string, activity: string ... 34
more fields]

Now I'm trying to map through the rows I'm getting:
scala> df.map(r=>r.toSeq)
:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   df.map(r=>r.toSeq)


What am I missing here ?

Thank you,
Daniel
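
For future readers, a hedged workaround sketch (not from the thread): Row => Seq[Any] has no Encoder, so either drop to the RDD API for that step or map over a typed projection.

// 1) The RDD API does not need an Encoder, so Rows can be handled freely there.
val seqs = df.rdd.map(r => r.toSeq)   // RDD[Seq[Any]]

// 2) Or project to a concrete column type and map over the typed Dataset.
import spark.implicits._
val iids = df.select("iid").as[String]   // Dataset[String]
val upper = iids.map(_.toUpperCase)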


partitionBy produces wrong number of tasks

2016-10-19 Thread Daniel Haviv
Hi,
I have a case where I use partitionBy to write my DF using a calculated
column, so it looks somethings like this:

val df = spark.sql("select *, from_unixtime(ts, 'MMddH')
partition_key from mytable")

df.write.partitionBy("partition_key").orc("/partitioned_table")


df is 8 partitions in size (spark.sql.shuffle.partitions is set to 8) and
partition_key usually has 1 or 2 distinct values.
When the write action begins it's split into 330 tasks and takes much
longer than it should but if I switch to the following code instead it
works as expected with 8 tasks:

df.createTempView("tab")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("insert into partitioned_table select * from tab")



Any idea why this is happening?
How does partitionBy decide how to repartition the DF?


Thank you,
Daniel


Using DirectOutputCommitter with ORC

2016-07-25 Thread Daniel Haviv
Hi,
How can the DirectOutputCommitter be utilized for writing ORC files?
I tried setting it via:

sc.getConf.set("spark.hadoop.mapred.output.committer.class","com.veracity-group.datastage.directorcwriter")

But I can still see a _temporary  directory being used when I save my
dataframe as ORC.

Thank you,
Daniel


Spark (on Windows) not picking up HADOOP_CONF_DIR

2016-07-17 Thread Daniel Haviv
Hi,
I'm running Spark using IntelliJ on Windows and even though I set
HADOOP_CONF_DIR it does not affect the contents of sc.hadoopConfiguration.

Anybody encountered it ?

Thanks,
Daniel


Re: Spark Streaming - Best Practices to handle multiple datapoints arriving at different time interval

2016-07-16 Thread Daniel Haviv
Or use mapWithState

Thank you.
Daniel

> On 16 Jul 2016, at 03:02, RK Aduri  wrote:
> 
> You can probably define sliding windows and set larger batch intervals. 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Best-Practices-to-handle-multiple-datapoints-arriving-at-different-time-interval-tp27315p27348.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


Spark Streaming: Refreshing broadcast value after each batch

2016-07-12 Thread Daniel Haviv
Hi,
I have a streaming application which uses a broadcast variable which I
populate from a database.
I would like every once in a while (or even every batch) to update/replace
the broadcast variable with the latest data from the database.

Only way I found online to do this is this "hackish" way (
http://stackoverflow.com/questions/28573816/periodic-broadcast-in-apache-spark-streaming)
which I'm not sure gets re-executed per batch anyway:

val broadcastFactory = new TorrentBroadcastFactory()
broadcastFactory.unbroadcast(BroadcastId, true, true)
// append some ids to initIds
val broadcastcontent =
broadcastFactory.newBroadcast[.Set[String]](initIds, false,
BroadcastId)


Is there a proper way to do that?

Thank you,
Daniel
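
For reference, a hedged sketch (not from the thread) of a commonly used pattern: a driver-side holder that reloads and re-broadcasts the reference data once it is older than a TTL, called from foreachRDD so it runs once per batch. loadFromDatabase() is a hypothetical loader.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object RefData {
  @volatile private var current: Broadcast[Set[String]] = _
  private var lastLoad = 0L

  def get(sc: SparkContext, ttlMs: Long = 60000L): Broadcast[Set[String]] = synchronized {
    val now = System.currentTimeMillis()
    if (current == null || now - lastLoad > ttlMs) {
      if (current != null) current.unpersist(blocking = false)
      current = sc.broadcast(loadFromDatabase())   // hypothetical database load
      lastLoad = now
    }
    current
  }

  private def loadFromDatabase(): Set[String] = Set("id-1", "id-2")   // placeholder
}

// Inside the streaming job, on the driver, once per batch:
// dstream.foreachRDD { rdd =>
//   val ids = RefData.get(rdd.sparkContext)
//   rdd.filter(line => ids.value.contains(line)).count()
// }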


Confusion regarding sc.accumulableCollection(mutable.ArrayBuffer[String]()) type

2016-06-23 Thread Daniel Haviv
Hi,
I want to use an accumulableCollection of type mutable.ArrayBuffer[String]
to return invalid records detected during transformations, but I don't
quite get its type:

val errorAccumulator: Accumulable[ArrayBuffer[String], String] =
sc.accumulableCollection(mutable.ArrayBuffer[String]())


Why am I getting an Accumulable[ArrayBuffer[String], String] object
and not an Accumulable[ArrayBuffer[String]] ?


Thank you.

Daniel


Re: Switching broadcast mechanism from torrent

2016-06-20 Thread Daniel Haviv
I agree, it was by mistake.
I just updated so that the next person looking for torrent broadcast issues
will have a hint :)

Thank you.
Daniel

On Sun, Jun 19, 2016 at 5:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> I think good practice is not to hold on to SparkContext in mapFunction.
>
> On Sun, Jun 19, 2016 at 7:10 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> How about using `transient` annotations?
>>
>> // maropu
>>
>> On Sun, Jun 19, 2016 at 10:51 PM, Daniel Haviv <
>> daniel.ha...@veracity-group.com> wrote:
>>
>>> Hi,
>>> Just updating on my findings for future reference.
>>> The problem was that after refactoring my code I ended up with a scala
>>> object which held SparkContext as a member, eg:
>>> object A  {
>>>  sc: SparkContext = new SparkContext
>>>  def mapFunction  {}
>>> }
>>>
>>> and when I called rdd.map(A.mapFunction) it failed as A.sc is not
>>> serializable.
>>>
>>> Thanks,
>>> Daniel
>>>
>>> On Tue, Jun 7, 2016 at 10:13 AM, Takeshi Yamamuro <linguin@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> Since `HttpBroadcastFactory` has already been removed in master, so
>>>> you cannot use the broadcast mechanism in future releases.
>>>>
>>>> Anyway, I couldn't find a root cause only from the stacktraces...
>>>>
>>>> // maropu
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jun 6, 2016 at 2:14 AM, Daniel Haviv <
>>>> daniel.ha...@veracity-group.com> wrote:
>>>>
>>>>> Hi,
>>>>> I've set  spark.broadcast.factory to
>>>>> org.apache.spark.broadcast.HttpBroadcastFactory and it indeed resolve my
>>>>> issue.
>>>>>
>>>>> I'm creating a dataframe which creates a broadcast variable internally
>>>>> and then fails due to the torrent broadcast with the following stacktrace:
>>>>> Caused by: org.apache.spark.SparkException: Failed to get
>>>>> broadcast_3_piece0 of broadcast_3
>>>>> at
>>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
>>>>> at
>>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>> at
>>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
>>>>> at
>>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
>>>>> at
>>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>> at org.apache.spark.broadcast.TorrentBroadcast.org
>>>>> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
>>>>> at
>>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
>>>>> at
>>>>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1220)
>>>>>
>>>>> I'm using spark 1.6.0 on CDH 5.7
>>>>>
>>>>> Thanks,
>>>>> Daniel
>>>>>
>>>>>
>>>>> On Wed, Jun 1, 2016 at 5:52 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> I found spark.broadcast.blockSize but no parameter to switch
>>>>>> broadcast method.
>>>>>>
>>>>>> Can you describe the issues with torrent broadcast in more detail ?
>>>>>>
>>>>>> Which version of Spark are you using ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Wed, Jun 1, 2016 at 7:48 AM, Daniel Haviv <
>>>>>> daniel.ha...@veracity-group.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Our application is failing due to issues with the torrent broadcast,
>>>>>>> is there a way to switch to another broadcast method ?
>>>>>>>
>>>>>>> Thank you.
>>>>>>> Daniel
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


Re: Switching broadcast mechanism from torrent

2016-06-19 Thread Daniel Haviv
Hi,
Just updating on my findings for future reference.
The problem was that after refactoring my code I ended up with a scala
object which held SparkContext as a member, eg:
object A {
  val sc: SparkContext = new SparkContext
  def mapFunction(x: String): String = x
}

and when I called rdd.map(A.mapFunction) it failed as A.sc is not
serializable.

Thanks,
Daniel
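
A hedged sketch of the refactoring described above (names are illustrative): keep the SparkContext out of any object whose methods are shipped to the executors.

import org.apache.spark.{SparkConf, SparkContext}

object Transforms extends Serializable {
  // No reference to SparkContext here, so shipping this function is safe.
  def mapFunction(line: String): Int = line.length
}

object Main {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local[*]"))
    val lengths = sc.parallelize(Seq("a", "bb", "ccc")).map(Transforms.mapFunction)
    println(lengths.collect().toSeq)
    sc.stop()
  }
}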

On Tue, Jun 7, 2016 at 10:13 AM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> Since `HttpBroadcastFactory` has already been removed in master, so
> you cannot use the broadcast mechanism in future releases.
>
> Anyway, I couldn't find a root cause only from the stacktraces...
>
> // maropu
>
>
>
>
> On Mon, Jun 6, 2016 at 2:14 AM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
>> Hi,
>> I've set  spark.broadcast.factory to
>> org.apache.spark.broadcast.HttpBroadcastFactory and it indeed resolve my
>> issue.
>>
>> I'm creating a dataframe which creates a broadcast variable internally
>> and then fails due to the torrent broadcast with the following stacktrace:
>> Caused by: org.apache.spark.SparkException: Failed to get
>> broadcast_3_piece0 of broadcast_3
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
>> at scala.Option.getOrElse(Option.scala:120)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.broadcast.TorrentBroadcast.org
>> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1220)
>>
>> I'm using spark 1.6.0 on CDH 5.7
>>
>> Thanks,
>> Daniel
>>
>>
>> On Wed, Jun 1, 2016 at 5:52 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> I found spark.broadcast.blockSize but no parameter to switch broadcast
>>> method.
>>>
>>> Can you describe the issues with torrent broadcast in more detail ?
>>>
>>> Which version of Spark are you using ?
>>>
>>> Thanks
>>>
>>> On Wed, Jun 1, 2016 at 7:48 AM, Daniel Haviv <
>>> daniel.ha...@veracity-group.com> wrote:
>>>
>>>> Hi,
>>>> Our application is failing due to issues with the torrent broadcast, is
>>>> there a way to switch to another broadcast method ?
>>>>
>>>> Thank you.
>>>> Daniel
>>>>
>>>
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: HiveContext: Unable to load AWS credentials from any provider in the chain

2016-06-10 Thread Daniel Haviv
I'm using EC2 instances 

Thank you.
Daniel

> On 9 Jun 2016, at 16:49, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> 
> Hi,
> 
> are you using EC2 instances or local cluster behind firewall.
> 
> 
> Regards,
> Gourav Sengupta
> 
>> On Wed, Jun 8, 2016 at 4:34 PM, Daniel Haviv 
>> <daniel.ha...@veracity-group.com> wrote:
>> Hi,
>> I'm trying to create a table on s3a but I keep hitting the following error:
>> Exception in thread "main" org.apache.hadoop.hive.ql.metadata.HiveException: 
>> MetaException(message:com.cloudera.com.amazonaws.AmazonClientException: 
>> Unable to load AWS credentials from any provider in the chain)
>>  
>> I tried setting the s3a keys using the configuration object but I might be 
>> hitting SPARK-11364 :
>> conf.set("fs.s3a.access.key", accessKey)
>> conf.set("fs.s3a.secret.key", secretKey)
>> conf.set("spark.hadoop.fs.s3a.access.key",accessKey)
>> conf.set("spark.hadoop.fs.s3a.secret.key",secretKey)
>> val sc = new SparkContext(conf)
>>  
>> I tried setting these propeties in hdfs-site.xml but i'm still getting this 
>> error.
>> Finally I tried to set the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY 
>> environment variables but with no luck.
>>  
>> Any ideas on how to resolve this issue ?
>>  
>> Thank you.
>> Daniel
>> 
>> Thank you.
>> Daniel
> 


Re: HiveContext: Unable to load AWS credentials from any provider in the chain

2016-06-08 Thread Daniel Haviv
Hi,
I've set these properties both in core-site.xml and hdfs-site.xml with no luck.

Thank you.
Daniel

> On 9 Jun 2016, at 01:11, Steve Loughran <ste...@hortonworks.com> wrote:
> 
> 
>> On 8 Jun 2016, at 16:34, Daniel Haviv <daniel.ha...@veracity-group.com> 
>> wrote:
>> 
>> Hi,
>> I'm trying to create a table on s3a but I keep hitting the following error:
>> Exception in thread "main" org.apache.hadoop.hive.ql.metadata.HiveException: 
>> MetaException(message:com.cloudera.com.amazonaws.AmazonClientException: 
>> Unable to load AWS credentials from any provider in the chain)
>>  
>> I tried setting the s3a keys using the configuration object but I might be 
>> hitting SPARK-11364 :
>> conf.set("fs.s3a.access.key", accessKey)
>> conf.set("fs.s3a.secret.key", secretKey)
>> conf.set("spark.hadoop.fs.s3a.access.key",accessKey)
>> conf.set("spark.hadoop.fs.s3a.secret.key",secretKey)
>> val sc = new SparkContext(conf)
>>  
>> I tried setting these propeties in hdfs-site.xml but i'm still getting this 
>> error.
> 
> 
> 
> try core-site.xml rather than hdfs-site.xml; the latter only gets loaded when 
> an HdfsConfiguration() instances is created; it may be a bit too late.
> 
>> Finally I tried to set the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY 
>> environment variables but with no luck.
> 
> Those env vars aren't picked up directly by S3a (well, that was fixed over 
> the weekend https://issues.apache.org/jira/browse/HADOOP-12807  ); There's 
> some fixup in spark ( see 
> SparkHadoopUtil.appendS3AndSparkHadoopConfigurations() ); I don't know if 
> that is a factor; 
> 
>> Any ideas on how to resolve this issue ?
>>  
>> Thank you.
>> Daniel
>> 
>> Thank you.
>> Daniel
> 


HiveContext: Unable to load AWS credentials from any provider in the chain

2016-06-08 Thread Daniel Haviv
Hi,
I'm trying to create a table on s3a but I keep hitting the following error:
Exception in thread "main" org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:com.cloudera.com.amazonaws.AmazonClientException: Unable 
to load AWS credentials from any provider in the chain)
 
I tried setting the s3a keys using the configuration object but I might be 
hitting SPARK-11364 :
conf.set("fs.s3a.access.key", accessKey)
conf.set("fs.s3a.secret.key", secretKey)
conf.set("spark.hadoop.fs.s3a.access.key",accessKey)
conf.set("spark.hadoop.fs.s3a.secret.key",secretKey)
val sc = new SparkContext(conf)
 
I tried setting these properties in hdfs-site.xml but I'm still getting this
error.
Finally I tried to set the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY environment 
variables but with no luck.
 
Any ideas on how to resolve this issue ?
 
Thank you.
Daniel

Thank you.
Daniel

groupByKey returns an emptyRDD

2016-06-06 Thread Daniel Haviv
Hi,
I wrapped the following code into a jar:

val test = sc.parallelize(Seq(("daniel", "a"), ("daniel", "b"), ("test", "1)")))

val agg = test.groupByKey()
agg.collect.foreach(r=>{println(r._1)})


The result of groupByKey is an empty RDD, but when I try the same
code in the spark-shell it runs as expected.


Any ideas?


Thank you,

Daniel


Re: Switching broadcast mechanism from torrent

2016-06-06 Thread Daniel Haviv
Hi,
I've set spark.broadcast.factory to
org.apache.spark.broadcast.HttpBroadcastFactory and it indeed resolved my
issue.

I'm creating a dataframe which creates a broadcast variable internally and
then fails due to the torrent broadcast with the following stacktrace:
Caused by: org.apache.spark.SparkException: Failed to get
broadcast_3_piece0 of broadcast_3
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org
$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1220)

I'm using spark 1.6.0 on CDH 5.7

Thanks,
Daniel


On Wed, Jun 1, 2016 at 5:52 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> I found spark.broadcast.blockSize but no parameter to switch broadcast
> method.
>
> Can you describe the issues with torrent broadcast in more detail ?
>
> Which version of Spark are you using ?
>
> Thanks
>
> On Wed, Jun 1, 2016 at 7:48 AM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
>> Hi,
>> Our application is failing due to issues with the torrent broadcast, is
>> there a way to switch to another broadcast method ?
>>
>> Thank you.
>> Daniel
>>
>
>


Switching broadcast mechanism from torrent

2016-06-01 Thread Daniel Haviv
Hi,
Our application is failing due to issues with the torrent broadcast, is
there a way to switch to another broadcast method ?

Thank you.
Daniel


Re: "collecting" DStream data

2016-05-15 Thread Daniel Haviv
I mistyped, the code is
foreachRDD(r=> arr++=r.collect)

And it does work for ArrayBuffer but not for HashMap
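
For reference, a hedged sketch (assuming the DStream elements are (String, String) pairs) of the driver-side pattern described in the quoted reply: collect each batch to the driver, merge there, and broadcast the merged map.

var versions = Map.empty[String, String]

MappedVersionsToBrodcast.foreachRDD { rdd =>
  versions ++= rdd.collect()                       // collect() brings the pairs to the driver
  val bc = rdd.sparkContext.broadcast(versions)    // re-broadcast the merged map
  // bc.value is now usable inside subsequent transformations
}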

On Sun, May 15, 2016 at 3:04 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Daniel,
>
> Given your example, “arr” is defined on the driver, but the “foreachRDD”
> function is run on the executors. If you want to collect the results of the
> RDD/DStream down to the driver you need to call RDD.collect. You have to be
> careful though that you have enough memory on the driver JVM to hold the
> results, otherwise you’ll have an OOM exception. Also, you can’t update the
> value of a broadcast variable, since it’s immutable.
>
> Thanks,
> Silvio
>
> From: Daniel Haviv <daniel.ha...@veracity-group.com>
> Date: Sunday, May 15, 2016 at 6:23 AM
> To: user <user@spark.apache.org>
> Subject: "collecting" DStream data
>
> Hi,
> I have a DStream I'd like to collect and broadcast it's values.
> To do so I've created a mutable HashMap which i'm filling with foreachRDD
> but when I'm checking it, it remains empty. If I use ArrayBuffer it works
> as expected.
>
> This is my code:
>
> val arr = scala.collection.mutable.HashMap.empty[String,String]
> MappedVersionsToBrodcast.foreachRDD(r=> { r.foreach(r=> { arr+=r})   } )
>
>
> What am I missing here?
>
> Thank you,
> Daniel
>
>


"collecting" DStream data

2016-05-15 Thread Daniel Haviv
Hi,
I have a DStream whose values I'd like to collect and broadcast.
To do so I've created a mutable HashMap which I'm filling with foreachRDD,
but when I check it, it remains empty. If I use an ArrayBuffer it works
as expected.

This is my code:

val arr = scala.collection.mutable.HashMap.empty[String,String]
MappedVersionsToBrodcast.foreachRDD(r=> { r.foreach(r=> { arr+=r})   } )


What am I missing here?

Thank you,
Daniel


java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B

2016-05-11 Thread Daniel Haviv
Hi,
I'm running a very simple job (textFile -> map -> groupBy -> count) with
Spark 1.6.0 on EMR 4.3 (Hadoop 2.7.1) and hitting this exception when
running on yarn-client but not in local mode:

16/05/11 10:29:26 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID
1, ip-172-31-33-97.ec2.internal, partition 0,NODE_LOCAL, 15116 bytes)
16/05/11 10:29:26 WARN TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1,
ip-172-31-33-97.ec2.internal): java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to [B
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


I found a JIRA that relates to streaming and accumulators, but I'm using
neither.

Any ideas ?
Should I file a jira?

Thank you,
Daniel


Re: Is it a bug?

2016-05-09 Thread Daniel Haviv
How come for the first() function it calculates an updated value but
for collect() it doesn't?



On Sun, May 8, 2016 at 4:17 PM, Ted Yu  wrote:

> I don't think so.
> RDD is immutable.
>
> > On May 8, 2016, at 2:14 AM, Sisyphuss  wrote:
> >
> > 
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-a-bug-tp26898.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Save DataFrame to HBase

2016-04-27 Thread Daniel Haviv
Hi Benjamin,
Yes, it should work.

Let me know if you need further assistance; I might be able to get the code I
used for that project.

Thank you.
Daniel

> On 24 Apr 2016, at 17:35, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> Hi Daniel,
> 
> How did you get the Phoenix plugin to work? I have CDH 5.5.2 installed which 
> comes with HBase 1.0.0 and Phoenix 4.5.2. Do you think this will work?
> 
> Thanks,
> Ben
> 
>> On Apr 24, 2016, at 1:43 AM, Daniel Haviv <daniel.ha...@veracity-group.com> 
>> wrote:
>> 
>> Hi,
>> I tried saving DF to HBase using a hive table with hbase storage handler and 
>> hiveContext but it failed due to a bug.
>> 
>> I was able to persist the DF to hbase using Apache Pheonix which was pretty 
>> simple.
>> 
>> Thank you.
>> Daniel
>> 
>>> On 21 Apr 2016, at 16:52, Benjamin Kim <bbuil...@gmail.com> wrote:
>>> 
>>> Has anyone found an easy way to save a DataFrame into HBase?
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: Save DataFrame to HBase

2016-04-24 Thread Daniel Haviv
Hi,
I tried saving the DF to HBase using a Hive table with the HBase storage handler and
HiveContext, but it failed due to a bug.

I was able to persist the DF to HBase using Apache Phoenix, which was pretty
simple.

Thank you.
Daniel
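
For reference, a hedged sketch of writing a DataFrame through the phoenix-spark connector (table name and ZooKeeper URL are made up; the connector expects SaveMode.Overwrite):

import org.apache.spark.sql.SaveMode

df.write
  .format("org.apache.phoenix.spark")
  .option("table", "OUTPUT_TABLE")
  .option("zkUrl", "zk-host:2181")
  .mode(SaveMode.Overwrite)
  .save()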

> On 21 Apr 2016, at 16:52, Benjamin Kim  wrote:
> 
> Has anyone found an easy way to save a DataFrame into HBase?
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


Spark fileStream from a partitioned hive dir

2016-04-13 Thread Daniel Haviv
Hi,
We have a Hive table which gets data written to it under two partition keys,
day and hour.
We would like to stream the incoming files, and since fileStream can only
listen on one directory we start a streaming job on the latest partition
and every hour kill it and start a new one on the newer partition (we are
also working on migrating the stream from HDFS to Kafka, but it will take a
while).

I imagine I'm not the first to try this. Is there a better way to either
stream multiple directories or change the streaming source location at runtime
(or any other suggestion)?


Thank you.
Daniel


Re: aggregateByKey on PairRDD

2016-03-30 Thread Daniel Haviv
Hi,
shouldn't groupByKey be avoided (
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)
?


Thank you,
Daniel
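
For reference, a hedged aggregateByKey sketch for the shape described in the quoted question (the data is copied from the quote; collecting into a List is just one option and carries the same caveats as groupByKey for very large groups):

import org.apache.spark.rdd.RDD

val tempRDD: RDD[(String, (String, String))] = sc.parallelize(Seq(
  ("amazon", ("book1", "tech")),
  ("eBay",   ("book1", "tech")),
  ("barns",  ("book",  "tech")),
  ("amazon", ("book2", "tech"))))

val resultSetRDD: RDD[(String, List[(String, String)])] =
  tempRDD.aggregateByKey(List.empty[(String, String)])(
    (acc, v) => v :: acc,            // fold a value into the per-partition list
    (acc1, acc2) => acc1 ::: acc2)   // merge lists across partitions

resultSetRDD.collect().foreach(println)   // e.g. (amazon,List((book2,tech), (book1,tech)))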

On Wed, Mar 30, 2016 at 9:01 AM, Akhil Das 
wrote:

> Isn't it what tempRDD.groupByKey does?
>
> Thanks
> Best Regards
>
> On Wed, Mar 30, 2016 at 7:36 AM, Suniti Singh 
> wrote:
>
>> Hi All,
>>
>> I have an RDD having the data in  the following form :
>>
>> tempRDD: RDD[(String, (String, String))]
>>
>> (brand , (product, key))
>>
>> ("amazon",("book1","tech"))
>>
>> ("eBay",("book1","tech"))
>>
>> ("barns",("book","tech"))
>>
>> ("amazon",("book2","tech"))
>>
>>
>> I would like to group the data by Brand and would like to get the result
>> set in the following format :
>>
>> resultSetRDD : RDD[(String, List[(String), (String)]
>>
>> i tried using the aggregateByKey but kind  of not getting how to achieve
>> this. OR is there any other way to achieve this?
>>
>> val resultSetRDD  = tempRDD.aggregateByKey("")({case (aggr , value) =>
>> aggr + String.valueOf(value) + ","}, (aggr1, aggr2) => aggr1 + aggr2)
>>
>> resultSetRDD = (amazon,("book1","tech"),("book2","tech"))
>>
>> Thanks,
>>
>> Suniti
>>
>
>


Flume with Spark Streaming Sink

2016-03-20 Thread Daniel Haviv
Hi,
I'm trying to use the Spark Sink with Flume but it seems I'm missing some
of the dependencies.
I'm running the following code:

./bin/spark-shell --master yarn --jars
/home/impact/flumeStreaming/spark-streaming-flume_2.10-1.6.1.jar,/home/impact/flumeStreaming/flume-ng-core-1.6.0.jar,/home/impact/flumeStreaming/flume-ng-sdk-1.6.0.jar


import org.apache.spark.streaming.flume._

import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(60))
val flumeStream = FlumeUtils.createPollingStream(ssc, "impact1", )

flumeStream.print
ssc.start


And getting this execption.

16/03/20 18:17:17 INFO scheduler.ReceiverTracker: Registered receiver for
stream 0 from impact3.indigo.co.il:51581
16/03/20 18:17:17 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0
(TID 76, impact3.indigo.co.il): java.lang.NoClassDefFoundError:
org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback
at
org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:84)


What deps am I missing ?

Thank you.
Daniel


One task hangs and never finishes

2015-12-17 Thread Daniel Haviv
Hi,

I have an application running a set of transformations that finishes
with saveAsTextFile.

Out of 80 tasks, all finish pretty fast except one that just hangs and
outputs these messages to STDERR:

15/12/17 17:22:19 INFO collection.ExternalAppendOnlyMap: Thread 82 spilling in-memory map of 4.0 GB to disk (6 times so far)

15/12/17 17:23:41 INFO collection.ExternalAppendOnlyMap: Thread 82 spilling in-memory map of 3.8 GB to disk (7 times so far)


In the Web UI I can see that for some reason the shuffle spill
memory is extremely high (15 GB) compared to the others (around a few
MB to 1 GB), and as a result the GC time is extremely bad.


[Task table from the Spark UI: the stuck task is RUNNING with PROCESS_LOCAL locality on executor 8 / impact3.indigo.co.il, launched 2015/12/17 17:18:32, duration 32 min with 25 min of GC time, shuffle read 835.8 MB / 5217833 records, shuffle spill 15.2 GB (memory) / 662.9 MB (disk).]

I'm running with 8 executors with 8 CPUs and 25 GB RAM each, and it
seems that tasks are correctly spread across the nodes:



[Executors table from the Spark UI: 8 executors across impact1-impact4.indigo.co.il (two per host), each with 12.9 GB storage memory, 24-28 completed tasks, 3.3-4.3 h task time, roughly 2 GB input and ~500 MB shuffle write; executor 8 on impact3.indigo.co.il is the only one with an active (hung) task.]

Re: HiveContext ignores ("skip.header.line.count"="1")

2015-10-26 Thread Daniel Haviv
I will

Thank you.

> On 27 Oct 2015, at 4:54, Felix Cheung  wrote:
> 
> Please open a JIRA?
> 
>  
> Date: Mon, 26 Oct 2015 15:32:42 +0200
> Subject: HiveContext ignores ("skip.header.line.count"="1")
> From: daniel.ha...@veracity-group.com
> To: user@spark.apache.org
> 
> Hi,
> I have a csv table in Hive which is configured to skip the header row using 
> TBLPROPERTIES("skip.header.line.count"="1").
> When querying from Hive the header row is not included in the data, but when 
> running the same query via HiveContext I get the header row.
> 
> I made sure that HiveContext sees the skip.header.line.count setting by 
> running "show create table"
> 
> Any ideas?
> 
> Thank you.
> Daniel


HiveContext ignores ("skip.header.line.count"="1")

2015-10-26 Thread Daniel Haviv
Hi,
I have a csv table in Hive which is configured to skip the header row using
TBLPROPERTIES("skip.header.line.count"="1").
When querying from Hive the header row is not included in the data, but
when running the same query via HiveContext I get the header row.

I made sure that HiveContext sees the skip.header.line.count setting by
running "show create table"

Any ideas?

Thank you.
Daniel


Generated ORC files cause NPE in Hive

2015-10-13 Thread Daniel Haviv
Hi,
We are inserting streaming data into a Hive ORC table via a simple insert
statement passed to HiveContext.
When trying to read the generated files using Hive 1.2.1 we are getting an NPE:
at
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:91)
at
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
at
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:290)
at
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:148)
... 14 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime
Error while processing row
at
org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.process(VectorMapOperator.java:52)
at
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:83)
... 17 more
Caused by: java.lang.NullPointerException
at java.lang.System.arraycopy(Native Method)
at org.apache.hadoop.io.Text.set(Text.java:225)
at
org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow$StringExtractorByValue.extract(VectorExtractRow.java:472)
at
org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow.extractRow(VectorExtractRow.java:732)
at
org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator.process(VectorReduceSinkOperator.java:102)
at
org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:837)
at
org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator.process(VectorSelectOperator.java:138)
at
org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:837)
at
org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:97)
at
org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:162)
at
org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.process(VectorMapOperator.java:45)
... 18 more

Is this a known issue ?


Re: SQLContext within foreachRDD

2015-10-12 Thread Daniel Haviv
Just wanted to make sure.

Thanks.
Daniel
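
For reference, a hedged sketch (names assumed, Spark 1.x API) of the pattern Adrian describes below: the temp-table registration happens on the driver, while the SQL itself executes on the executors.

import org.apache.spark.sql.SQLContext

case class Event(id: String, value: Long)   // illustrative schema

// dstream is a hypothetical DStream[String]
dstream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  rdd.map(line => Event(line, line.length.toLong)).toDF().registerTempTable("events")
  // The aggregation below runs in parallel on the executors, not on the driver.
  sqlContext.sql("select id, count(*) as cnt from events group by id").show()
}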

On Mon, Oct 12, 2015 at 1:07 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Not really, unless you’re doing something wrong (e.g. Call collect or
> similar).
>
> In the foreach loop you’re typically registering a temp table, by
> converting an RDD to data frame. All the subsequent queries are executed in
> parallel on the workers.
>
> I haven’t built production apps with this pattern but I have successfully
> built a prototype where I execute dynamic SQL on top of a 15 minute window
> (obtained with .window on the Dstream) - and it works as expected.
>
> Check this out for code example:
> https://github.com/databricks/reference-apps/blob/master/logs_analyzer/chapter1/scala/src/main/scala/com/databricks/apps/logs/chapter1/LogAnalyzerStreamingSQL.scala
>
> -adrian
>
> From: Daniel Haviv
> Date: Monday, October 12, 2015 at 12:52 PM
> To: user
> Subject: SQLContext within foreachRDD
>
> Hi,
> As things that run inside foreachRDD run at the driver, does that mean
> that if we use SQLContext inside foreachRDD the data is sent back to the
> driver and only then the query is executed or is it executed at the
> executors?
>
>
> Thank you.
> Daniel
>
>
>


SQLContext within foreachRDD

2015-10-12 Thread Daniel Haviv
Hi,
As things that run inside foreachRDD run at the driver, does that mean that
if we use SQLContext inside foreachRDD the data is sent back to the driver
and only then the query is executed or is it executed at the executors?


Thank you.
Daniel


Re: Insert via HiveContext is slow

2015-10-09 Thread Daniel Haviv
Thanks Hao.
That seems like one issue.
The other issue seems to me to be the renaming of files at the end of the insert.
Would DF.save perform the task better?

Thanks,
Daniel
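
For reference, a hedged and abbreviated sketch of the workaround Hao suggests below: materialize the exploded projection once as a cached temp table, then run the two inserts against it (only a couple of the columns are shown, so the column lists are illustrative).

val exploded = sqlContext2.sql("""
  select cmtstimestamp, datats, macaddress, cmtsid, usParamLine, dsParamLine
  from avro_events
  lateral view explode(usChnlList) u as usParamLine
  lateral view explode(dsChnlList) d as dsParamLine""")

exploded.cache()
exploded.registerTempTable("exploded_events")

sqlContext2.sql("""insert into table UpStreamParam partition(day_ts, cmtsid)
  select cmtstimestamp, datats, macaddress,
         usParamLine['chnlidx'] chnlidx,
         from_unixtime(cast(datats/1000 as bigint),'MMdd') day_ts,
         cmtsid
  from exploded_events""")

sqlContext2.sql("""insert into table DwnStreamParam partition(day_ts, cmtsid)
  select cmtstimestamp, datats, macaddress,
         dsParamLine['chnlidx'] chnlidx,
         from_unixtime(cast(datats/1000 as bigint),'MMdd') day_ts,
         cmtsid
  from exploded_events""")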

On Fri, Oct 9, 2015 at 3:35 AM, Cheng, Hao <hao.ch...@intel.com> wrote:

> I think that’s a known performance issue(Compared to Hive) of Spark SQL in
> multi-inserts.
>
> A workaround is create a temp cached table for the projection first, and
> then do the multiple inserts base on the cached table.
>
>
>
> We are actually working on the POC of some similar cases, hopefully it
> comes out soon.
>
>
>
> Hao
>
>
>
> *From:* Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
> *Sent:* Friday, October 9, 2015 3:08 AM
> *To:* user
> *Subject:* Re: Insert via HiveContext is slow
>
>
>
> Forgot to mention that my insert is a multi table insert :
>
> sqlContext2.sql("""from avro_events
>
>lateral view explode(usChnlList) usParamLine as usParamLine
>
>lateral view explode(dsChnlList) dsParamLine as dsParamLine
>
>insert into table UpStreamParam partition(day_ts, cmtsid)
>
>select cmtstimestamp,datats,macaddress,
>
> usParamLine['chnlidx'] chnlidx,
>
> usParamLine['modulation'] modulation,
>
> usParamLine['severity'] severity,
>
> usParamLine['rxpower'] rxpower,
>
> usParamLine['sigqnoise'] sigqnoise,
>
> usParamLine['noisedeviation'] noisedeviation,
>
> usParamLine['prefecber'] prefecber,
>
> usParamLine['postfecber'] postfecber,
>
> usParamLine['txpower'] txpower,
>
> usParamLine['txpowerdrop'] txpowerdrop,
>
> usParamLine['nmter'] nmter,
>
> usParamLine['premtter'] premtter,
>
> usParamLine['postmtter'] postmtter,
>
> usParamLine['unerroreds'] unerroreds,
>
> usParamLine['corrected'] corrected,
>
> usParamLine['uncorrectables'] uncorrectables,
>
> from_unixtime(cast(datats/1000 as bigint),'MMdd')
> day_ts,
>
> cmtsid
>
>insert into table DwnStreamParam partition(day_ts, cmtsid)
>
>select  cmtstimestamp,datats,macaddress,
>
> dsParamLine['chnlidx'] chnlidx,
>
> dsParamLine['modulation'] modulation,
>
> dsParamLine['severity'] severity,
>
> dsParamLine['rxpower'] rxpower,
>
> dsParamLine['sigqnoise'] sigqnoise,
>
> dsParamLine['noisedeviation'] noisedeviation,
>
> dsParamLine['prefecber'] prefecber,
>
> dsParamLine['postfecber'] postfecber,
>
> dsParamLine['sigqrxmer'] sigqrxmer,
>
> dsParamLine['sigqmicroreflection'] sigqmicroreflection,
>
> dsParamLine['unerroreds'] unerroreds,
>
> dsParamLine['corrected'] corrected,
>
> dsParamLine['uncorrectables'] uncorrectables,
>
> from_unixtime(cast(datats/1000 as bigint),'MMdd')
> day_ts,
>
> cmtsid
>
>
> """)
>
>
>
>
>
>
>
> On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
> Hi,
>
> I'm inserting into a partitioned ORC table using an insert sql statement
> passed via HiveContext.
>
> The performance I'm getting is pretty bad and I was wondering if there are
> ways to speed things up.
>
> Would saving the DF like this 
> df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
> be faster ?
>
>
>
>
>
> Thank you.
>
> Daniel
>
>
>


Re: Insert via HiveContext is slow

2015-10-08 Thread Daniel Haviv
Forgot to mention that my insert is a multi table insert :
sqlContext2.sql("""from avro_events
   lateral view explode(usChnlList) usParamLine as usParamLine
   lateral view explode(dsChnlList) dsParamLine as dsParamLine
   insert into table UpStreamParam partition(day_ts, cmtsid)
   select cmtstimestamp,datats,macaddress,
usParamLine['chnlidx'] chnlidx,
usParamLine['modulation'] modulation,
usParamLine['severity'] severity,
usParamLine['rxpower'] rxpower,
usParamLine['sigqnoise'] sigqnoise,
usParamLine['noisedeviation'] noisedeviation,
usParamLine['prefecber'] prefecber,
usParamLine['postfecber'] postfecber,
usParamLine['txpower'] txpower,
usParamLine['txpowerdrop'] txpowerdrop,
usParamLine['nmter'] nmter,
usParamLine['premtter'] premtter,
usParamLine['postmtter'] postmtter,
usParamLine['unerroreds'] unerroreds,
usParamLine['corrected'] corrected,
usParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd')
day_ts,
cmtsid
   insert into table DwnStreamParam partition(day_ts, cmtsid)
   select  cmtstimestamp,datats,macaddress,
dsParamLine['chnlidx'] chnlidx,
dsParamLine['modulation'] modulation,
dsParamLine['severity'] severity,
dsParamLine['rxpower'] rxpower,
dsParamLine['sigqnoise'] sigqnoise,
dsParamLine['noisedeviation'] noisedeviation,
dsParamLine['prefecber'] prefecber,
dsParamLine['postfecber'] postfecber,
dsParamLine['sigqrxmer'] sigqrxmer,
dsParamLine['sigqmicroreflection'] sigqmicroreflection,
dsParamLine['unerroreds'] unerroreds,
dsParamLine['corrected'] corrected,
dsParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd')
day_ts,
cmtsid
""")



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm inserting into a partitioned ORC table using an insert sql statement
> passed via HiveContext.
> The performance I'm getting is pretty bad and I was wondering if there are
> ways to speed things up.
> Would saving the DF like this 
> df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
> be faster ?
>
>
> Thank you.
> Daniel
>


Insert via HiveContext is slow

2015-10-08 Thread Daniel Haviv
Hi,
I'm inserting into a partitioned ORC table using an insert sql statement
passed via HiveContext.
The performance I'm getting is pretty bad and I was wondering if there are
ways to speed things up.
Would saving the DF like this
df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
be faster ?


Thank you.
Daniel


Converting a DStream to schemaRDD

2015-09-29 Thread Daniel Haviv
Hi,
I have a DStream which is a stream of RDD[String].

How can I pass a DStream to sqlContext.jsonRDD and work with it as a DF ?

Thank you.
Daniel


Re: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

2015-09-25 Thread Daniel Haviv
I tried but I'm getting the same error (task not serializable)

> On 25 בספט׳ 2015, at 20:10, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Is the Schema.parse() call expensive ?
> 
> Can you call it in the closure ?
> 
>> On Fri, Sep 25, 2015 at 10:06 AM, Daniel Haviv 
>> <daniel.ha...@veracity-group.com> wrote:
>> Hi,
>> I'm getting a NotSerializableException even though I'm creating all the my 
>> objects from within the closure:
>>  import org.apache.avro.generic.GenericDatumReader
>>  import   java.io.File
>> import org.apache.avro._
>> 
>>  val orig_schema = Schema.parse(new File("/home/wasabi/schema"))
>>   val READER = new GenericDatumReader[GenericRecord](schema)
>> 
>> val bd = sc.broadcast(orig_schema.toString)
>> 
>> 
>>  val rdd=sc.binaryFiles("/daniel").map(zibi => {  
>>  val schema_obj =  new Schema.Parser
>>  val schema2 = schema_obj.parse(bd.value)
>> })
>> 
>> I think that the problem is that Schema itself is an abstract class with 
>> static methods (such as Parser).
>> 
>> Am I correct? 
>> How can I overcome it ?
>> 
>> Thank you.
>> Daniel
> 


java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

2015-09-25 Thread Daniel Haviv
Hi,
I'm getting a NotSerializableException even though I'm creating all the my
objects from within the closure:
 import org.apache.avro.generic.GenericDatumReader
 import   java.io.File
import org.apache.avro._

 val orig_schema = Schema.parse(new File("/home/wasabi/schema"))
  val READER = new GenericDatumReader[GenericRecord](schema)

val bd = sc.broadcast(orig_schema.toString)


 val rdd=sc.binaryFiles("/daniel").map(zibi => {
 val schema_obj =  new Schema.Parser
 val schema2 = schema_obj.parse(bd.value)
})

I think that the problem is that Schema itself is an abstract class with
static methods (such as Parser).

Am I correct?
How can I overcome it ?

Thank you.
Daniel
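
A sketch of the workaround usually suggested for this kind of error (not confirmed in this thread): keep only the schema string on the driver, broadcast it, and build the Schema and reader inside mapPartitions so that no Avro object is captured by the closure.

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

val schemaString = new Schema.Parser().parse(new File("/home/wasabi/schema")).toString
val bd = sc.broadcast(schemaString)

val rdd = sc.binaryFiles("/daniel").mapPartitions { iter =>
  val schema = new Schema.Parser().parse(bd.value)              // built on the executor
  val reader = new GenericDatumReader[GenericRecord](schema)    // never leaves the executor
  iter.map { case (path, stream) =>
    // decode stream.toArray() with `reader` here; the path is returned as a placeholder
    path
  }
}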


Reading avro data using KafkaUtils.createDirectStream

2015-09-24 Thread Daniel Haviv
Hi,
I'm trying to use KafkaUtils.createDirectStream to read avro messages from
Kafka but something is off with my type arguments:

val avroStream = KafkaUtils.createDirectStream[AvroKey[GenericRecord],
GenericRecord, NullWritable, AvroInputFormat[GenericRecord]](ssc,
kafkaParams, topicSet)

I'm getting the following error:
<console>:47: error: type arguments
[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],org.apache.avro.generic.GenericRecord,org.apache.hadoop.io.NullWritable,org.apache.avro.mapred.AvroInputFormat[org.apache.avro.generic.GenericRecord]]
conform to the bounds of none of the overloaded alternatives of
 value createDirectStream: [K, V, KD <: kafka.serializer.Decoder[K], VD <:
kafka.serializer.Decoder[V]](jssc:
org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass:
Class[K], valueClass: Class[V], keyDecoderClass: Class[KD],
valueDecoderClass: Class[VD], kafkaParams: java.util.Map[String,String],
topics:
java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[K,V]
 [K, V, KD <: kafka.serializer.Decoder[K], VD <:
kafka.serializer.Decoder[V]](ssc:
org.apache.spark.streaming.StreamingContext, kafkaParams:
Map[String,String], topics: Set[String])(implicit evidence$19:
scala.reflect.ClassTag[K], implicit evidence$20: scala.reflect.ClassTag[V],
implicit evidence$21: scala.reflect.ClassTag[KD], implicit evidence$22:
scala.reflect.ClassTag[VD])org.apache.spark.streaming.dstream.InputDStream[(K,
V)]

What am I doing wrong?

Thank you.
Daniel
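
A sketch of the shape the call is usually given (the last two type parameters must be kafka.serializer.Decoder classes): pull the value as raw bytes with DefaultDecoder and decode the Avro payload in a later step. ssc, kafkaParams and topicSet are the values from the snippet above.

import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val byteStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  ssc, kafkaParams, topicSet)

// byteStream is a DStream[(Array[Byte], Array[Byte])]; the Avro decoding happens on the value bytes
val values = byteStream.map { case (_, valueBytes) => valueBytes }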


Re: spark-avro takes a lot time to load thousands of files

2015-09-22 Thread Daniel Haviv
I agree, but it's a constraint I have to deal with.
The idea is to load these files and merge them into ORC.
When using Hive on Tez it takes less than a minute. 

Daniel

> On 22 בספט׳ 2015, at 16:00, Jonathan Coveney <jcove...@gmail.com> wrote:
> 
> having a file per record is pretty inefficient on almost any file system
> 
> El martes, 22 de septiembre de 2015, Daniel Haviv 
> <daniel.ha...@veracity-group.com> escribió:
>> Hi,
>> We are trying to load around 10k avro files (each file holds only one 
>> record) using spark-avro but it takes over 15 minutes to load.
>> It seems that most of the work is being done at the driver where it created 
>> a broadcast variable for each file.
>> 
>> Any idea why it is behaving that way?
>> Thank you.
>> Daniel


spark-avro takes a lot time to load thousands of files

2015-09-22 Thread Daniel Haviv
Hi,
We are trying to load around 10k avro files (each file holds only one
record) using spark-avro but it takes over 15 minutes to load.
It seems that most of the work is being done at the driver where it created
a broadcast variable for each file.

Any idea why it is behaving that way?
Thank you.
Daniel
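
Since the goal is to merge these files into ORC, a sketch of the write side (assumes Spark 1.4+ with a HiveContext so the ORC data source is available; the input path is taken from a related thread below, and the output path and partition count are placeholders; this does not by itself remove the per-file overhead at load time):

import com.databricks.spark.avro._

val df = sqlContext.read.avro("/incoming/")        // directory of small Avro files
df.coalesce(8)                                     // collapse to a handful of partitions
  .write
  .format("orc")
  .save("/merged/orc")                             // placeholder output path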


sqlContext.read.avro broadcasting files from the driver

2015-09-21 Thread Daniel Haviv
Hi,
I'm loading a 1000 files using the spark-avro package:
val df = sqlContext.read.avro(*"/incoming/"*)

When I'm performing an action on this df it seems like for each file a
broadcast is being created and is sent to the workers (instead of the
workers reading their data-local files):

scala> df.coalesce(4).count
15/09/21 15:11:32 INFO storage.MemoryStore: ensureFreeSpace(261920) called
with curMem=0, maxMem=2223023063
15/09/21 15:11:32 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 255.8 KB, free 2.1 GB)
15/09/21 15:11:32 INFO storage.MemoryStore: ensureFreeSpace(22987) called
with curMem=261920, maxMem=2223023063
15/09/21 15:11:32 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 22.4 KB, free 2.1 GB)
15/09/21 15:11:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on 192.168.3.4:39736 (size: 22.4 KB, free: 2.1 GB)



15/09/21 15:12:45 INFO storage.MemoryStore: ensureFreeSpace(22987) called
with curMem=294913622, maxMem=2223023063
15/09/21 15:12:45 INFO storage.MemoryStore: Block
*broadcast_1034_piece0 *stored
as bytes in memory (estimated size 22.4 KB, free 1838.8 MB)
15/09/21 15:12:45 INFO storage.BlockManagerInfo: Added
broadcast_1034_piece0 in memory on 192.168.3.4:39736 (size: 22.4 KB, free:
2.0 GB)
15/09/21 15:12:45 INFO spark.SparkContext: Created broadcast 1034 from
hadoopFile at AvroRelation.scala:121
15/09/21 15:12:46 INFO execution.Exchange: Using SparkSqlSerializer2.
15/09/21 15:12:46 INFO spark.SparkContext: Starting job: count at
<console>:25

Am I understanding this wrong?

Thank you.
Daniel


Re: Spark Thrift Server JDBC Drivers

2015-09-17 Thread Daniel Haviv
Thank you!

On Wed, Sep 16, 2015 at 10:29 PM, Dan LaBar <danielrla...@gmail.com> wrote:

> I'm running Spark in EMR, and using the JDBC driver provided by AWS
> <http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/HiveJDBCDriver.html>.
> Don't know if it will work outside of EMR, but it's worth a try.
>
> I've also used the ODBC driver from Hortonworks
> <http://hortonworks.com/products/releases/hdp-2-2/#add_ons>.
>
> Regards,
> Dan
>
> On Wed, Sep 16, 2015 at 8:34 AM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
>> Hi,
>> are there any free JDBC drivers for thrift ?
>> The only ones I could find are Simba's which require a license.
>>
>> Thanks,
>> Daniel
>>
>
>


Spark Thrift Server JDBC Drivers

2015-09-16 Thread Daniel Haviv
Hi,
are there any free JDBC drivers for thrift ?
The only ones I could find are Simba's which require a license.

Thanks,
Daniel
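
For reference, a sketch using the open-source Hive JDBC driver, which generally works because the Spark Thrift Server speaks the HiveServer2 protocol (host, port and credentials are placeholders):

import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://thrift-host:10000/default", "user", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("show tables")
while (rs.next()) println(rs.getString(1))
conn.close()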


Re: Parsing Avro from Kafka Message

2015-09-04 Thread Daniel Haviv
This will create an RDD[String], and what I want is a DF based on the avro 
schema.

Thank you Akhil.
Sent from my iPhone

> On 4 בספט׳ 2015, at 15:05, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> 
> Something like this?
> 
> val avroStream = KafkaUtils.createDirectStream[AvroKey[GenericRecord], 
> NullWritable, AvroKeyInputFormat[GenericRecord]](..)
> 
> val avroData = avroStream.map(x => x._1.datum().toString)
> 
> 
> Thanks
> Best Regards
> 
>> On Thu, Sep 3, 2015 at 6:17 PM, Daniel Haviv 
>> <daniel.ha...@veracity-group.com> wrote:
>> Hi,
>> I'm reading messages from Kafka where the value is an avro file.
>> I would like to parse the contents of the message and work with it as a 
>> DataFrame, like with the spark-avro package but instead of files, pass it an 
>> RDD.
>> 
>> How can this be achieved ?
>> 
>> Thank you.
>> Daniel
> 


Parsing Avro from Kafka Message

2015-09-03 Thread Daniel Haviv
Hi,
I'm reading messages from Kafka where the value is an avro file.
I would like to parse the contents of the message and work with it as a
DataFrame, like with the spark-avro package but instead of files, pass it an
RDD.

How can this be achieved ?

Thank you.
Daniel
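
A sketch of one way to end up with a DataFrame once the Kafka payload has been decoded into GenericRecords (the field names and types are placeholders; in practice they would be derived from the Avro schema):

import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val sparkSchema = StructType(Seq(
  StructField("id", LongType),          // placeholder field
  StructField("name", StringType)))     // placeholder field

def toRow(r: GenericRecord): Row =
  Row(r.get("id").asInstanceOf[Long], String.valueOf(r.get("name")))

def toDF(records: RDD[GenericRecord], sqlContext: SQLContext): DataFrame =
  sqlContext.createDataFrame(records.map(toRow), sparkSchema)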


Starting a service with Spark Executors

2015-08-09 Thread Daniel Haviv
Hi,
I'd like to start a service with each Spark Executor upon initialization and
have the distributed code reference that service locally.
What I'm trying to do is to invoke torch7 computations without reloading
the model for each row, by starting a Lua HTTP handler that will receive HTTP
requests for each row in my data.

Can this be achieved with Spark ?

Thank you.
Daniel
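
A sketch of the pattern that is usually suggested for this (the handler class, URL and scoring logic are placeholders): a lazy singleton is initialised once per executor JVM the first time a task touches it, so every task on that executor reuses the same local service or client.

class ModelClient(endpoint: String) {           // placeholder for an HTTP client to the local service
  def score(input: String): String = input      // placeholder logic
}

object ExecutorLocal {
  lazy val client = new ModelClient("http://localhost:8080")   // built once per executor JVM
}

// rdd: RDD[String] of rows to score
val scored = rdd.mapPartitions { rows =>
  val c = ExecutorLocal.client                  // same instance for every task on this executor
  rows.map(c.score)
}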


Re: Starting Spark SQL thrift server from within a streaming app

2015-08-06 Thread Daniel Haviv
Thank you Todd,
How is the sparkstreaming-sql project different from starting a thrift
server on a streaming app ?

Thanks again.
Daniel


On Thu, Aug 6, 2015 at 1:53 AM, Todd Nist tsind...@gmail.com wrote:

 Hi Danniel,

 It is possible to create an instance of the SparkSQL Thrift server,
 however seems like this project is what you may be looking for:

 https://github.com/Intel-bigdata/spark-streamingsql

 Not 100% sure what your use case is, but you can always convert the data
 into DF then issue a query against it.  If you want other systems to be
 able to query it then there are numerous connectors to  store data into
 Hive, Cassandra, HBase, ElasticSearch, 

 To create a instance of a thrift server with its own SQL Context you would
 do something like the following:

 import org.apache.spark.{SparkConf, SparkContext}

 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.HiveMetastoreTypes._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.hive.thriftserver._


 object MyThriftServer {

   val sparkConf = new SparkConf()
  // master is passed to spark-submit, but could also be specified explicitly
  // .setMaster(sparkMaster)
  .setAppName("My ThriftServer")
  .set("spark.cores.max", "2")
   val sc = new SparkContext(sparkConf)
   val  sparkContext  =  sc
   import  sparkContext._
   val  sqlContext  =  new  HiveContext(sparkContext)
   import  sqlContext._
   import sqlContext.implicits._

    makeRDD((1,"hello") :: (2,"world") :: Nil).toDF.cache().registerTempTable("t")

   HiveThriftServer2.startWithContext(sqlContext)
 }

 Again, I'm not really clear what your use case is, but it does sound like
 the first link above is what you may want.

 -Todd

 On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:

 Hi,
 Is it possible to start the Spark SQL thrift server from within a streaming
 app so the streamed data could be queried as it goes in?

 Thank you.
 Daniel





Starting Spark SQL thrift server from within a streaming app

2015-08-05 Thread Daniel Haviv
Hi,
Is it possible to start the Spark SQL thrift server from within a streaming app
so the streamed data could be queried as it goes in?

Thank you.
Daniel

Local Repartition

2015-07-20 Thread Daniel Haviv
Hi,
My data is constructed from a lot of small files which results in a lot of
partitions per RDD.
Is there some way to locally repartition the RDD without shuffling so that
all of the partitions that reside on a specific node will become X
partitions on the same node ?

Thank you.
Daniel


Re: Local Repartition

2015-07-20 Thread Daniel Haviv
Thanks Doug,
coalesce might invoke a shuffle as well.
I don't think what I'm suggesting is a feature but it definitely should be.

Daniel

On Mon, Jul 20, 2015 at 4:15 PM, Doug Balog d...@balog.net wrote:

 Hi Daniel,
 Take a look at .coalesce()
 I’ve seen good results by coalescing to num executors * 10, but I’m still
 trying to figure out the
 optimal number of partitions per executor.
 To get the number of executors,
 sc.getConf.getInt("spark.executor.instances", -1)


 Cheers,

 Doug

  On Jul 20, 2015, at 5:04 AM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:
 
  Hi,
  My data is constructed from a lot of small files which results in a lot
 of partitions per RDD.
  Is there some way to locally repartition the RDD without shuffling so
 that all of the partitions that reside on a specific node will become X
 partitions on the same node ?
 
  Thank you.
  Daniel




Re: Local Repartition

2015-07-20 Thread Daniel Haviv
Great explanation.

Thanks guys!

Daniel

 On 20 ביולי 2015, at 18:12, Silvio Fiorito silvio.fior...@granturing.com 
 wrote:
 
 Hi Daniel,
 
 Coalesce, by default will not cause a shuffle. The second parameter when set 
 to true will cause a full shuffle. This is actually what repartition does 
 (calls coalesce with shuffle=true).
 
 It will attempt to keep colocated partitions together (as you describe) on 
 the same executor. What may happen is you lose data locality if you reduce 
 the partitions to fewer than the number of executors. You obviously also 
 reduce parallelism so you need to be aware of that as you decide when to call 
 coalesce.
 
 Thanks,
 Silvio
 
 From: Daniel Haviv
 Date: Monday, July 20, 2015 at 4:59 PM
 To: Doug Balog
 Cc: user
 Subject: Re: Local Repartition
 
 Thanks Doug,
 coalesce might invoke a shuffle as well. 
 I don't think what I'm suggesting is a feature but it definitely should be.
 
 Daniel
 
 On Mon, Jul 20, 2015 at 4:15 PM, Doug Balog d...@balog.net wrote:
 Hi Daniel,
 Take a look at .coalesce()
 I’ve seen good results by coalescing to num executors * 10, but I’m still 
 trying to figure out the
 optimal number of partitions per executor.
 To get the number of executors, 
 sc.getConf.getInt("spark.executor.instances", -1)
 
 
 Cheers,
 
 Doug
 
  On Jul 20, 2015, at 5:04 AM, Daniel Haviv 
  daniel.ha...@veracity-group.com wrote:
 
  Hi,
  My data is constructed from a lot of small files which results in a lot of 
  partitions per RDD.
  Is there some way to locally repartition the RDD without shuffling so that 
  all of the partitions that reside on a specific node will become X 
  partitions on the same node ?
 
  Thank you.
  Daniel
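
Putting the advice above together, a minimal sketch (the multiplier of 10 is just the heuristic mentioned earlier):

val numExecutors = sc.getConf.getInt("spark.executor.instances", 1)
val compacted = rdd.coalesce(numExecutors * 10)   // no shuffle: the second argument defaults to false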
 


DataFrame insertInto fails, saveAsTable works (Azure HDInsight)

2015-07-09 Thread Daniel Haviv
Hi,
I'm running Spark 1.4 on Azure.
DataFrame's insertInto fails, but saveAsTable works.
It seems like some issue with accessing Azure's blob storage but that
doesn't explain why one type of write works and the other doesn't.

This is the stack trace:

Caused by: org.apache.hadoop.fs.azure.AzureException:
org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key
provider class.

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:938)

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:438)

at
org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1048)

at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

at
org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2618)

at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:417)

at
org.apache.hadoop.hive.shims.Hadoop23Shims.getNonCachedFileSystem(Hadoop23Shims.java:574)

at
org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3424)

at
org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3396)

at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:214)

... 59 more

Caused by: org.apache.hadoop.fs.azure.KeyProviderException: Unable to load
key provider class.

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:829)

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:917)

... 70 more

Caused by: java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.azure.ShellDecryptionKeyProvider not found

at
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:826)

... 71 more


Thanks,

Daniel


Re: thrift-server does not load jars files (Azure HDInsight)

2015-07-08 Thread Daniel Haviv
Hi,
Just updating back that  setting spark.driver.extraClassPath worked.

Thanks,
Daniel

On Fri, Jul 3, 2015 at 5:35 PM, Ted Yu yuzhih...@gmail.com wrote:

 Alternatively, setting spark.driver.extraClassPath should work.

 Cheers

 On Fri, Jul 3, 2015 at 2:59 AM, Steve Loughran ste...@hortonworks.com
 wrote:


 On Thu, Jul 2, 2015 at 7:38 AM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:

 Hi,
 I'm trying to start the thrift-server and passing it azure's blob
 storage jars but I'm failing on :
  Caused by: java.io.IOException: No FileSystem for scheme: wasb
 at
 org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
 at
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 at
 org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
 at
 org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
 at
 org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
 at
 org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:342)
 ... 16 more

  If I start the spark-shell the same way, everything works fine.

  spark-shell command:
   ./bin/spark-shell --master yarn --jars
 /home/hdiuser/azureclass/azure-storage-1.2.0.jar,/home/hdiuser/azureclass/hadoop-azure-2.7.0.jar
 --num-executors 4

  thrift-server command:
  ./sbin/start-thriftserver.sh --master yarn--jars
 /home/hdiuser/azureclass/azure-storage-1.2.0.jar,/home/hdiuser/azureclass/hadoop-azure-2.7.0.jar
 --num-executors 4

  How can I pass dependency jars to the thrift server?

  Thanks,
 Daniel




  you should be able to add the JARs to the environment variable
 SPARK_SUBMIT_CLASSPATH or SPARK_CLASSPATH and have them picked up when
 bin/compute-classpath.{cmd.sh} builds up the classpath
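
For reference, the working invocation would look roughly like this (a sketch; spark.executor.extraClassPath may be needed as well if the executors also read from wasb):

./sbin/start-thriftserver.sh --master yarn \
  --conf spark.driver.extraClassPath=/home/hdiuser/azureclass/azure-storage-1.2.0.jar:/home/hdiuser/azureclass/hadoop-azure-2.7.0.jar \
  --num-executors 4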






Re: Starting Spark without automatically starting HiveContext

2015-07-03 Thread Daniel Haviv
The main reason is Spark's startup time and the need to configure a component I
don't really need (without configs the HiveContext takes more time to load).

Thanks,
Daniel

 On 3 ביולי 2015, at 11:13, Robin East robin.e...@xense.co.uk wrote:
 
 As Akhil mentioned there isn’t AFAIK any kind of initialisation to stop the 
 SQLContext being created. If you could articulate why you would need to do 
 this (it’s not obvious to me what the benefit would be) then maybe this is 
 something that could be included as a feature in a future release. It may 
 also suggest a way to a workaround.
 
 On 3 Jul 2015, at 08:33, Daniel Haviv daniel.ha...@veracity-group.com 
 wrote:
 
 Thanks
 I was looking for a less hack-ish way :)
 
 Daniel
 
 On Fri, Jul 3, 2015 at 10:15 AM, Akhil Das ak...@sigmoidanalytics.com 
 wrote:
 With binary i think it might not be possible, although if you can download 
 the sources and then build it then you can remove this function which 
 initializes the SQLContext.
 
 Thanks
 Best Regards
 
 On Thu, Jul 2, 2015 at 6:11 PM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:
 Hi,
 I've downloaded the pre-built binaries for Hadoop 2.6 and whenever I start 
 the spark-shell it always start with HiveContext.
 
 How can I disable the HiveContext from being initialized automatically ?
 
 Thanks,
 Daniel
 


Re: Starting Spark without automatically starting HiveContext

2015-07-03 Thread Daniel Haviv
Thanks
I was looking for a less hack-ish way :)

Daniel

On Fri, Jul 3, 2015 at 10:15 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 With binary i think it might not be possible, although if you can download
 the sources and then build it then you can remove this function
 https://github.com/apache/spark/blob/master/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala#L1023
 which initializes the SQLContext.

 Thanks
 Best Regards

 On Thu, Jul 2, 2015 at 6:11 PM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:

 Hi,
 I've downloaded the pre-built binaries for Hadoop 2.6 and whenever I
 start the spark-shell it always start with HiveContext.

 How can I disable the HiveContext from being initialized automatically ?

 Thanks,
 Daniel





Starting Spark without automatically starting HiveContext

2015-07-02 Thread Daniel Haviv
Hi,
I've downloaded the pre-built binaries for Hadoop 2.6 and whenever I start
the spark-shell it always start with HiveContext.

How can I disable the HiveContext from being initialized automatically ?

Thanks,
Daniel


thrift-server does not load jars files (Azure HDInsight)

2015-07-02 Thread Daniel Haviv
Hi,
I'm trying to start the thrift-server and passing it azure's blob storage
jars but I'm failing on :
Caused by: java.io.IOException: No FileSystem for scheme: wasb
at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:342)
... 16 more

If I start the spark-shell the same way, everything works fine.

spark-shell command:
 ./bin/spark-shell --master yarn --jars
/home/hdiuser/azureclass/azure-storage-1.2.0.jar,/home/hdiuser/azureclass/hadoop-azure-2.7.0.jar
--num-executors 4

thrift-server command:
 ./sbin/start-thriftserver.sh --master yarn--jars
/home/hdiuser/azureclass/azure-storage-1.2.0.jar,/home/hdiuser/azureclass/hadoop-azure-2.7.0.jar
--num-executors 4

How can I pass dependency jars to the thrift server?

Thanks,
Daniel


Re: thrift-server does not load jars files (Azure HDInsight)

2015-07-02 Thread Daniel Haviv
Hi,
I'm using 1.4.
It's indeed a typo in the email itself.

Thanks,
Daniel

On Thu, Jul 2, 2015 at 6:06 PM, Ted Yu yuzhih...@gmail.com wrote:

 Which Spark release are you using ?

 bq. yarn--jars

 I guess the above was just a typo in your email (missing space).

 Cheers

 On Thu, Jul 2, 2015 at 7:38 AM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:

 Hi,
 I'm trying to start the thrift-server and passing it azure's blob storage
 jars but I'm failing on :
 Caused by: java.io.IOException: No FileSystem for scheme: wasb
 at
 org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
 at
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
 at
 org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
 at
 org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:342)
 ... 16 more

 If I start the spark-shell the same way, everything works fine.

 spark-shell command:
  ./bin/spark-shell --master yarn --jars
 /home/hdiuser/azureclass/azure-storage-1.2.0.jar,/home/hdiuser/azureclass/hadoop-azure-2.7.0.jar
 --num-executors 4

 thrift-server command:
  ./sbin/start-thriftserver.sh --master yarn--jars
 /home/hdiuser/azureclass/azure-storage-1.2.0.jar,/home/hdiuser/azureclass/hadoop-azure-2.7.0.jar
 --num-executors 4

 How can I pass dependency jars to the thrift server?

 Thanks,
 Daniel





Re: Using Spark on Azure Blob Storage

2015-06-25 Thread Daniel Haviv
Hi,
This note only covers Spark 1.2, is only applicable to Spark on Windows,
and doesn't allow using the Thrift server, so I was looking for a better
way to run Spark on Azure.

Thanks,
Daniel

 On 26 ביוני 2015, at 01:38, Jacob Kim jac...@microsoft.com wrote:
 
 Below is the link for step by step guide in how to setup and use Spark in 
 HDInsight.
  
 https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-spark-install/
  
 Jacob
  
 From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com] 
 Sent: Thursday, June 25, 2015 3:19 PM
 To: Silvio Fiorito
 Cc: user@spark.apache.org
 Subject: Re: Using Spark on Azure Blob Storage
  
 Thank you guys for the helpful answers.
  
 Daniel
  
 On 25 ביוני 2015, at 21:23, Silvio Fiorito silvio.fior...@granturing.com 
 wrote:
 
 Hi Daniel,
  
 As Peter pointed out you need the hadoop-azure JAR as well as the Azure 
 storage SDK for Java (com.microsoft.azure:azure-storage). Even though the 
 WASB driver is built for 2.7, I was still able to use the hadoop-azure JAR 
 with Spark built for older Hadoop versions, back to 2.4 I think.
  
 Also, be sure to set your Storage Account key in your Spark Hadoop config, 
 typically in core-site.xml:
  
 <property>
   <name>fs.azure.account.key.{accountname}.blob.core.windows.net</name>
   <value>{storage key here}</value>
 </property>
  
 As a heads up I have a couple projects for Spark on Azure. One is to push 
 data to the Power BI service (both batch and streaming) and I’m finishing up 
 on another project for using Event Hubs as well. The Power BI library is up 
 at http://spark-packages.org/package/granturing/spark-power-bi the Event Hubs 
 library should be up soon.
  
 Thanks,
 Silvio
  
 From: Daniel Haviv
 Date: Thursday, June 25, 2015 at 1:37 PM
 To: user@spark.apache.org
 Subject: Using Spark on Azure Blob Storage
  
 Hi,
 I'm trying to use spark over Azure's HDInsight but the spark-shell fails when 
 starting:
 java.io.IOException: No FileSystem for scheme: wasb
 at 
 org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
 at 
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
 at 
 org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
  
 Is Azure's blob storage supported ? 
  
 Thanks,
 Daniel


Using Spark on Azure Blob Storage

2015-06-25 Thread Daniel Haviv
Hi,
I'm trying to use spark over Azure's HDInsight but the spark-shell fails
when starting:
java.io.IOException: No FileSystem for scheme: wasb
at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

Is Azure's blob storage supported ?

Thanks,
Daniel


Re: Using Spark on Azure Blob Storage

2015-06-25 Thread Daniel Haviv
Thank you guys for the helpful answers.

Daniel

 On 25 ביוני 2015, at 21:23, Silvio Fiorito silvio.fior...@granturing.com 
 wrote:
 
 Hi Daniel,
 
 As Peter pointed out you need the hadoop-azure JAR as well as the Azure 
 storage SDK for Java (com.microsoft.azure:azure-storage). Even though the 
 WASB driver is built for 2.7, I was still able to use the hadoop-azure JAR 
 with Spark built for older Hadoop versions, back to 2.4 I think.
 
 Also, be sure to set your Storage Account key in your Spark Hadoop config, 
 typically in core-site.xml:
 
 <property>
   <name>fs.azure.account.key.{accountname}.blob.core.windows.net</name>
   <value>{storage key here}</value>
 </property>
 
 As a heads up I have a couple projects for Spark on Azure. One is to push 
 data to the Power BI service (both batch and streaming) and I’m finishing up 
 on another project for using Event Hubs as well. The Power BI library is up 
 at http://spark-packages.org/package/granturing/spark-power-bi the Event Hubs 
 library should be up soon.
 
 Thanks,
 Silvio
 
 From: Daniel Haviv
 Date: Thursday, June 25, 2015 at 1:37 PM
 To: user@spark.apache.org
 Subject: Using Spark on Azure Blob Storage
 
 Hi,
 I'm trying to use spark over Azure's HDInsight but the spark-shell fails when 
 starting:
 java.io.IOException: No FileSystem for scheme: wasb
 at 
 org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
 at 
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
 at 
 org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
 
 Is Azure's blob storage supported ? 
 
 Thanks,
 Daniel
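
As an alternative to core-site.xml, a sketch of setting the key at runtime on the SparkContext's Hadoop configuration (account, container and key are placeholders):

sc.hadoopConfiguration.set(
  "fs.azure.account.key.myaccount.blob.core.windows.net",   // placeholder account name
  "{storage key here}")

val lines = sc.textFile("wasb://mycontainer@myaccount.blob.core.windows.net/some/path")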


Saving RDDs as custom output format

2015-04-14 Thread Daniel Haviv
Hi,
Is it possible to store RDDs using custom output formats, for example ORC?

Thanks,
Daniel
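
A sketch of the usual route (assumes Spark 1.4+ and a HiveContext, since the ORC data source lives in the Hive module; the schema and output path are placeholders): go through a DataFrame rather than a raw OutputFormat.

case class Record(id: Long, value: String)                 // placeholder schema

import sqlContext.implicits._                              // sqlContext assumed to be a HiveContext
val rdd = sc.parallelize(Seq(Record(1L, "a"), Record(2L, "b")))
rdd.toDF().write.format("orc").save("/tmp/records_orc")    // placeholder output path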


Ensuring data locality when opening files

2015-03-09 Thread Daniel Haviv
Hi,
We wrote a spark steaming app that receives file names on HDFS from Kafka
and opens them using Hadoop's libraries.
The problem with this method is that I'm not utilizing data locality:
any worker might open any file, regardless of where that file's blocks reside.
I can't open the files using sparkContext because it's limited to the
driver class.

Is there a way I could open files at runtime and benefit from data locality?

Thanks,
Daniel


using sparkContext from within a map function (from spark streaming app)

2015-03-08 Thread Daniel Haviv
Hi,
We are designing a solution which pulls file paths from Kafka and for the
current stage just counts the lines in each of these files.
When running the code it fails on:
Exception in thread main org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at
org.apache.spark.streaming.dstream.DStream.map(DStream.scala:444)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$class.mapToPair(JavaDStreamLike.scala:146)
at
org.apache.spark.streaming.api.java.JavaPairDStream.mapToPair(JavaPairDStream.scala:46)
at streamReader.App.main(App.java:66)

Is using the sparkContext from inside a map function wrong ?

This is the code we are using:

SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("spark://namenode:7077");

// KAFKA
final JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("uploadedFiles", 1);
JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, "localhost:2181", "group3", topicMap);

JavaDStream<String> files = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

JavaPairDStream<String, Integer> pairs = messages.mapToPair(
    new PairFunction<Tuple2<String, String>, String, Integer>() {
        public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
            JavaRDD<String> textfile = jssc.sparkContext().textFile(word._2());
            int test = new Long(textfile.count()).intValue();
            return new Tuple2<String, Integer>(word._2(), test);
        }
    });

System.out.println("Printing Messages:");
pairs.print();

jssc.start();
jssc.awaitTermination();
jssc.close();

Thanks,
Daniel
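
A sketch of one common restructuring (in Scala for brevity, not applied in the thread): keep the SparkContext usage on the driver by collecting the batch of paths inside foreachRDD, which is reasonable as long as each batch only carries a small number of file names.

// fileNames: DStream[String] of HDFS paths read from Kafka
fileNames.foreachRDD { rdd =>
  rdd.collect().foreach { path =>                            // the foreachRDD body runs on the driver,
    val count = ssc.sparkContext.textFile(path).count()      // so the SparkContext can be used here
    println(s"$path has $count lines")
  }
}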


Re: Why Parquet Predicate Pushdown doesn't work?

2015-01-06 Thread Daniel Haviv
Quoting Michael:
Predicate push down into the input format is turned off by default because
there is a bug in the current parquet library that throws null pointer exceptions
when there are full row groups that are null.

https://issues.apache.org/jira/browse/SPARK-4258

You can turn it on if you want: 
http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Daniel

 On 7 בינו׳ 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID wrote:
 
 
 Hi,
 
I'm testing parquet file format, and the predicate pushdown is a very 
 useful feature for us.
 
However, it looks like the predicate push down doesn't work after I 
 set 
sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")
  
Here is my sql:
sqlContext.sql("select adId, adTitle from ad where groupId=10113000").collect
 
Then, I checked the amount of input data on the WEB UI. But the amount 
 of input data is ALWAYS 80.2M regardless whether I turn the 
 spark.sql.parquet.filterPushdown flag on or off.
 
I'm not sure, if there is anything that I must do when generating the 
 parquet file in order to make the predicate pushdown available. (Like ORC 
 file, when creating the ORC file, I need to explicitly sort the field that 
 will be used for predicate pushdown)
 
Anyone have any idea?
 
And, anyone knows the internal mechanism for parquet predicate 
 pushdown?
 
Thanks
 
  


Re: Unable to start Spark 1.3 after building:java.lang. NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2014-12-17 Thread Daniel Haviv
Thanks for your replies.
I was building spark from trunk.

Daniel

 On 17 בדצמ׳ 2014, at 19:49, Nicholas Chammas nicholas.cham...@gmail.com 
 wrote:
 
 Thanks for the correction, Sean. Do the docs need to be updated on this 
 point, or is it safer for now just to note 2.4 specifically?
 
 On Wed Dec 17 2014 at 5:54:53 AM Sean Owen so...@cloudera.com wrote:
 Spark works fine with 2.4 *and later*. The docs don't mean to imply
 2.4 is the last supported version.
 
 On Wed, Dec 17, 2014 at 10:19 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Spark 1.3 does not exist. Spark 1.2 hasn't been released just yet. Which
  version of Spark did you mean?
 
  Also, from what I can see in the docs, I believe the latest version of
  Hadoop that Spark supports is 2.4, not 2.6.
 
  Nick
 
  On Wed Dec 17 2014 at 2:09:56 AM Kyle Lin kylelin2...@gmail.com wrote:
 
 
  I also got the same problem..
 
  2014-12-09 22:58 GMT+08:00 Daniel Haviv danielru...@gmail.com:
 
  Hi,
  I've built spark 1.3 with hadoop 2.6 but when I startup the spark-shell I
  get the following exception:
 
  14/12/09 06:54:24 INFO server.AbstractConnector: Started
  SelectChannelConnector@0.0.0.0:4040
  14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI'
  on port 4040.
  14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040
  14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address:
  http://0.0.0.0:8188/ws/v1/timeline/
  java.lang.NoClassDefFoundError:
  org/codehaus/jackson/map/deser/std/StdDeserializer
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
  at
  java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 
  Any idea why ?
 
  Thanks,
  Daniel
 
 
 


Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
Hi,
I've built spark successfully with maven but when I try to run spark-shell I 
get the following errors:
Spark assembly has been built with Hive, including Datanucleus jars on classpath

Exception in thread main java.lang.NoClassDefFoundError: 
org/apache/spark/deploy/SparkSubmit

Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.SparkSubmit

at java.net.URLClassLoader$1.run(URLClassLoader.java:217)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:205)

at java.lang.ClassLoader.loadClass(ClassLoader.java:323)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: org.apache.spark.deploy.SparkSubmit. Program will exit.

What can be wrong?

Thanks,
Daniel

Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
That's the first thing I tried... still the same error:
hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
hdfs@ams-rsrv01:/tmp/spark/spark-branch-1.1$ ./bin/spark-shell

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Exception in thread main java.lang.NoClassDefFoundError:
org/apache/spark/deploy/SparkSubmit
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.deploy.SparkSubmit
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: org.apache.spark.deploy.SparkSubmit. Program
will exit.

Thanks,
Daniel

On Tue, Dec 16, 2014 at 1:53 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 print the CLASSPATH and make sure the spark assembly jar is there in the
 classpath

 Thanks
 Best Regards

 On Tue, Dec 16, 2014 at 5:04 PM, Daniel Haviv danielru...@gmail.com
 wrote:

 Hi,
 I've built spark successfully with maven but when I try to run
 spark-shell I get the following errors:

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/deploy/SparkSubmit

 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.deploy.SparkSubmit

 at java.net.URLClassLoader$1.run(URLClassLoader.java:217)

 at java.security.AccessController.doPrivileged(Native Method)

 at java.net.URLClassLoader.findClass(URLClassLoader.java:205)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:323)

 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
 Could not find the main class: org.apache.spark.deploy.SparkSubmit. Program will exit.
 What can be wrong?

 Thanks,
 Daniel




Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
Same here...
# jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep
SparkSubmit.class
*org/apache/spark/deploy/SparkSubmit.class*






On Tue, Dec 16, 2014 at 6:50 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 This is how it looks on my machine.

 [image: Inline image 1]

 Thanks
 Best Regards

 On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv danielru...@gmail.com
 wrote:

 That's the first thing I tried... still the same error:
 hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
 hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
 hdfs@ams-rsrv01:/tmp/spark/spark-branch-1.1$ ./bin/spark-shell

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/deploy/SparkSubmit
 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.deploy.SparkSubmit
 at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
 Could not find the main class: org.apache.spark.deploy.SparkSubmit.
 Program will exit.

 Thanks,
 Daniel

 On Tue, Dec 16, 2014 at 1:53 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 print the CLASSPATH and make sure the spark assembly jar is there in the
 classpath

 Thanks
 Best Regards

 On Tue, Dec 16, 2014 at 5:04 PM, Daniel Haviv danielru...@gmail.com
 wrote:

 Hi,
 I've built spark successfully with maven but when I try to run
 spark-shell I get the following errors:

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/deploy/SparkSubmit

 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.deploy.SparkSubmit

 at java.net.URLClassLoader$1.run(URLClassLoader.java:217)

 at java.security.AccessController.doPrivileged(Native Method)

 at java.net.URLClassLoader.findClass(URLClassLoader.java:205)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:323)

 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
 Could not find the main class: org.apache.spark.deploy.SparkSubmit. Program will exit.
 What can be wrong?

 Thanks,
 Daniel




Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
I've added every jar in the lib dir to my classpath and still no luck:

CLASSPATH=/tmp/spark/spark-branch-1.1/lib/datanucleus-api-jdo-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/TestSerDe.jar




On Tue, Dec 16, 2014 at 7:05 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 And this is how my classpath looks like

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar

 Thanks
 Best Regards

 On Tue, Dec 16, 2014 at 10:33 PM, Daniel Haviv danielru...@gmail.com
 wrote:

 Same here...
 # jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep
 SparkSubmit.class
 *org/apache/spark/deploy/SparkSubmit.class*






 On Tue, Dec 16, 2014 at 6:50 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 This is how it looks on my machine.

 [image: Inline image 1]

 Thanks
 Best Regards

 On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv danielru...@gmail.com
 wrote:

 That's the first thing I tried... still the same error:
 hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
 hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
 hdfs@ams-rsrv01:/tmp/spark/spark-branch-1.1$ ./bin/spark-shell

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/deploy/SparkSubmit
 Caused

Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
Completely different from the one I set:
Classpath is
 
::/tmp/spark/spark-branch-1.1/conf:/tmp/spark/spark-branch-1.1/assembly/target/scala-2.10/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib_managed/jars/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib_managed/jars/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar



On Tue, Dec 16, 2014 at 7:18 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Can you open the file bin/spark-class and then put an echo $CLASSPATH
 below the place where it gets exported and see what the contents are?
 On 16 Dec 2014 22:46, Daniel Haviv danielru...@gmail.com wrote:

 I've added every jar in the lib dir to my classpath and still no luck:


 CLASSPATH=/tmp/spark/spark-branch-1.1/lib/datanucleus-api-jdo-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/TestSerDe.jar




 On Tue, Dec 16, 2014 at 7:05 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 And this is how my classpath looks like

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar

 Thanks
 Best Regards

 On Tue, Dec 16, 2014 at 10:33 PM, Daniel Haviv danielru...@gmail.com
 wrote:

 Same here...
 # jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep
 SparkSubmit.class
 *org/apache/spark

Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
I'm using CDH5 that was installed via Cloudera Manager.
Does it matter?


Thanks,
Daniel

 On 16 בדצמ׳ 2014, at 19:18, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Can you open the file bin/spark-class and then put an echo $CLASSPATH below 
 the place where it gets exported and see what the contents are?
 
 On 16 Dec 2014 22:46, Daniel Haviv danielru...@gmail.com wrote:
 I've added every jar in the lib dir to my classpath and still no luck:
 
 CLASSPATH=/tmp/spark/spark-branch-1.1/lib/datanucleus-api-jdo-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/TestSerDe.jar
 
 
 
 
 On Tue, Dec 16, 2014 at 7:05 PM, Akhil Das ak...@sigmoidanalytics.com 
 wrote:
 And this is how my classpath looks like
 
 Spark assembly has been built with Hive, including Datanucleus jars on 
 classpath
 ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar
 
 Thanks
 Best Regards
 
 On Tue, Dec 16, 2014 at 10:33 PM, Daniel Haviv danielru...@gmail.com 
 wrote:
 Same here...
 # jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep 
 SparkSubmit.class
 org/apache/spark/deploy/SparkSubmit.class
 
 
 
 
 
 
 On Tue, Dec 16, 2014 at 6:50 PM, Akhil Das ak...@sigmoidanalytics.com 
 wrote:
 This is how it looks on my machine.
 
 image.png
 
 Thanks
 Best Regards
 
 On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv danielru...@gmail.com 
 wrote:
 That's the first thing I tried... still the same

Unable to start Spark 1.3 after building:java.lang. NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2014-12-09 Thread Daniel Haviv
Hi,
I've built spark 1.3 with hadoop 2.6 but when I startup the spark-shell I
get the following exception:

14/12/09 06:54:24 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI'
on port 4040.
14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040
14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address:
http://0.0.0.0:8188/ws/v1/timeline/
java.lang.NoClassDefFoundError:
org/codehaus/jackson/map/deser/std/StdDeserializer
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

Any idea why?

Thanks,
Daniel
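
For what it's worth, org/codehaus/jackson/map/deser/std/StdDeserializer comes from
the old Jackson 1.x jackson-mapper-asl artifact (the std package only exists from
1.9 on), so one guess is that an older or missing jackson-mapper-asl ends up on the
driver classpath. Something to try (version and paths are assumptions, not taken
from this thread):

bin/spark-shell --driver-class-path /path/to/jackson-core-asl-1.9.13.jar:/path/to/jackson-mapper-asl-1.9.13.jar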


Starting the thrift server

2014-11-26 Thread Daniel Haviv
Hi,
I'm trying to start the thrift server but failing:
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/tez/dag/api/SessionNotRunning
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:353)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
at scala.Option.orElse(Option.scala:257)


I've tried adding TEZ's libpath to the HADOOP_CLASSPATH but it didn't
help...

Your help will be appreciated.

Thanks,
Daniel
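
Two guesses that might be worth trying, since the root cause isn't confirmed in this
thread: the thrift server picks up Spark's own driver classpath rather than
HADOOP_CLASSPATH, so the Tez jars could be passed explicitly (paths below are
placeholders):

./sbin/start-thriftserver.sh --driver-class-path "/path/to/tez/*:/path/to/tez/lib/*"

Alternatively, if the hive-site.xml on Spark's classpath sets hive.execution.engine=tez,
switching it to mr for the thrift server avoids touching the Tez session classes at all.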


Re: Remapping columns from a schemaRDD

2014-11-26 Thread Daniel Haviv
Is there some place I can read more about it? I can't find any reference.
I actually want to flatten these structures, not return them from the UDF.

Thanks,
Daniel

On Tue, Nov 25, 2014 at 8:44 PM, Michael Armbrust mich...@databricks.com
wrote:

 Maps should just be Scala maps; structs are rows inside of rows. If you
 want to return a struct from a UDF you can do that with a case class.
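
A rough sketch of that case-class approach, with made-up column and function names,
in the registerFunction style used elsewhere in this thread:

case class NameAndLength(name: String, length: Int)

// a UDF that returns a case class yields a struct-typed column
sqlContext.registerFunction("nameAndLength", (s: String) => NameAndLength(s, s.length))
sqlContext.sql("SELECT c1, nameAndLength(c1) AS c1_struct FROM jRequests")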

 On Tue, Nov 25, 2014 at 10:25 AM, Daniel Haviv danielru...@gmail.com
 wrote:

 Thank you.

 How can I address more complex columns like maps and structs?

 Thanks again!
 Daniel

  On 25 Nov 2014, at 19:43, Michael Armbrust mich...@databricks.com
 wrote:

 Probably the easiest/closest way to do this would be with a UDF,
 something like:

 registerFunction("makeString", (s: Seq[String]) => s.mkString(","))
 sql("SELECT *, makeString(c8) AS newC8 FROM jRequests")

 Although this does not modify a column, but instead appends a new column.

 Another more complicated way to do something like this would be by using the
 applySchema function
 http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
 .

 I'll note that, as part of the ML pipeline work, we have been considering
 adding something like:

 def modifyColumn(columnName, function)

 Any comments anyone has on this interface would be appreciated!

 Michael

 On Tue, Nov 25, 2014 at 7:02 AM, Daniel Haviv danielru...@gmail.com
 wrote:

 Hi,
 I'm selecting columns from a json file, transforming some of them, and would
 like to store the result as a parquet file, but I'm failing.

 This is what I'm doing:

 val jsonFiles=sqlContext.jsonFile("/requests.loading")
 jsonFiles.registerTempTable("jRequests")

 val clean_jRequests=sqlContext.sql("select c1, c2, c3 ... c55 from jRequests")

 and then I run a map:
 val jRequests_flat=clean_jRequests.map(line => {((line(1),line(2),line(3),line(4),line(5),line(6),line(7),
 line(8).asInstanceOf[Iterable[String]].mkString(","),line(9)
 ,line(10) ,line(11) ,line(12) ,line(13) ,line(14) ,line(15) ,line(16)
 ,line(17) ,line(18) ,line(19) ,line(20) ,line(21) ,line(22) ,line(23)
 ,line(24) ,line(25) ,line(26) ,line(27) ,line(28) ,line(29) ,line(30)
 ,line(31) ,line(32) ,line(33) ,line(34) ,line(35) ,line(36) ,line(37)
 ,line(38) ,line(39) ,line(40) ,line(41) ,line(42) ,line(43) ,line(44)
 ,line(45) ,line(46) ,line(47) ,line(48) ,line(49) ,line(50)))})



 1. Is there a smarter way to achieve that (i.e., modify a certain column
 without having to spell out all the others, while still keeping them)?
 2. The last statement fails because the tuple has too many members:
 <console>:19: error: object Tuple50 is not a member of package scala


 Thanks for your help,
 Daniel






Re: RDD saveAsObjectFile write to local file and HDFS

2014-11-26 Thread Daniel Haviv
Prepend file:// to the path
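
For example (the rdd name and path below are only illustrative):

rdd.saveAsObjectFile("file:///data/temp.txt")   // local filesystem, explicitly
rdd.saveAsObjectFile("hdfs:///data/temp.txt")   // HDFS, explicitly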

Daniel

 On 26 Nov 2014, at 20:15, firemonk9 dhiraj.peech...@gmail.com wrote:
 
 When I am running spark locally, RDD saveAsObjectFile writes the file to
 local file system (ex : path /data/temp.txt)
 and 
 when I am running spark on YARN cluster,  RDD saveAsObjectFile writes the
 file to hdfs. (ex : path /data/temp.txt )
 
  Is there a way to explicitly mention the local file system instead of hdfs when
 running on YARN cluster.  
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-saveAsObjectFile-write-to-local-file-and-HDFS-tp19898.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


Re: Unable to use Kryo

2014-11-25 Thread Daniel Haviv
The problem was I didn't use the correct class name; it should
be org.apache.spark.serializer.KryoSerializer
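
For reference, a minimal way to wire it up from code rather than on the command
line (the app name here is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kryo-test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)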

On Mon, Nov 24, 2014 at 11:12 PM, Daniel Haviv danielru...@gmail.com
wrote:

 Hi,
 I want to test Kryo serialization but when starting spark-shell I'm
 hitting the following error:
 java.lang.ClassNotFoundException: org.apache.spark.KryoSerializer

 the kryo-2.21.jar is on the classpath so I'm not sure why it's not picking
 it up.

 Thanks for your help,
 Daniel



Remapping columns from a schemaRDD

2014-11-25 Thread Daniel Haviv
Hi,
I'm selecting columns from a json file, transforming some of them, and would
like to store the result as a parquet file, but I'm failing.

This is what I'm doing:

val jsonFiles=sqlContext.jsonFile("/requests.loading")
jsonFiles.registerTempTable("jRequests")

val clean_jRequests=sqlContext.sql("select c1, c2, c3 ... c55 from jRequests")

and then I run a map:
val jRequests_flat=clean_jRequests.map(line => {((line(1),line(2),line(3),line(4),line(5),line(6),line(7),
line(8).asInstanceOf[Iterable[String]].mkString(","),line(9) ,line(10)
,line(11) ,line(12) ,line(13) ,line(14) ,line(15) ,line(16) ,line(17)
,line(18) ,line(19) ,line(20) ,line(21) ,line(22) ,line(23) ,line(24)
,line(25) ,line(26) ,line(27) ,line(28) ,line(29) ,line(30) ,line(31)
,line(32) ,line(33) ,line(34) ,line(35) ,line(36) ,line(37) ,line(38)
,line(39) ,line(40) ,line(41) ,line(42) ,line(43) ,line(44) ,line(45)
,line(46) ,line(47) ,line(48) ,line(49) ,line(50)))})



1. Is there a smarter way to achieve that (i.e., modify a certain column
without having to spell out all the others, while still keeping them)?
2. The last statement fails because the tuple has too many members:
<console>:19: error: object Tuple50 is not a member of package scala


Thanks for your help,
Daniel


Re: Remapping columns from a schemaRDD

2014-11-25 Thread Daniel Haviv
Thank you.

How can I address more complex columns like maps and structs?

Thanks again!
Daniel

 On 25 Nov 2014, at 19:43, Michael Armbrust mich...@databricks.com wrote:
 
 Probably the easiest/closest way to do this would be with a UDF, something 
 like:
 
 registerFunction("makeString", (s: Seq[String]) => s.mkString(","))
 sql("SELECT *, makeString(c8) AS newC8 FROM jRequests")
 
 Although this does not modify a column, but instead appends a new column.
 
 Another more complicated way to do something like this would be by using the 
 applySchema function.
 
 I'll note that, as part of the ML pipeline work, we have been considering 
 adding something like:
 
 def modifyColumn(columnName, function)
 
 Any comments anyone has on this interface would be appreciated!
 
 Michael
 
 On Tue, Nov 25, 2014 at 7:02 AM, Daniel Haviv danielru...@gmail.com wrote:
 Hi,
  I'm selecting columns from a json file, transforming some of them, and would
  like to store the result as a parquet file, but I'm failing.
 
 This is what I'm doing:
 
  val jsonFiles=sqlContext.jsonFile("/requests.loading")
  jsonFiles.registerTempTable("jRequests")

  val clean_jRequests=sqlContext.sql("select c1, c2, c3 ... c55 from jRequests")
 
 and then I run a map:
  val jRequests_flat=clean_jRequests.map(line => {((line(1),line(2),line(3),line(4),line(5),line(6),line(7),line(8).asInstanceOf[Iterable[String]].mkString(","),line(9)
  ,line(10) ,line(11) ,line(12) ,line(13) ,line(14) ,line(15) ,line(16) 
 ,line(17) ,line(18) ,line(19) ,line(20) ,line(21) ,line(22) ,line(23) 
 ,line(24) ,line(25) ,line(26) ,line(27) ,line(28) ,line(29) ,line(30) 
 ,line(31) ,line(32) ,line(33) ,line(34) ,line(35) ,line(36) ,line(37) 
 ,line(38) ,line(39) ,line(40) ,line(41) ,line(42) ,line(43) ,line(44) 
 ,line(45) ,line(46) ,line(47) ,line(48) ,line(49) ,line(50)))})
 
 
 
  1. Is there a smarter way to achieve that (i.e., modify a certain column
  without having to spell out all the others, while still keeping them)?
  2. The last statement fails because the tuple has too many members:
  <console>:19: error: object Tuple50 is not a member of package scala
 
 
 Thanks for your help,
 Daniel
 


Unable to use Kryo

2014-11-24 Thread Daniel Haviv
Hi,
I want to test Kryo serialization but when starting spark-shell I'm hitting
the following error:
java.lang.ClassNotFoundException: org.apache.spark.KryoSerializer

the kryo-2.21.jar is on the classpath so I'm not sure why it's not picking
it up.

Thanks for your help,
Daniel


Converting a column to a map

2014-11-23 Thread Daniel Haviv
Hi,
I have a column in my schemaRDD that is a map, but I'm unable to extract it
as one. I've tried converting it to a Tuple2[String,String]:
val converted = jsonFiles.map(line => {
line(10).asInstanceOf[Tuple2[String,String]]})

but I get ClassCastException:
14/11/23 11:51:30 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0
(TID 2, localhost): java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to
scala.Tuple2

And if I convert it to Iterable[String] I can only get the values without
the keys.

What is the correct data type I should convert it to?

Thanks,
Daniel
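
One direction that may be worth trying (a sketch only, assuming the column really
is a map type in the inferred schema, which this thread doesn't confirm): Spark SQL
generally hands map columns back as a Scala Map rather than a Tuple2, so the cast
would be:

val converted = jsonFiles.map(line =>
  line(10).asInstanceOf[scala.collection.Map[String, String]])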


Merging Parquet Files

2014-11-22 Thread Daniel Haviv
Hi,
I'm ingesting a lot of small JSON files and converting them into unified parquet
files, but even the unified files are fairly small (~10MB).
I want to run a merge operation every hour on the existing files, but it
takes a lot of time for such a small amount of data: about 3 GB spread over
3000 parquet files.

Basically what I'm doing is loading the files from the existing directory, coalescing
them and saving to the new dir:
val parquetFiles=sqlContext.parquetFile("/requests_merged/inproc")

parquetFiles.coalesce(2).saveAsParquetFile(s"/requests_merged/$currday")

Doing this takes over an hour on my 3 node cluster...

Is there a better way to achieve this?
Any idea what can cause such a simple operation to take so long?

Thanks,
Daniel
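
One thing that may be worth checking (a sketch, not verified against this workload):
coalesce(2) without a shuffle collapses the whole read-and-write into just two tasks,
so only two cores do the work. Passing shuffle = true keeps the upstream read parallel
and only merges down to two output files at the end (currday is assumed to be defined
as in the snippet above):

val parquetFiles = sqlContext.parquetFile("/requests_merged/inproc")
parquetFiles.coalesce(2, shuffle = true).saveAsParquetFile(s"/requests_merged/$currday")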

