RE: Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Mendelson, Assaf
Hi,
I believe that the UDF is only a small part of the problem. You can easily test 
this by using a UDF in the DataFrame version too.
In my testing I saw that using Datasets can be considerably slower than using 
DataFrames. I can make a guess as to why this happens.
Basically, what you are doing in the DataFrame version is reading the data, 
applying a function to a single column and counting the results. In practice 
this means that the data is read into Project Tungsten's unsafe data structure, 
then the single column event_type is analyzed for the filtering, and then the 
result is counted.
On the other hand, the Dataset version reads each element and converts it to a 
case class before doing the calculation. This means two things: first, we need 
to read all the columns in the case class, and second, we need to generate the 
case class itself.
So basically the Dataset option reads a lot more (all columns defined in the 
case class) and copies them (when generating the case class object used 
for the filtering).
In a more general case (i.e. when more complicated behavior is needed), we 
would lose even more in terms of performance, as Catalyst and codegen would 
not take effect on the typed function. Try, for example, filtering on a numeric 
value and you would see an even bigger difference, as predicate pushdown to 
Parquet would lower the DataFrame's time and not change the Dataset's much.

If your goal is pure performance, then DataFrame solutions will probably be 
better than Dataset ones in most cases. Datasets provide the advantage of type 
safety, and if you have very complex logic that would otherwise need multiple 
UDFs over many columns, going directly to a Dataset would simplify 
development.
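
The difference described above can be inspected by comparing the physical plans of the two filters. The following is a minimal sketch written for pasting into spark-shell or a Scala script, not the original poster's code: the two-field `Event` class, the sample rows and the `eventNames` value are assumptions made up for illustration. It uses `Option.exists` instead of `.get`, since `.get` throws on a null `event_type`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("typed-vs-untyped-filter")
  .getOrCreate()
import spark.implicits._

// Hypothetical, much smaller stand-in for the CustomEvent class in the thread.
case class Event(event_id: Option[String], event_type: Option[String])

val events = Seq(
  Event(Some("1"), Some("click")),
  Event(Some("2"), Some("view")),
  Event(Some("3"), None)
).toDS()

val eventNames = Seq("click")

// Typed filter: every row is deserialized into an Event before the lambda runs.
val typed = events.filter(e => e.event_type.exists(t => eventNames.contains(t)))

// Untyped filter: a Column expression that Catalyst can analyze and optimize.
val untyped = events.toDF().filter(col("event_type").isin(eventNames: _*))

// Print both physical plans to compare how each filter is executed.
typed.explain()
untyped.explain()

val typedCount = typed.count()
val untypedCount = untyped.count()
```

On a Parquet source, the untyped plan would be the one expected to show pruned columns and pushed filters; on this in-memory sample only the shape of the two plans differs.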



From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
Sent: Monday, October 24, 2016 7:52 PM
To: Antoaneta Marinova
Cc: user
Subject: Re: Spark 2.0 - DataFrames vs Dataset performance

Hi Antoaneta,
I believe the difference is not due to Datasets being slower (DataFrames are 
just an alias to Datasets now), but rather using a user defined function for 
filtering vs using Spark builtins. The builtin can use tricks from Project 
Tungsten, such as only deserializing the "event_type" column. The user-defined 
function on the other hand has to be called with a full case class, so the 
whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone with 
a more direct knowledge of the code will correct me if I'm wrong.

On Mon, Oct 24, 2016 at 2:50 PM, Antoaneta Marinova wrote:

Hello,


I am using Spark 2.0 to perform filtering, grouping and counting operations 
on events data saved in Parquet files. As the events schema has a deeply nested 
structure, I wanted to read the events as Scala case classes to simplify the 
code, but I noticed a severe performance degradation. Below you can find a 
simple comparison of the same operation using DataFrame and Dataset.


val data = session.read.parquet("events_data/")


// Using Datasets with custom class


//Case class matching the events schema

case class CustomEvent(event_id: Option[String],
   event_type: Option[String],
   context: Option[Context],
   ….
   time: Option[BigInt]) extends Serializable {}

scala> val start = System.currentTimeMillis;
       val count = data.as[CustomEvent].filter(event => eventNames.contains(event.event_type.get)).count;
       val time = System.currentTimeMillis - start


count: Long = 5545

time: Long = 11262


// Using DataFrames


scala> val start = System.currentTimeMillis;
       val count = data.filter(col("event_type").isin(eventNames:_*)).count;
       val time = System.currentTimeMillis - start


count: Long = 5545

time: Long = 147


The schema of the events is something like this:


// events schema

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- context: struct (nullable = true)
 |    |-- environment_1: struct (nullable = true)
 |    |    |-- filed1: integer (nullable = true)
 |    |    |-- filed2: integer (nullable = true)
 |    |    |-- filed3: integer (nullable = true)
 |    |-- environment_2: struct (nullable = true)
 |    |    |-- filed_1: string (nullable = true)
 |    |    ...
 |    |    |-- filed_n: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- config: string (nullable = true)
 |    |    |    |-- tree: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- path: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |    |    |-- level:

Re: [Spark 2] BigDecimal and 0

2016-10-24 Thread Efe Selcuk
I should have noted that I understand the notation of 0E-18 (exponential
form, I think) and that in a normal case it is no different than 0; I just
wanted to make sure that there wasn't something tricky going on since the
representation was seemingly changing.

Michael, that's a fair point. I keep operating under the assumption of some
guaranteed performance from BigDecimal but I realize there is probably some
math happening that's causing results that can't perfectly be represented.

Thanks guys. I'm good now.

On Mon, Oct 24, 2016 at 8:57 PM Jakob Odersky  wrote:

> Yes, thanks for elaborating Michael.
> The other thing that I wanted to highlight was that in this specific
> case the value is actually exactly zero (0E-18 = 0*10^(-18) = 0).
>
> On Mon, Oct 24, 2016 at 8:50 PM, Michael Matsko 
> wrote:
> > Efe,
> >
> > I think Jakob's point is that that there is no problem.  When you deal
> with
> > real numbers, you don't get exact representations of numbers.  There is
> > always some slop in representations, things don't ever cancel out
> exactly.
> > Testing reals for equality to zero will almost never work.
> >
> > Look at Goldberg's paper
> >
> https://ece.uwaterloo.ca/~dwharder/NumericalAnalysis/02Numerics/Double/paper.pdf
> > for a quick intro.
> >
> > Mike
> >
> > On Oct 24, 2016, at 10:36 PM, Efe Selcuk  wrote:
> >
> > Okay, so this isn't contributing to any kind of imprecision. I suppose I
> > need to go digging further then. Thanks for the quick help.
> >
> > On Mon, Oct 24, 2016 at 7:34 PM Jakob Odersky  wrote:
> >>
> >> What you're seeing is merely a strange representation, 0E-18 is zero.
> >> The E-18 represents the precision that Spark uses to store the decimal
> >>
> >> On Mon, Oct 24, 2016 at 7:32 PM, Jakob Odersky 
> wrote:
> >> > An even smaller example that demonstrates the same behaviour:
> >> >
> >> > Seq(Data(BigDecimal(0))).toDS.head
> >> >
> >> > On Mon, Oct 24, 2016 at 7:03 PM, Efe Selcuk 
> wrote:
> >> >> I’m trying to track down what seems to be a very slight imprecision
> in
> >> >> our
> >> >> Spark application; two of our columns, which should be netting out to
> >> >> exactly zero, are coming up with very small fractions of non-zero
> >> >> value. The
> >> >> only thing that I’ve found out of place is that a case class entry
> into
> >> >> a
> >> >> Dataset we’ve generated with BigDecimal(“0”) will end up as 0E-18
> after
> >> >> it
> >> >> goes through Spark, and I don’t know if there’s any appreciable
> >> >> difference
> >> >> between that and the actual 0 value, which can be generated with
> >> >> BigDecimal.
> >> >> Here’s a contrived example:
> >> >>
> >> >> scala> case class Data(num: BigDecimal)
> >> >> defined class Data
> >> >>
> >> >> scala> val x = Data(0)
> >> >> x: Data = Data(0)
> >> >>
> >> >> scala> x.num
> >> >> res9: BigDecimal = 0
> >> >>
> >> >> scala> val y = Seq(x, x.copy()).toDS.reduce( (a,b) => a.copy(a.num +
> >> >> b.num))
> >> >> y: Data = Data(0E-18)
> >> >>
> >> >> scala> y.num
> >> >> res12: BigDecimal = 0E-18
> >> >>
> >> >> scala> BigDecimal("1") - 1
> >> >> res15: scala.math.BigDecimal = 0
> >> >>
> >> >> Am I looking at anything valuable?
> >> >>
> >> >> Efe
>


Re: [Spark 2] BigDecimal and 0

2016-10-24 Thread Jakob Odersky
Yes, thanks for elaborating Michael.
The other thing that I wanted to highlight was that in this specific
case the value is actually exactly zero (0E-18 = 0*10^(-18) = 0).

On Mon, Oct 24, 2016 at 8:50 PM, Michael Matsko  wrote:
> Efe,
>
> I think Jakob's point is that that there is no problem.  When you deal with
> real numbers, you don't get exact representations of numbers.  There is
> always some slop in representations, things don't ever cancel out exactly.
> Testing reals for equality to zero will almost never work.
>
> Look at Goldberg's paper
> https://ece.uwaterloo.ca/~dwharder/NumericalAnalysis/02Numerics/Double/paper.pdf
> for a quick intro.
>
> Mike
>
> On Oct 24, 2016, at 10:36 PM, Efe Selcuk  wrote:
>
> Okay, so this isn't contributing to any kind of imprecision. I suppose I
> need to go digging further then. Thanks for the quick help.
>
> On Mon, Oct 24, 2016 at 7:34 PM Jakob Odersky  wrote:
>>
>> What you're seeing is merely a strange representation, 0E-18 is zero.
>> The E-18 represents the precision that Spark uses to store the decimal
>>
>> On Mon, Oct 24, 2016 at 7:32 PM, Jakob Odersky  wrote:
>> > An even smaller example that demonstrates the same behaviour:
>> >
>> > Seq(Data(BigDecimal(0))).toDS.head
>> >
>> > On Mon, Oct 24, 2016 at 7:03 PM, Efe Selcuk  wrote:
>> >> I’m trying to track down what seems to be a very slight imprecision in
>> >> our
>> >> Spark application; two of our columns, which should be netting out to
>> >> exactly zero, are coming up with very small fractions of non-zero
>> >> value. The
>> >> only thing that I’ve found out of place is that a case class entry into
>> >> a
>> >> Dataset we’ve generated with BigDecimal(“0”) will end up as 0E-18 after
>> >> it
>> >> goes through Spark, and I don’t know if there’s any appreciable
>> >> difference
>> >> between that and the actual 0 value, which can be generated with
>> >> BigDecimal.
>> >> Here’s a contrived example:
>> >>
>> >> scala> case class Data(num: BigDecimal)
>> >> defined class Data
>> >>
>> >> scala> val x = Data(0)
>> >> x: Data = Data(0)
>> >>
>> >> scala> x.num
>> >> res9: BigDecimal = 0
>> >>
>> >> scala> val y = Seq(x, x.copy()).toDS.reduce( (a,b) => a.copy(a.num +
>> >> b.num))
>> >> y: Data = Data(0E-18)
>> >>
>> >> scala> y.num
>> >> res12: BigDecimal = 0E-18
>> >>
>> >> scala> BigDecimal("1") - 1
>> >> res15: scala.math.BigDecimal = 0
>> >>
>> >> Am I looking at anything valuable?
>> >>
>> >> Efe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark 2] BigDecimal and 0

2016-10-24 Thread Michael Matsko
Efe,

I think Jakob's point is that there is no problem.  When you deal with 
real numbers, you don't get exact representations of numbers.  There is always 
some slop in representations; things don't ever cancel out exactly.  Testing 
reals for equality to zero will almost never work.  

Look at Goldberg's paper 
https://ece.uwaterloo.ca/~dwharder/NumericalAnalysis/02Numerics/Double/paper.pdf
 for a quick intro.

Mike
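
The point about inexact representations can be sketched in plain Scala, with no Spark involved. The values below are illustrative only and are not taken from the application discussed in the thread. The sketch shows that binary `Double` arithmetic rarely cancels exactly, that `BigDecimal` addition of decimal literals is exact, and that `BigDecimal` division still rounds to the type's `MathContext`, so it can also leave a tiny residue.

```scala
// Double arithmetic: 0.1 and 0.2 have no exact binary representation.
val doubleSum = 0.1 + 0.2
val doubleExact = doubleSum == 0.3
val doubleClose = math.abs(doubleSum - 0.3) < 1e-9

// BigDecimal addition of decimal literals is exact.
val decimalSum = BigDecimal("0.1") + BigDecimal("0.2")
val decimalExact = decimalSum == BigDecimal("0.3")

// Division rounds to the default MathContext (34 significant digits),
// so multiplying back does not necessarily return the original value.
val roundTrip = (BigDecimal(1) / 3) * 3
val roundTripExact = roundTrip == BigDecimal(1)
val residue = BigDecimal(1) - roundTrip

println(s"Double exact: $doubleExact, within tolerance: $doubleClose")
println(s"BigDecimal sum exact: $decimalExact")
println(s"BigDecimal round trip exact: $roundTripExact, residue: $residue")
```

If columns that should net out to zero go through a division or a rescaling step, a residue of this kind is one possible source of the small non-zero values; comparing against a tolerance is the usual way to test for it.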

> On Oct 24, 2016, at 10:36 PM, Efe Selcuk  wrote:
> 
> Okay, so this isn't contributing to any kind of imprecision. I suppose I need 
> to go digging further then. Thanks for the quick help.
> 
>> On Mon, Oct 24, 2016 at 7:34 PM Jakob Odersky  wrote:
>> What you're seeing is merely a strange representation, 0E-18 is zero.
>> The E-18 represents the precision that Spark uses to store the decimal
>> 
>> On Mon, Oct 24, 2016 at 7:32 PM, Jakob Odersky  wrote:
>> > An even smaller example that demonstrates the same behaviour:
>> >
>> > Seq(Data(BigDecimal(0))).toDS.head
>> >
>> > On Mon, Oct 24, 2016 at 7:03 PM, Efe Selcuk  wrote:
>> >> I’m trying to track down what seems to be a very slight imprecision in our
>> >> Spark application; two of our columns, which should be netting out to
>> >> exactly zero, are coming up with very small fractions of non-zero value. 
>> >> The
>> >> only thing that I’ve found out of place is that a case class entry into a
>> >> Dataset we’ve generated with BigDecimal(“0”) will end up as 0E-18 after it
>> >> goes through Spark, and I don’t know if there’s any appreciable difference
>> >> between that and the actual 0 value, which can be generated with 
>> >> BigDecimal.
>> >> Here’s a contrived example:
>> >>
>> >> scala> case class Data(num: BigDecimal)
>> >> defined class Data
>> >>
>> >> scala> val x = Data(0)
>> >> x: Data = Data(0)
>> >>
>> >> scala> x.num
>> >> res9: BigDecimal = 0
>> >>
>> >> scala> val y = Seq(x, x.copy()).toDS.reduce( (a,b) => a.copy(a.num + 
>> >> b.num))
>> >> y: Data = Data(0E-18)
>> >>
>> >> scala> y.num
>> >> res12: BigDecimal = 0E-18
>> >>
>> >> scala> BigDecimal("1") - 1
>> >> res15: scala.math.BigDecimal = 0
>> >>
>> >> Am I looking at anything valuable?
>> >>
>> >> Efe


Re: spark streaming with kinesis

2016-10-24 Thread Takeshi Yamamuro
I'm not exactly sure which receiver you mean, but
if you mean the "KinesisReceiver" implementation, then yes.

Also, we currently cannot disable the interval checkpoints.

On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora 
wrote:

> Thanks!
>
> Is kinesis streams are receiver based only? Is there non receiver based
> consumer for Kinesis ?
>
> And Instead of having fixed checkpoint interval,Can I disable auto
> checkpoint and say  when my worker has processed the data after last record
> of mapPartition now checkpoint the sequence no using some api.
>
>
>
> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> The only thing you can do for Kinesis checkpoints is tune the interval of
>> them.
>> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L68
>>
>> Whether the dataloss occurs or not depends on the storage level you set;
>> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
>> in case of the dataloss because the stream data Spark receives are
>> replicated across executors.
>> However,  all the executors that have the replicated data crash,
>> IIUC the dataloss occurs.
>>
>> // maropu
>>
>> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Does spark streaming consumer for kinesis uses Kinesis Client Library
>>>  and mandates to checkpoint the sequence number of shards in dynamo db.
>>>
>>> Will it lead to dataloss if consumed datarecords are not yet processed
>>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
>>> spark worker crashes - then spark launched the worker on another node but
>>> start consuming from dynamo db's checkpointed sequence number which is
>>> ahead of processed sequenece number .
>>>
>>> is there a way to checkpoint the sequenece numbers ourselves in Kinesis
>>> as it is in Kafka low level consumer ?
>>>
>>> Thanks
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Get size of intermediate results

2016-10-24 Thread Takeshi Yamamuro
-dev +user

Hi,

Have you tried this?
scala> val df = Seq((1, 0), (2, 0), (3, 0), (4, 0)).toDF.cache
scala> df.queryExecution.executedPlan(0).execute().foreach(_ => ())
scala> df.rdd.toDebugString
res4: String =
(4) MapPartitionsRDD[13] at rdd at <console>:26 []
 |  MapPartitionsRDD[12] at rdd at <console>:26 []
 |  MapPartitionsRDD[11] at rdd at <console>:26 []
 |  LocalTableScan [_1#41, _2#42]
 MapPartitionsRDD[9] at cache at <console>:23 []
 |  CachedPartitions: 4; MemorySize: 1104.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  MapPartitionsRDD[8] at cache at <console>:23 []
 |  ParallelCollectionRDD[7] at cache at <console>:23 []

// maropu
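
As a possible alternative to parsing `toDebugString`, the cached sizes can also be read programmatically. This is a minimal sketch that assumes `SparkContext.getRDDStorageInfo` (a developer API) is acceptable for an experiment; the column names `a` and `b` are made up for illustration. It only reports data that is actually cached, so intermediate results that are never persisted will not show up here.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("cache-size")
  .getOrCreate()
import spark.implicits._

val df = Seq((1, 0), (2, 0), (3, 0), (4, 0)).toDF("a", "b").cache()
df.count() // an action is needed before anything is actually cached

// Developer API: one RDDInfo per RDD that is currently cached.
val cached = spark.sparkContext.getRDDStorageInfo
cached.foreach { info =>
  println(s"${info.name}: cachedPartitions=${info.numCachedPartitions}, " +
    s"memory=${info.memSize} B, disk=${info.diskSize} B")
}
```

Depending on the Spark version, the storage information may be updated asynchronously, so the list can lag slightly behind the action that filled the cache.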

On Fri, Oct 21, 2016 at 10:18 AM, Egor Pahomov 
wrote:

> I needed the same for debugging and I just added "count" action in debug
> mode for every step I was interested in. It's very time-consuming, but I
> debug not very often.
>
> 2016-10-20 2:17 GMT-07:00 Andreas Hechenberger :
>
>> Hey awesome Spark-Dev's :)
>>
>> i am new to spark and i read a lot but now i am stuck :( so please be
>> kind, if i ask silly questions.
>>
>> I want to analyze some algorithms and strategies in spark and for one
>> experiment i want to know the size of the intermediate results between
>> iterations/jobs. Some of them are written to disk and some are in the
>> cache, i guess. I am not afraid of looking into the code (i already did)
>> but its complex and have no clue where to start :( It would be nice if
>> someone can point me in the right direction or where i can find more
>> information about the structure of spark core devel :)
>>
>> I already setup the devel environment and i can compile spark. It was
>> really awesome how smoothly the setup was :) Thx for that.
>>
>> Servus
>> Andy
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>
>
> --
>
>
> *Sincerely yours, Egor Pakhomov*
>



-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-10-24 Thread Shushant Arora
Thanks!

Are Kinesis streams receiver-based only? Is there a non-receiver-based
consumer for Kinesis?

And instead of having a fixed checkpoint interval, can I disable automatic
checkpointing and, once my worker has processed the data up to the last record
of a mapPartitions call, checkpoint the sequence number through some API?



On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro 
wrote:

> Hi,
>
> The only thing you can do for Kinesis checkpoints is tune the interval of
> them.
> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L68
>
> Whether the dataloss occurs or not depends on the storage level you set;
> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
> in case of the dataloss because the stream data Spark receives are
> replicated across executors.
> However,  all the executors that have the replicated data crash,
> IIUC the dataloss occurs.
>
> // maropu
>
> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora wrote:
>
>> Does spark streaming consumer for kinesis uses Kinesis Client Library
>>  and mandates to checkpoint the sequence number of shards in dynamo db.
>>
>> Will it lead to dataloss if consumed datarecords are not yet processed
>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
>> spark worker crashes - then spark launched the worker on another node but
>> start consuming from dynamo db's checkpointed sequence number which is
>> ahead of processed sequenece number .
>>
>> is there a way to checkpoint the sequenece numbers ourselves in Kinesis
>> as it is in Kafka low level consumer ?
>>
>> Thanks
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: How to iterate the element of an array in DataFrame?

2016-10-24 Thread Yan Facai
scala> mblog_tags.dtypes
res13: Array[(String, String)] =
Array((tags,ArrayType(StructType(StructField(category,StringType,true),
StructField(weight,StringType,true)),true)))

scala> val testUDF = udf{ s: Seq[Tags] => s(0).weight }
testUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(<function1>,StringType,Some(List(ArrayType(StructType(StructField(category,StringType,true),
StructField(weight,StringType,true)),true))))

What is wrong with the UDF `testUDF`?





On Tue, Oct 25, 2016 at 10:41 AM, 颜发才(Yan Facai)  wrote:

> Thanks, Cheng Lian.
>
> I try to use case class:
>
> scala> case class Tags (category: String, weight: String)
>
> scala> val testUDF = udf{ s: Seq[Tags] => s(0).weight }
>
> testUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
> UserDefinedFunction(,StringType,Some(List(ArrayType(StructType(
> StructField(category,StringType,true), StructField(weight,StringType,
> true)),true
>
>
> but it raises an ClassCastException when run:
>
> scala> mblog_tags.withColumn("test", testUDF(col("tags"))).show(false)
>
> 16/10/25 10:39:54 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID
> 4)
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
> cannot be cast to $line58.$read$$iw$$iw$Tags
> at $line59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.
> apply(:27)
> at $line59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.
> apply(:27)
> ...
>
>
> Where did I do wrong?
>
>
>
>
> On Sat, Oct 22, 2016 at 6:37 AM, Cheng Lian  wrote:
>
>> You may either use SQL function "array" and "named_struct" or define a
>> case class with expected field names.
>>
>> Cheng
>>
>> On 10/21/16 2:45 AM, 颜发才(Yan Facai) wrote:
>>
>> My expectation is:
>> root
>> |-- tag: vector
>>
>> namely, I want to extract from:
>> [[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
>> to:
>> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>>
>> I believe it needs two step:
>> 1. val tag2vec = {tag: Array[Structure] => Vector}
>> 2. mblog_tags.withColumn("vec", tag2vec(col("tag"))
>>
>> But, I have no idea of how to describe the Array[Structure] in the
>> DataFrame.
>>
>>
>>
>>
>>
>> On Fri, Oct 21, 2016 at 4:51 PM, lk_spark  wrote:
>>
>>> how about change Schema from
>>> root
>>>  |-- category.firstCategory: array (nullable = true)
>>>  ||-- element: struct (containsNull = true)
>>>  |||-- category: string (nullable = true)
>>>  |||-- weight: string (nullable = true)
>>> to:
>>>
>>> root
>>>  |-- category: string (nullable = true)
>>>  |-- weight: string (nullable = true)
>>>
>>> 2016-10-21
>>> --
>>> lk_spark
>>> --
>>>
>>> *发件人:*颜发才(Yan Facai) 
>>> *发送时间:*2016-10-21 15:35
>>> *主题:*Re: How to iterate the element of an array in DataFrame?
>>> *收件人:*"user.spark"
>>> *抄送:*
>>>
>>> I don't know how to construct `array<struct<category:string,weight:string>>`.
>>> Could anyone help me?
>>>
>>> I try to get the array by :
>>> scala> mblog_tags.map(_.getSeq[(String, String)](0))
>>>
>>> while the result is:
>>> res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
>>> array<struct<_1:string,_2:string>>]
>>>
>>>
>>> How to express `struct` ?
>>>
>>>
>>>
>>> On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai) 
>>> wrote:
>>>
>>>> Hi, I want to extract the attribute `weight` of an array, and combine
>>>> them to construct a sparse vector.
>>>>
>>>> ### My data is like this:
>>>>
>>>> scala> mblog_tags.printSchema
>>>> root
>>>>  |-- category.firstCategory: array (nullable = true)
>>>>  |    |-- element: struct (containsNull = true)
>>>>  |    |    |-- category: string (nullable = true)
>>>>  |    |    |-- weight: string (nullable = true)
>>>>
>>>>
>>>> scala> mblog_tags.show(false)
>>>> +--+
>>>> |category.firstCategory|
>>>> +--+
>>>> |[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
>>>> |[[tagCategory_029, 0.9]]  |
>>>> |[[tagCategory_029, 0.8]]  |
>>>> +--+
>>>>
>>>>
>>>> ### And expected:
>>>> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>>>> Vectors.sparse(100, Array(29),  Array(0.9))
>>>> Vectors.sparse(100, Array(29),  Array(0.8))
>>>>
>>>> How to iterate an array in DataFrame?
>>>> Thanks.




>>>
>>
>>
>


Re: How to iterate the element of an array in DataFrame?

2016-10-24 Thread Yan Facai
Thanks, Cheng Lian.

I tried to use a case class:

scala> case class Tags (category: String, weight: String)

scala> val testUDF = udf{ s: Seq[Tags] => s(0).weight }

testUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(<function1>,StringType,Some(List(ArrayType(StructType(StructField(category,StringType,true),
StructField(weight,StringType,true)),true))))


but it raises a ClassCastException when run:

scala> mblog_tags.withColumn("test", testUDF(col("tags"))).show(false)

16/10/25 10:39:54 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line58.$read$$iw$$iw$Tags
at $line59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:27)
at $line59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:27)
...


What did I do wrong?
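
The exception message suggests that Spark passes each struct element to the UDF as a `Row` (here a `GenericRowWithSchema`) rather than as the `Tags` case class. One possible way around it, sketched below under that assumption, is to declare the UDF parameter as `Seq[Row]` and read the fields by name. The sample data, the `tags` column name, the vector size of 100 and the `tagCategory_` prefix parsing are taken from or modelled on the thread; the helper name `tag2vec` and the variable names are illustrative.

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("tags-to-vector")
  .getOrCreate()
import spark.implicits._

case class Tags(category: String, weight: String)

// Sample rows shaped like the data shown in the thread.
val mblogTags = Seq(
  Tuple1(Seq(Tags("tagCategory_060", "0.8"), Tags("tagCategory_029", "0.7"))),
  Tuple1(Seq(Tags("tagCategory_029", "0.9")))
).toDF("tags")

// Struct elements arrive as Row, so the fields are read by name.
val tag2vec = udf { tags: Seq[Row] =>
  val pairs = tags.map { row =>
    val index = row.getAs[String]("category").stripPrefix("tagCategory_").toInt
    val weight = row.getAs[String]("weight").toDouble
    (index, weight)
  }
  // This overload accepts unordered (index, value) pairs.
  Vectors.sparse(100, pairs)
}

val withVec = mblogTags.withColumn("vec", tag2vec(col("tags")))
withVec.show(false)

val firstVec = withVec.select("vec").head.getAs[Vector](0)
```

The resulting vector stores its indices in ascending order, so the first row comes out with indices (29, 60) rather than (60, 29); the values are the same.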




On Sat, Oct 22, 2016 at 6:37 AM, Cheng Lian  wrote:

> You may either use SQL function "array" and "named_struct" or define a
> case class with expected field names.
>
> Cheng
>
> On 10/21/16 2:45 AM, 颜发才(Yan Facai) wrote:
>
> My expectation is:
> root
> |-- tag: vector
>
> namely, I want to extract from:
> [[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
> to:
> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>
> I believe it needs two step:
> 1. val tag2vec = {tag: Array[Structure] => Vector}
> 2. mblog_tags.withColumn("vec", tag2vec(col("tag"))
>
> But, I have no idea of how to describe the Array[Structure] in the
> DataFrame.
>
>
>
>
>
> On Fri, Oct 21, 2016 at 4:51 PM, lk_spark  wrote:
>
>> how about change Schema from
>> root
>>  |-- category.firstCategory: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- category: string (nullable = true)
>>  |||-- weight: string (nullable = true)
>> to:
>>
>> root
>>  |-- category: string (nullable = true)
>>  |-- weight: string (nullable = true)
>>
>> 2016-10-21
>> --
>> lk_spark
>> --
>>
>> *发件人:*颜发才(Yan Facai) 
>> *发送时间:*2016-10-21 15:35
>> *主题:*Re: How to iterate the element of an array in DataFrame?
>> *收件人:*"user.spark"
>> *抄送:*
>>
>> I don't know how to construct `array<struct<category:string,weight:string>>`.
>> Could anyone help me?
>>
>> I try to get the array by :
>> scala> mblog_tags.map(_.getSeq[(String, String)](0))
>>
>> while the result is:
>> res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
>> array<struct<_1:string,_2:string>>]
>>
>>
>> How to express `struct` ?
>>
>>
>>
>> On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai)  wrote:
>>
>>> Hi, I want to extract the attribute `weight` of an array, and combine
>>> them to construct a sparse vector.
>>>
>>> ### My data is like this:
>>>
>>> scala> mblog_tags.printSchema
>>> root
>>>  |-- category.firstCategory: array (nullable = true)
>>>  ||-- element: struct (containsNull = true)
>>>  |||-- category: string (nullable = true)
>>>  |||-- weight: string (nullable = true)
>>>
>>>
>>> scala> mblog_tags.show(false)
>>> +--+
>>> |category.firstCategory|
>>> +--+
>>> |[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
>>> |[[tagCategory_029, 0.9]]  |
>>> |[[tagCategory_029, 0.8]]  |
>>> +--+
>>>
>>>
>>> ### And expected:
>>> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>>> Vectors.sparse(100, Array(29),  Array(0.9))
>>> Vectors.sparse(100, Array(29),  Array(0.8))
>>>
>>> How to iterate an array in DataFrame?
>>> Thanks.
>>>
>>>
>>>
>>>
>>
>
>


Re: [Spark 2] BigDecimal and 0

2016-10-24 Thread Efe Selcuk
Okay, so this isn't contributing to any kind of imprecision. I suppose I
need to go digging further then. Thanks for the quick help.

On Mon, Oct 24, 2016 at 7:34 PM Jakob Odersky  wrote:

> What you're seeing is merely a strange representation, 0E-18 is zero.
> The E-18 represents the precision that Spark uses to store the decimal
>
> On Mon, Oct 24, 2016 at 7:32 PM, Jakob Odersky  wrote:
> > An even smaller example that demonstrates the same behaviour:
> >
> > Seq(Data(BigDecimal(0))).toDS.head
> >
> > On Mon, Oct 24, 2016 at 7:03 PM, Efe Selcuk  wrote:
> >> I’m trying to track down what seems to be a very slight imprecision in
> our
> >> Spark application; two of our columns, which should be netting out to
> >> exactly zero, are coming up with very small fractions of non-zero
> value. The
> >> only thing that I’ve found out of place is that a case class entry into
> a
> >> Dataset we’ve generated with BigDecimal(“0”) will end up as 0E-18 after
> it
> >> goes through Spark, and I don’t know if there’s any appreciable
> difference
> >> between that and the actual 0 value, which can be generated with
> BigDecimal.
> >> Here’s a contrived example:
> >>
> >> scala> case class Data(num: BigDecimal)
> >> defined class Data
> >>
> >> scala> val x = Data(0)
> >> x: Data = Data(0)
> >>
> >> scala> x.num
> >> res9: BigDecimal = 0
> >>
> >> scala> val y = Seq(x, x.copy()).toDS.reduce( (a,b) => a.copy(a.num +
> b.num))
> >> y: Data = Data(0E-18)
> >>
> >> scala> y.num
> >> res12: BigDecimal = 0E-18
> >>
> >> scala> BigDecimal("1") - 1
> >> res15: scala.math.BigDecimal = 0
> >>
> >> Am I looking at anything valuable?
> >>
> >> Efe
>


Re: [Spark 2] BigDecimal and 0

2016-10-24 Thread Jakob Odersky
What you're seeing is merely a strange representation; 0E-18 is zero.
The E-18 reflects the scale (18 decimal places) that Spark uses to store the
decimal.
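
That 0E-18 and 0 are the same number can be checked in plain Scala without Spark. The sketch below builds the scaled zero directly from a string instead of sending it through a Dataset, which is an assumption made to keep the example self-contained; the value names are illustrative.

```scala
val zero = BigDecimal(0)
// The same numeric value, but carrying a scale of 18 decimal places.
val scaledZero = BigDecimal("0E-18")

println(scaledZero)        // the printed form includes the scale
println(scaledZero.scale)

// Scala's BigDecimal equality compares numeric values and ignores the scale.
val sameValue = scaledZero == zero
val isZero = scaledZero.signum == 0

// java.math.BigDecimal.equals also compares the scale, so it disagrees,
// while compareTo looks only at the numeric value.
val javaEquals = scaledZero.bigDecimal.equals(zero.bigDecimal)
val javaCompare = scaledZero.bigDecimal.compareTo(zero.bigDecimal)

println(s"== : $sameValue, signum is 0: $isZero")
println(s"java equals: $javaEquals, java compareTo: $javaCompare")
```

Code that compares through `java.math.BigDecimal.equals`, for example inside a Java collection, is the one place where the differing scale would matter.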

On Mon, Oct 24, 2016 at 7:32 PM, Jakob Odersky  wrote:
> An even smaller example that demonstrates the same behaviour:
>
> Seq(Data(BigDecimal(0))).toDS.head
>
> On Mon, Oct 24, 2016 at 7:03 PM, Efe Selcuk  wrote:
>> I’m trying to track down what seems to be a very slight imprecision in our
>> Spark application; two of our columns, which should be netting out to
>> exactly zero, are coming up with very small fractions of non-zero value. The
>> only thing that I’ve found out of place is that a case class entry into a
>> Dataset we’ve generated with BigDecimal(“0”) will end up as 0E-18 after it
>> goes through Spark, and I don’t know if there’s any appreciable difference
>> between that and the actual 0 value, which can be generated with BigDecimal.
>> Here’s a contrived example:
>>
>> scala> case class Data(num: BigDecimal)
>> defined class Data
>>
>> scala> val x = Data(0)
>> x: Data = Data(0)
>>
>> scala> x.num
>> res9: BigDecimal = 0
>>
>> scala> val y = Seq(x, x.copy()).toDS.reduce( (a,b) => a.copy(a.num + b.num))
>> y: Data = Data(0E-18)
>>
>> scala> y.num
>> res12: BigDecimal = 0E-18
>>
>> scala> BigDecimal("1") - 1
>> res15: scala.math.BigDecimal = 0
>>
>> Am I looking at anything valuable?
>>
>> Efe




Re: [Spark 2] BigDecimal and 0

2016-10-24 Thread Jakob Odersky
An even smaller example that demonstrates the same behaviour:

Seq(Data(BigDecimal(0))).toDS.head

On Mon, Oct 24, 2016 at 7:03 PM, Efe Selcuk  wrote:
> I’m trying to track down what seems to be a very slight imprecision in our
> Spark application; two of our columns, which should be netting out to
> exactly zero, are coming up with very small fractions of non-zero value. The
> only thing that I’ve found out of place is that a case class entry into a
> Dataset we’ve generated with BigDecimal(“0”) will end up as 0E-18 after it
> goes through Spark, and I don’t know if there’s any appreciable difference
> between that and the actual 0 value, which can be generated with BigDecimal.
> Here’s a contrived example:
>
> scala> case class Data(num: BigDecimal)
> defined class Data
>
> scala> val x = Data(0)
> x: Data = Data(0)
>
> scala> x.num
> res9: BigDecimal = 0
>
> scala> val y = Seq(x, x.copy()).toDS.reduce( (a,b) => a.copy(a.num + b.num))
> y: Data = Data(0E-18)
>
> scala> y.num
> res12: BigDecimal = 0E-18
>
> scala> BigDecimal("1") - 1
> res15: scala.math.BigDecimal = 0
>
> Am I looking at anything valuable?
>
> Efe




[Spark 2] BigDecimal and 0

2016-10-24 Thread Efe Selcuk
I’m trying to track down what seems to be a very slight imprecision in our
Spark application; two of our columns, which should be netting out to
exactly zero, are coming up with very small fractions of non-zero value.
The only thing that I’ve found out of place is that a case class entry into
a Dataset we’ve generated with BigDecimal("0") will end up as 0E-18 after
it goes through Spark, and I don’t know if there’s any appreciable
difference between that and the actual 0 value, which can be generated with
BigDecimal. Here’s a contrived example:

scala> case class Data(num: BigDecimal)
defined class Data

scala> val x = Data(0)
x: Data = Data(0)

scala> x.num
res9: BigDecimal = 0

scala> val y = Seq(x, x.copy()).toDS.reduce( (a,b) => a.copy(a.num + b.num))
y: Data = Data(0E-18)

scala> y.num
res12: BigDecimal = 0E-18

scala> BigDecimal("1") - 1
res15: scala.math.BigDecimal = 0

Am I looking at anything valuable?

Efe


Re: spark streaming with kinesis

2016-10-24 Thread Takeshi Yamamuro
Hi,

The only thing you can do for Kinesis checkpoints is tune their interval:
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L68

Whether data loss occurs depends on the storage level you set;
if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
after an executor is lost because the stream data Spark receives is
replicated across executors.
However, if all the executors that hold the replicated data crash,
IIUC data loss occurs.

// maropu

On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora 
wrote:

> Does the Spark Streaming consumer for Kinesis use the Kinesis Client Library,
> and does it mandate checkpointing the sequence numbers of shards in DynamoDB?
>
> Will it lead to data loss if consumed data records are not yet processed,
> Kinesis has checkpointed the consumed sequence numbers in DynamoDB, and the
> Spark worker crashes? Spark then launches the worker on another node, but it
> starts consuming from DynamoDB's checkpointed sequence number, which is ahead
> of the processed sequence number.
>
> Is there a way to checkpoint the sequence numbers ourselves in Kinesis, as
> there is in the Kafka low-level consumer?
>
> Thanks
>
>


-- 
---
Takeshi Yamamuro


[Spark 2.0.1] Error in generated code, possible regression?

2016-10-24 Thread Efe Selcuk
I have an application that works in 2.0.0 but has been dying at runtime on
the 2.0.1 distribution.

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 30 more
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 74, Column 145: Unknown variable or type "value4"

It also includes a massive 1800-line generated code output (which repeats
over and over, even on 1 thread, which makes this a pain), but fortunately
the error occurs early so I can give at least some context.

/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificMutableProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private MutableRow mutableRow;
/* 009 */   private Object[] values;
... // many lines of class variables, mostly errMsg strings and Object[]
/* 071 */   private void apply2_7(InternalRow i) {
/* 072 */
/* 073 */ boolean isNull215 = false;
/* 074 */ final com.mypackage.MyThing value215 = isNull215 ? null : (com.mypackage.MyThing) value4._2();
/* 075 */ isNull215 = value215 == null;
/* 076 */
...

As you can see, on line 74 there's a reference to value4 but nothing called
value4 has been defined. I have no idea of where to even begin looking for
what caused this, or even whether it's my fault or a bug in the code
generation. Any help is appreciated.

Efe


Modifying Metadata in StructType schemas

2016-10-24 Thread Everett Anderson
Hi,

I've been using the immutable Metadata within the StructType of a
DataFrame/Dataset to track application-level column lineage.

However, since it's immutable, the only way to modify it is to do a full
trip of

   1. Convert DataFrame/Dataset to Row RDD
   2. Create new, modified Metadata per column from the old
   3. Create a new StructType with the modified metadata
   4. Convert the Row RDD + StructType schema to a DataFrame/Dataset

It looks like conversion to/from an RDD might involve real work, even
though in this case the data itself isn't modified at all.

Is there a better way to do this?

Thanks!


Re: LIMIT issue of SparkSQL

2016-10-24 Thread Michael Armbrust
It is not about limits on specific tables.  We do support that.  The case
I'm describing involves pushing limits across system boundaries.  It is
certainly possible to do this, but the current datasource API does not provide
this information (other than the implicit limit that is pushed down to the
consumed iterator of the data source).
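
For JDBC sources, the "query instead of a table" approach mentioned in the
quoted message below could look roughly like this. The helper name
limited_dbtable and the alias are made up for illustration; the only
Spark-specific assumption is that the JDBC reader's dbtable option accepts a
parenthesised subquery with an alias. LIMIT and alias syntax are
database-specific, so adjust them for your RDBMS.

```python
def limited_dbtable(table: str, limit: int, alias: str = "limited") -> str:
    """Build a value for the JDBC reader's `dbtable` option.

    Hypothetical helper: wraps the table in a subquery so that the LIMIT is
    evaluated by the database instead of by Spark.
    """
    if limit < 0:
        raise ValueError("limit must be non-negative")
    return f"(SELECT * FROM {table} LIMIT {int(limit)}) AS {alias}"


dbtable = limited_dbtable("A", 500)
print(dbtable)

# With a live SparkSession `spark` and a JDBC URL `url` (both assumed here):
# df = spark.read.format("jdbc").option("url", url).option("dbtable", dbtable).load()
```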

On Mon, Oct 24, 2016 at 9:11 AM, Mich Talebzadeh 
wrote:

> This is an interesting point.
>
> As far as I know in any database (practically all RDBMS Oracle, SAP etc),
> the LIMIT affects the collection part of the result set.
>
> The result set is carried out fully on the query that may involve multiple
> joins on multiple underlying tables.
>
> To limit the actual query by LIMIT on each underlying table does not make
> sense and will not be industry standard AFAIK.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 24 October 2016 at 06:48, Michael Armbrust 
> wrote:
>
>> - dev + user
>>
>> Can you give more info about the query?  Maybe a full explain()?  Are you
>> using a datasource like JDBC?  The API does not currently push down limits,
>> but the documentation talks about how you can use a query instead of a
>> table if that is what you are looking to do.
>>
>> On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai  wrote:
>>
>>> Hi all,
>>>
>>> Let me clarify the problem:
>>>
>>> Suppose we have a simple table `A` with 100 000 000 records
>>>
>>> Problem:
>>> When we execute the SQL query `select * from A limit 500`,
>>> it scans through all 100 000 000 records.
>>> Normal behaviour should be that once 500 records are found, the engine
>>> stops scanning.
>>>
>>> Detailed observation:
>>> We found that there are “GlobalLimit / LocalLimit” physical operators
>>> https://github.com/apache/spark/blob/branch-2.0/sql/core/src
>>> /main/scala/org/apache/spark/sql/execution/limit.scala
>>> But during query plan generation, GlobalLimit / LocalLimit is not
>>> applied to the query plan.
>>>
>>> Could you please help us to inspect LIMIT problem?
>>> Thanks.
>>>
>>> Best,
>>> Liz
>>>
>>> On 23 Oct 2016, at 10:11 PM, Xiao Li  wrote:
>>>
>>> Hi, Liz,
>>>
>>> CollectLimit means `Take the first `limit` elements and collect them to
>>> a single partition.`
>>>
>>> Thanks,
>>>
>>> Xiao
>>>
>>> 2016-10-23 5:21 GMT-07:00 Ran Bai :
>>>
>>>> Hi all,
>>>>
>>>> I found the runtime for a query with or without the “LIMIT” keyword is the
>>>> same. We looked into it and found that there is actually “GlobalLimit /
>>>> LocalLimit” in the logical plan, but no corresponding physical plan. Is
>>>> this a bug or something else? Attached are the logical and physical plans
>>>> when running "SELECT * FROM seq LIMIT 1".
>>>>
>>>> More specifically, we expected an early stop upon getting adequate
>>>> results.
>>>> Thanks so much.
>>>>
>>>> Best,
>>>> Liz





>>>
>>>
>>>
>>
>


Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-24 Thread Steve Loughran

On 24 Oct 2016, at 20:32, Cheng Lian wrote:



On 10/22/16 6:18 AM, Steve Loughran wrote:

...
On Sat, Oct 22, 2016 at 3:41 AM, Cheng Lian wrote:

What version of Spark are you using and how many output files does the job 
write out?

By default, Spark versions before 1.6 (exclusive) write Parquet summary 
files when committing the job. This process reads footers from all Parquet 
files in the destination directory and merges them together. This can be 
particularly bad if you are appending a small amount of data to a large 
existing Parquet dataset.

If that's the case, you may disable Parquet summary files by setting the Hadoop 
configuration "parquet.enable.summary-metadata" to false.


Now I'm a bit mixed up. Should that be 
spark.sql.parquet.enable.summary-metadata=false?

No, "parquet.enable.summary-metadata" is a Hadoop configuration option 
introduced by Parquet. In Spark 2.0, you can simply set it using 
spark.conf.set(); Spark will propagate it properly.


OK, chased it down to a feature that ryanb @ netflix made optional, presumably 
for their S3 work (PARQUET-107).

This is what I'd say makes a good set of options for S3A & Parquet:

spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.hadoop.parquet.enable.summary-metadata false

While for ORC, you want


spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 1
spark.sql.orc.filterPushdown true

And:

spark.sql.hive.metastorePartitionPruning true

along with commitment via:

spark.speculation false
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped true



For when people get to play with the Hadoop S3A phase II binaries, they'll also 
be wanting

spark.hadoop.fs.s3a.readahead.range 157810688

// faster backward seek for ORC and Parquet input
spark.hadoop.fs.s3a.experimental.input.fadvise random

// PUT blocks in separate threads
spark.hadoop.fs.s3a.fast.output.enabled true


The fadvise one is *really* good when working with ORC/Parquet; without it, 
column filtering and predicate pushdown are somewhat crippled.
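
One way to keep such settings together is to hold them in a map and render
them as spark-submit --conf flags. The sketch below only uses option names and
values listed above for Parquet on S3A and for the commit settings; the helper
name to_conf_args and the script name my_job.py are made up for illustration.

```python
# Options copied from the message above (Parquet on S3A plus commit settings).
S3A_PARQUET_OPTIONS = {
    "spark.sql.parquet.filterPushdown": "true",
    "spark.sql.parquet.mergeSchema": "false",
    "spark.hadoop.parquet.enable.summary-metadata": "false",
    "spark.speculation": "false",
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
}


def to_conf_args(options):
    """Render a dict of Spark options as a flat list of spark-submit arguments."""
    args = []
    for key, value in options.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


conf_args = to_conf_args(S3A_PARQUET_OPTIONS)
# my_job.py is a placeholder for the application being submitted.
print(" ".join(["spark-submit"] + conf_args + ["my_job.py"]))
```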



Re: Getting the IP address of Spark Driver in yarn-cluster mode

2016-10-24 Thread Steve Loughran

On 24 Oct 2016, at 19:34, Masood Krohy wrote:

Hi everyone,

Is there a way to set the IP address/hostname that the Spark Driver is going to 
be running on when launching a program through spark-submit in yarn-cluster 
mode (PySpark 1.6.0)?

I do not see an option for this. If not, is there a way to get this IP address 
after the Spark app has started running? (through an API call at the beginning 
of the program to be used in the rest of the program). spark-submit outputs 
“ApplicationMaster host: 10.0.0.9” in the console (and changes on every run due 
to yarn cluster mode) and I am wondering if this can be accessed within the 
program. It does not seem to me that a YARN node label can be used to tie the 
Spark Driver/AM to a node, while allowing the Executors to run on all the nodes.



You can grab it off the YARN API itself; there's a REST view as well as a 
fussier RPC level. That is, assuming you want the web view, which is what is 
registered.

If you know the application ID, you can also construct a URL through the YARN 
proxy; any attempt to talk directly to the AM is going to get 302'd back there 
anyway so that any Kerberos credentials can be verified.
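
A minimal sketch of the REST route, assuming the ResourceManager web service is
reachable at rm_address (host:port) over plain HTTP and that the application
resource /ws/v1/cluster/apps/{appid} reports the AM node in its
amHostHttpAddress field. The function names are made up, a Kerberized cluster
would need authenticated requests instead of plain urlopen, and inside PySpark
the application ID should be available as sc.applicationId.

```python
import json
from urllib.request import urlopen


def app_info_url(rm_address: str, app_id: str) -> str:
    """URL of the ResourceManager REST resource for one application."""
    return f"http://{rm_address}/ws/v1/cluster/apps/{app_id}"


def am_host(app_json: dict) -> str:
    """Extract the ApplicationMaster host from the REST response body."""
    # amHostHttpAddress has the form "host:port"; keep only the host part.
    return app_json["app"]["amHostHttpAddress"].rsplit(":", 1)[0]


def fetch_am_host(rm_address: str, app_id: str) -> str:
    """Query the ResourceManager (needs network access to a real cluster)."""
    with urlopen(app_info_url(rm_address, app_id)) as response:
        return am_host(json.load(response))


# Offline demonstration with a made-up response body.
sample = {"app": {"id": "application_0000000000000_0001",
                  "amHostHttpAddress": "10.0.0.9:8042"}}
print(am_host(sample))
```

In yarn-cluster mode the driver runs inside the AM container, so the AM host
should also be the driver host.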



Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-24 Thread Efe Selcuk
All right, I looked at the schemas. There is one mismatching nullability,
on a scala.Boolean. It looks like an empty Dataset with that *cannot* be
nullable. However, when I run my code to generate the Dataset, the schema
comes back with nullable = true. Effectively:

scala> val empty = spark.emptyDataset[SomeClass]
scala> empty.printSchema
root
 |-- aCaseClass: struct (nullable = true)
 |    |-- aBool: boolean (nullable = false)


scala> val data = // Dataset#flatMap that returns a Dataset[SomeClass]
scala> data.printSchema
root
 |-- aCaseClass: struct (nullable = true)
 |    |-- aBool: boolean (nullable = true)

scala> empty.union(data)
org.apache.spark.sql.AnalysisException: unresolved operator 'Union;

If I switch the Boolean to a java.lang.Boolean, I get nullable = true in
the empty schema and the union starts working.

1) Is there a fix for this that I can do without jumping through hoops? I
don't know the implications of switching to java.lang.Boolean.

2) It looks like this is probably the issue that these PRs fix:
https://github.com/apache/spark/pull/15595 and
https://github.com/apache/spark/pull/15602  Is there a timeline for 2.0.2?
I'm in a situation where I can't easily build from source.
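
For spotting this kind of mismatch without eyeballing printSchema output, a
small schema diff can help. The sketch below works on the JSON form of a
schema (in PySpark, df.schema.jsonValue() should give such a dict; in Scala,
ds.schema.json gives the equivalent string). The function names and the
hand-written sample schemas are illustrative only and simply mirror the
printSchema output above.

```python
def nullability_diffs(left, right, path=""):
    """Yield (field path, left nullable, right nullable) where the flags differ.

    Both arguments are struct schemas in Spark's JSON form, i.e.
    {"type": "struct", "fields": [{"name", "type", "nullable", ...}]}.
    Fields are matched by name; fields missing on either side are skipped.
    """
    right_fields = {f["name"]: f for f in right["fields"]}
    for lf in left["fields"]:
        rf = right_fields.get(lf["name"])
        if rf is None:
            continue
        name = path + lf["name"]
        if lf["nullable"] != rf["nullable"]:
            yield (name, lf["nullable"], rf["nullable"])
        # Recurse into nested structs such as aCaseClass.
        both_structs = (
            isinstance(lf["type"], dict) and isinstance(rf["type"], dict)
            and lf["type"].get("type") == "struct"
            and rf["type"].get("type") == "struct"
        )
        if both_structs:
            yield from nullability_diffs(lf["type"], rf["type"], name + ".")


def schema(a_bool_nullable):
    """Hand-written schema mirroring the printSchema output above."""
    inner = {"type": "struct", "fields": [
        {"name": "aBool", "type": "boolean",
         "nullable": a_bool_nullable, "metadata": {}}]}
    return {"type": "struct", "fields": [
        {"name": "aCaseClass", "type": inner, "nullable": True, "metadata": {}}]}


diffs = list(nullability_diffs(schema(False), schema(True)))
print(diffs)
```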

On Mon, Oct 24, 2016 at 12:29 PM Cheng Lian  wrote:

>
>
> On 10/22/16 1:42 PM, Efe Selcuk wrote:
>
> Ah, looks similar. Next opportunity I get, I'm going to do a printSchema
> on the two datasets and see if they don't match up.
>
> I assume that unioning the underlying RDDs doesn't run into this problem
> because of less type checking or something along those lines?
>
> Exactly.
>
>
> On Fri, Oct 21, 2016 at 3:39 PM Cheng Lian  wrote:
>
> Efe - You probably hit this bug:
> https://issues.apache.org/jira/browse/SPARK-18058
>
> On 10/21/16 2:03 AM, Agraj Mangal wrote:
>
> I have seen this error sometimes when the elements in the schema have
> different nullabilities. Could you print the schema for data and for
> someCode.thatReturnsADataset() and see if there is any difference between
> the two ?
>
> On Fri, Oct 21, 2016 at 9:14 AM, Efe Selcuk  wrote:
>
> Thanks for the response. What do you mean by "semantically" the same?
> They're both Datasets of the same type, which is a case class, so I would
> expect compile-time integrity of the data. Is there a situation where this
> wouldn't be the case?
>
> Interestingly enough, if I instead create an empty rdd with
> sparkContext.emptyRDD of the same case class type, it works!
>
> So something like:
> var data = spark.sparkContext.emptyRDD[SomeData]
>
> // loop
>   data = data.union(someCode.thatReturnsADataset().rdd)
> // end loop
>
> data.toDS //so I can union it to the actual Dataset I have elsewhere
>
> On Thu, Oct 20, 2016 at 8:34 PM Agraj Mangal  wrote:
>
> I believe this normally comes when Spark is unable to perform union due to
> "difference" in schema of the operands. Can you check if the schema of both
> the datasets are semantically same ?
>
> On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk  wrote:
>
> Bump!
>
> On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk  wrote:
>
> I have a use case where I want to build a dataset based off of
> conditionally available data. I thought I'd do something like this:
>
> case class SomeData( ... ) // parameters are basic encodable types like strings and BigDecimals
>
> var data = spark.emptyDataset[SomeData]
>
> // loop, determining what data to ingest and process into datasets
>   data = data.union(someCode.thatReturnsADataset)
> // end loop
>
> However I get a runtime exception:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> unresolved operator 'Union;
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
> at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
> at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
>
> Granted, I'm new at Spark so this might be an 

Need help with SVM

2016-10-24 Thread aditya1702
Hello,
I am using linear SVM to train my model and generate a line through my data.
However my model always predicts 1 for all the feature examples. Here is my
code:

print data_rdd.take(5)
[LabeledPoint(1.0, [1.9643,4.5957]), LabeledPoint(1.0, [2.2753,3.8589]),
LabeledPoint(1.0, [2.9781,4.5651]), LabeledPoint(1.0, [2.932,3.5519]),
LabeledPoint(1.0, [3.5772,2.856])]


from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint  # needed for LabeledPoint below
from sklearn.svm import SVC
data_rdd=x_df.map(lambda x:LabeledPoint(x[1],x[0]))

model = SVMWithSGD.train(data_rdd, iterations=1000,regParam=1)

X=x_df.map(lambda x:x[0]).collect()
Y=x_df.map(lambda x:x[1]).collect()


pred=[]
for i in X:
  pred.append(model.predict(i))
print pred

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1]


My dataset is as follows:

 


Can someone please help?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Need-help-with-SVM-tp27955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark SQL is slower when DataFrame is cache in Memory

2016-10-24 Thread Kazuaki Ishizaki
Hi Chin Wei,
I am sorry for being late to reply.

Got it. Interesting behavior. How did you measure the time between 1st and 
2nd events?

Best Regards,
Kazuaki Ishizaki



From:    Chin Wei Low
To:      Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:      user@spark.apache.org
Date:    2016/10/10 11:33
Subject: Re: Spark SQL is slower when DataFrame is cache in Memory



Hi Ishizaki san,

Thanks for the reply.

So, when I pre-cache the dataframe, the cache is being used during the job 
execution.

Actually there are 3 events:
1. call res.collect
2. job started
3. job completed

I am concerned about the longer time taken between the 1st and 2nd events. It 
seems that query planning and optimization take longer when querying a 
cached dataframe.


Regards,
Chin Wei

On Fri, Oct 7, 2016 at 10:14 PM, Kazuaki Ishizaki  
wrote:
Hi Chin Wei,
Yes, since you force creation of the cache by executing df.count, Spark starts 
to get data from the cache for the following task:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

If you insert 'res.explain', you can confirm which source the data comes 
from: cache or parquet.
val res = sqlContext.sql("table1 union table2 union table3")
res.explain(true)
res.collect()

Am I misunderstanding something?

Best Regards,
Kazuaki Ishizaki



From:    Chin Wei Low
To:      Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:      user@spark.apache.org
Date:    2016/10/07 20:06
Subject: Re: Spark SQL is slower when DataFrame is cache in Memory




Hi Ishizaki san,

So there is a gap between res.collect
and when I see this log:   spark.SparkContext: Starting job: collect at 
:26

Do you mean that during this time Spark has already started to get data from 
the cache? Shouldn't it only get the data after the job is started and 
tasks are distributed?

Regards,
Chin Wei


On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki  
wrote:
Hi,
I think that the result looks correct. The current Spark spends extra time 
getting data from a cache. There are two reasons. One is the 
complicated path taken to get the data. The other is decompression in the 
case of a primitive type.
The new implementation (https://github.com/apache/spark/pull/15219) is 
ready for review. It would achieve a 1.2x performance improvement for a 
compressed column and a considerable performance improvement for an 
uncompressed column.

Best Regards,
Kazuaki Ishizaki



From:    Chin Wei Low
To:      user@spark.apache.org
Date:    2016/10/07 13:05
Subject: Spark SQL is slower when DataFrame is cache in Memory




Hi,

I am using Spark 1.6.0. I have a Spark application that creates and caches 
(in memory) DataFrames (around 50+, some on a single parquet file and 
some on a folder with a few parquet files) with the following code:

val df = sqlContext.read.parquet
df.persist
df.count

I union them into 3 DataFrames and register those as temp tables.

Then I run the following code:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

The res.collect() is slower when I cache the DataFrames compared to without 
caching, e.g. 3 seconds vs 1 second.

I turned on the DEBUG log and see there is a gap between the res.collect() 
call and the start of the Spark job.

Is the extra time taken by query planning & optimization? The gap does not 
appear when I do not cache the dataframes.

Is there anything I am missing here?

Regards,
Chin Wei








Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-24 Thread Cheng Lian



On 10/22/16 6:18 AM, Steve Loughran wrote:

...
On Sat, Oct 22, 2016 at 3:41 AM, Cheng Lian wrote:


What version of Spark are you using and how many output files
does the job write out?

By default, Spark versions before 1.6 (exclusive) write
Parquet summary files when committing the job. This process reads
footers from all Parquet files in the destination directory and
merges them together. This can be particularly bad if you are
appending a small amount of data to a large existing Parquet dataset.

If that's the case, you may disable Parquet summary files by
setting the Hadoop configuration "parquet.enable.summary-metadata"
to false.




Now I'm a bit mixed up. Should that be 
spark.sql.parquet.enable.summary-metadata=false?

No, "parquet.enable.summary-metadata" is a Hadoop configuration option 
introduced by Parquet. In Spark 2.0, you can simply set it using 
spark.conf.set(); Spark will propagate it properly.



We've disabled it by default since 1.6.0

Cheng


On 10/21/16 1:47 PM, Chetan Khatri wrote:

Hello Spark Users,

I am writing around 10 GB of processed data to Parquet on a Google
Cloud machine with 1 TB of HDD, 102 GB of RAM and 16 vCores.

Every time I write to Parquet, the Spark UI shows that the stages
succeeded, but the spark shell holds the context in wait mode for
almost 10 mins. Then it clears broadcast and accumulator shared
variables.

Can we speed this up?

Thanks.

-- 
Yours Aye,

Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA

​​Statement of Confidentiality

The contents of this e-mail message and any attachments are
confidential and are intended solely for addressee. The
information may also be legally privileged. This transmission is
sent in trust, for the sole purpose of delivery to the intended
recipient. If you have received this transmission in error, any
use, reproduction or dissemination of this transmission is
strictly prohibited. If you are not the intended recipient,
please immediately notify the sender by reply e-mail or phone
and delete this message and its attachments, if any.











Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-24 Thread Cheng Lian



On 10/22/16 1:42 PM, Efe Selcuk wrote:
Ah, looks similar. Next opportunity I get, I'm going to do a 
printSchema on the two datasets and see if they don't match up.


I assume that unioning the underlying RDDs doesn't run into this 
problem because of less type checking or something along those lines?

Exactly.


On Fri, Oct 21, 2016 at 3:39 PM Cheng Lian wrote:


Efe - You probably hit this bug:
https://issues.apache.org/jira/browse/SPARK-18058


On 10/21/16 2:03 AM, Agraj Mangal wrote:

I have seen this error sometimes when the elements in the schema
have different nullabilities. Could you print the schema for
data and for someCode.thatReturnsADataset() and see if there is
any difference between the two ?

On Fri, Oct 21, 2016 at 9:14 AM, Efe Selcuk wrote:

Thanks for the response. What do you mean by "semantically"
the same? They're both Datasets of the same type, which is a
case class, so I would expect compile-time integrity of the
data. Is there a situation where this wouldn't be the case?

Interestingly enough, if I instead create an empty rdd with
sparkContext.emptyRDD of the same case class type, it works!

So something like:
var data = spark.sparkContext.emptyRDD[SomeData]

// loop
data = data.union(someCode.thatReturnsADataset().rdd)
// end loop

data.toDS //so I can union it to the actual Dataset I have elsewhere

On Thu, Oct 20, 2016 at 8:34 PM Agraj Mangal wrote:

I believe this normally comes when Spark is unable to
perform union due to "difference" in schema of the
operands. Can you check if the schema of both the
datasets are semantically same ?

On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk wrote:

Bump!

On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk wrote:

I have a use case where I want to build a dataset
based off of conditionally available data. I
thought I'd do something like this:

case class SomeData( ... ) // parameters are basic encodable types like strings and BigDecimals

var data = spark.emptyDataset[SomeData]

// loop, determining what data to ingest and process into datasets
data = data.union(someCode.thatReturnsADataset)
// end loop

However I get a runtime exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Granted, I'm new at Spark so this might be an
anti-pattern, so I'm open to suggestions. However
it doesn't seem like I'm doing anything incorrect
here, the types are correct. Searching for this

Getting the IP address of Spark Driver in yarn-cluster mode

2016-10-24 Thread Masood Krohy
Hi everyone,
Is there a way to set the IP address/hostname that the Spark Driver is 
going to be running on when launching a program through spark-submit in 
yarn-cluster mode (PySpark 1.6.0)?
I do not see an option for this. If not, is there a way to get this IP 
address after the Spark app has started running? (through an API call at 
the beginning of the program to be used in the rest of the program). 
spark-submit outputs “ApplicationMaster host: 10.0.0.9” in the console 
(and changes on every run due to yarn cluster mode) and I am wondering if 
this can be accessed within the program. It does not seem to me that a 
YARN node label can be used to tie the Spark Driver/AM to a node, while 
allowing the Executors to run on all the nodes.
I am running a parameter server along with the Spark Driver that needs to 
be contacted during the program execution; I need the Driver’s IP so that 
other executors can call back to this server. I need to stick to the 
yarn-cluster mode.
Thanks for any hints in advance.
Masood
PS: the closest code I was able to write is this which is not outputting 
what I need:
print sc.statusTracker().getJobInfo(sc.statusTracker().getActiveJobsIds()[0])
# output in YARN stdout log:
# SparkJobInfo(jobId=4, stageIds=JavaObject id=o101, status='SUCCEEDED')

--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation


Re: Generate random numbers from Normal Distribution with Specific Mean and Variance

2016-10-24 Thread Sean Owen
In the context of Spark, there are already things like RandomRDDs and SQL
randn() to generate random standard normal variables.

If you want to do it directly, Commons Math is a good choice in the JVM,
among others.

Once you have a standard normal, just multiply by the stdev and add the
mean to get any other univariate normal. No need for special support for it.
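
As a plain-Python illustration of that scaling step (standard library only, no
Spark involved; the function name normal_samples is made up), each standard
normal draw z is mapped to mean + stdev * z, where stdev is the square root of
the requested variance:

```python
import random
import statistics


def normal_samples(mean, variance, n, seed=None):
    """Draw n samples from N(mean, variance) by scaling standard normals."""
    if variance < 0:
        raise ValueError("variance must be non-negative")
    rng = random.Random(seed)
    stdev = variance ** 0.5
    # z ~ N(0, 1)  =>  mean + stdev * z ~ N(mean, variance)
    return [mean + stdev * rng.gauss(0.0, 1.0) for _ in range(n)]


samples = normal_samples(mean=10.0, variance=4.0, n=100_000, seed=42)
print(statistics.mean(samples), statistics.variance(samples))
```

The same arithmetic should carry over to a randn() column or to the values
of a generated RDD.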

On Mon, Oct 24, 2016 at 5:05 PM Mich Talebzadeh 
wrote:

> me being lazy
>
> Does anyone have a library to create an array of random numbers from
> normal distribution with a given mean and variance by any chance?
>
> Something like here
> 
>
>
> Thanks
>
>
>
> Dr Mich Talebzadeh


Re: Generate random numbers from Normal Distribution with Specific Mean and Variance

2016-10-24 Thread Jörn Franke
https://github.com/rnowling/bigpetstore-data-generator

> On 24 Oct 2016, at 19:17, Mich Talebzadeh  wrote:
> 
> thanks Jorn.
> 
> I wish we had these libraries somewhere :)
> 
> Dr Mich Talebzadeh
> 
>> On 24 October 2016 at 17:09, Jörn Franke  wrote:
>> Bigtop contains a random data generator mainly for transactions, but it 
>> could be rather easily adapted 
>> 
>>> On 24 Oct 2016, at 18:04, Mich Talebzadeh  wrote:
>>> 
>>> me being lazy
>>> 
>>> Does anyone have a library to create an array of random numbers from normal 
>>> distribution with a given mean and variance by any chance?
>>> 
>>> Something like here
>>> 
>>> 
>>> Thanks
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
> 


Re: Generate random numbers from Normal Distribution with Specific Mean and Variance

2016-10-24 Thread Mich Talebzadeh
thanks Jorn.

I wish we had these libraries somewhere :)

Dr Mich Talebzadeh








Re: Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Daniel Darabos
Hi Antoaneta,
I believe the difference is not due to Datasets being slower (DataFrames
are just an alias to Datasets now), but rather using a user defined
function for filtering vs using Spark builtins. The builtin can use tricks
from Project Tungsten, such as only deserializing the "event_type" column.
The user-defined function on the other hand has to be called with a full
case class, so the whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone
with a more direct knowledge of the code will correct me if I'm wrong.
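
To make the two access patterns concrete, here is a toy model in plain Python. It is not Spark code: the in-memory column store, the CustomEvent dataclass and both function names are invented for illustration, and it only assumes, as described above, that the builtin filter needs a single column while the typed lambda needs the whole object.

```python
from dataclasses import dataclass
from typing import Optional

# Toy columnar table: every column is stored separately (loosely like Parquet).
columns = {
    "event_id": ["a1", "a2", "a3", "a4"],
    "event_type": ["click", "view", "click", "purchase"],
    "time": [10, 20, 30, 40],
}
event_names = {"click", "purchase"}


def count_with_column_predicate(cols, names):
    """DataFrame-style filter: only the event_type column is ever touched."""
    return sum(1 for value in cols["event_type"] if value in names)


@dataclass
class CustomEvent:
    event_id: Optional[str]
    event_type: Optional[str]
    time: Optional[int]


def count_with_typed_lambda(cols, names):
    """Dataset-style filter: each row is rebuilt as a full object first."""
    rows = zip(cols["event_id"], cols["event_type"], cols["time"])
    events = (CustomEvent(*row) for row in rows)
    return sum(1 for event in events if event.event_type in names)


print(count_with_column_predicate(columns, event_names))
print(count_with_typed_lambda(columns, event_names))
```

Both functions return the same count; the second one simply reads every column and allocates one object per row to get there, which is the extra work the typed filter is suspected of doing.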

On Mon, Oct 24, 2016 at 2:50 PM, Antoaneta Marinova <
antoaneta.vmarin...@gmail.com> wrote:

> Hello,
>
> I am using Spark 2.0 for performing filtering, grouping and counting
> operations on events data saved in parquet files. As the events schema has
> a deeply nested structure, I wanted to read the events as Scala beans to
> simplify the code, but I noticed a severe performance degradation. Below you
> can find a simple comparison of the same operation using DataFrame and Dataset.
>
> val data = session.read.parquet("events_data/")
>
> // Using Datasets with a custom class
> // Case class matching the events schema
> case class CustomEvent(event_id: Option[String],
>                        event_type: Option[String],
>                        context: Option[Context],
>                        ….
>                        time: Option[BigInt]) extends Serializable {}
>
> scala> val start = System.currentTimeMillis ;
>        val count = data.as[CustomEvent].filter(event =>
>          eventNames.contains(event.event_type.get)).count ;
>        val time = System.currentTimeMillis - start
>
> count: Long = 5545
> time: Long = 11262
>
> // Using DataFrames
> scala> val start = System.currentTimeMillis ;
>        val count = data.filter(col("event_type").isin(eventNames:_*)).count ;
>        val time = System.currentTimeMillis - start
>
> count: Long = 5545
> time: Long = 147
>
> The schema of the events is something like this:
>
> // events schema
> root
>  |-- event_id: string (nullable = true)
>  |-- event_type: string (nullable = true)
>  |-- context: struct (nullable = true)
>  |    |-- environment_1: struct (nullable = true)
>  |    |    |-- filed1: integer (nullable = true)
>  |    |    |-- filed2: integer (nullable = true)
>  |    |    |-- filed3: integer (nullable = true)
>  |    |-- environment_2: struct (nullable = true)
>  |    |    |-- filed_1: string (nullable = true)
>  |    |    ...
>  |    |    |-- filed_n: string (nullable = true)
>  |-- metadata: struct (nullable = true)
>  |    |-- elements: array (nullable = true)
>  |    |    |-- element: struct (containsNull = true)
>  |    |    |    |-- config: string (nullable = true)
>  |    |    |    |-- tree: array (nullable = true)
>  |    |    |    |    |-- element: struct (containsNull = true)
>  |    |    |    |    |    |-- path: array (nullable = true)
>  |    |    |    |    |    |    |-- element: struct (containsNull = true)
>  |    |    |    |    |    |    |    |-- key: string (nullable = true)
>  |    |    |    |    |    |    |    |-- name: string (nullable = true)
>  |    |    |    |    |    |    |    |-- level: integer (nullable = true)
>  |-- time: long (nullable = true)
>
> Could you please advise me on the usage of the different abstractions and
> help me understand why using datasets with user defined class is so much
> slower.
>
> Thank you,
> Antoaneta
>


Re: Generate random numbers from Normal Distribution with Specific Mean and Variance

2016-10-24 Thread Jörn Franke
Bigtop contains a random data generator mainly for transactions, but it could 
be rather easily adapted 
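
For a quick local array, no extra library is strictly needed. The sketch below uses only Python's standard library; the function name normal_samples and its parameters are made up for this example. The one thing to watch is that random.gauss takes a standard deviation, so a given variance has to be square-rooted first. Spark itself also ships RandomRDDs.normalRDD in MLlib and the randn SQL function, both of which produce standard normal values that can be shifted and scaled in the same way.

```python
import math
import random
import statistics


def normal_samples(n, mean, variance, seed=None):
    """Return n draws from a normal distribution with the given mean and variance."""
    if variance < 0:
        raise ValueError("variance must be non-negative")
    rng = random.Random(seed)      # private generator, reproducible when seeded
    sigma = math.sqrt(variance)    # gauss() expects a standard deviation
    return [rng.gauss(mean, sigma) for _ in range(n)]


samples = normal_samples(10_000, mean=5.0, variance=4.0, seed=42)
print(statistics.mean(samples), statistics.pvariance(samples))
```

The printed sample mean and variance should land close to 5 and 4, with the usual sampling noise.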



Generate random numbers from Normal Distribution with Specific Mean and Variance

2016-10-24 Thread Mich Talebzadeh
me being lazy

Does anyone have a library to create an array of random numbers from
normal distribution with a given mean and variance by any chance?

Something like here



Thanks



Dr Mich Talebzadeh





Re: pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-24 Thread Pietro Pugni
This worked without setting other options:
spark/bin/spark-submit --conf "spark.driver.extraJavaOptions=-Duser.language=en" test.py

Thank you again!
 Pietro

> 



Accessing Phoenix table from Spark 2.0., any cure!

2016-10-24 Thread Mich Talebzadeh
My stack is this

Spark: Spark 2.0.0
Zookeeper: ZooKeeper 3.4.6
Hbase: hbase-1.2.3
Phoenix: apache-phoenix-4.8.1-HBase-1.2-bin

I am running this simple code

scala> val df = sqlContext.load("org.apache.phoenix.spark",
 | Map("table" -> "MARKETDATAHBASE", "zkUrl" -> "rhes564:2181")
 | )

java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
  at org.apache.phoenix.spark.PhoenixRDD.getPhoenixConfiguration(PhoenixRDD.scala:71)
  at org.apache.phoenix.spark.PhoenixRDD.phoenixConf$lzycompute(PhoenixRDD.scala:39)
  at org.apache.phoenix.spark.PhoenixRDD.phoenixConf(PhoenixRDD.scala:38)
  at org.apache.phoenix.spark.PhoenixRDD.<init>(PhoenixRDD.scala:42)
  at org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:50)
  at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
  at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:382)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:143)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
  at org.apache.spark.sql.SQLContext.load(SQLContext.scala:958)
  ... 54 elided

Thanks

Dr Mich Talebzadeh





Re: pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-24 Thread Sean Owen
I believe it will be too late to set it there, and these are JVM flags, not
app or Spark flags. See spark.driver.extraJavaOptions and likewise for the
executor.
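
As a rough sketch of what that looks like in practice, the helper below assembles a spark-submit command line that passes the locale as JVM flags to both the driver and the executors at launch time. The function names are invented for this example; the --conf keys and the -Duser.language flag are the ones discussed in this thread, and -Duser.country is included only as an optional extra.

```python
import shlex


def jvm_locale_flags(language="en", country=None):
    """Build the -D system-property flags that set the JVM default locale."""
    flags = [f"-Duser.language={language}"]
    if country:
        flags.append(f"-Duser.country={country}")
    return " ".join(flags)


def spark_submit_command(script, language="en", country=None):
    """Return an argv list that sets the locale flags before any JVM starts."""
    flags = jvm_locale_flags(language, country)
    return [
        "spark-submit",
        "--conf", f"spark.driver.extraJavaOptions={flags}",
        "--conf", f"spark.executor.extraJavaOptions={flags}",
        script,
    ]


cmd = spark_submit_command("test.py")
print(shlex.join(cmd))  # shell-quoted form, ready to paste into a terminal
```

The point of building the command rather than calling SparkSession.builder.config(...) is timing: by the time Python code runs, the driver JVM is typically already up, so its system properties can no longer be changed from inside the script.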



Re: reading info from spark 2.0 application UI

2016-10-24 Thread Sean Owen
What matters in this case is how many vcores YARN thinks it can allocate
per machine. I think the relevant setting is
yarn.nodemanager.resource.cpu-vcores. I bet you'll find this is actually
more than the machine's number of cores, possibly on purpose, to enable
some over-committing.
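
The arithmetic behind the question can be checked with a few lines of Python. The numbers come from the two configurations described in this thread; the function names are made up, and NODE_VCORES is the 40 the poster expects per machine, not necessarily what yarn.nodemanager.resource.cpu-vcores is actually set to on the cluster.

```python
def requested_vcores(executors_on_node, cores_per_executor, driver_cores=0):
    """Total vcores the containers placed on one node ask for."""
    return executors_on_node * cores_per_executor + driver_cores


def is_overcommitted(requested, node_vcores):
    """True when the node is asked for more vcores than it is assumed to have."""
    return requested > node_vcores


NODE_VCORES = 40  # assumed per-machine limit, as reported by the poster

first = requested_vcores(4, 11)                      # 7 executors x 11 cores, 4 on one node
other = requested_vcores(3, 11, driver_cores=6)      # the node hosting 3 executors + driver
second = requested_vcores(5, 9)                      # 9 executors x 9 cores, 5 on one node

for label, value in [("first", first), ("other", other), ("second", second)]:
    print(label, value, is_overcommitted(value, NODE_VCORES))
```

If YARN accepts the first and third placements, the limit it enforces must differ from 40, which is why checking the configured value is the next step.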

On Mon, Oct 24, 2016 at 4:13 PM TheGeorge1918 . 
wrote:

> Yep. I'm pretty sure that 4 executors are on 1 machine. I use YARN on EMR.
> I have another "faulty" configuration with 9 executors, 5 of which are on
> one machine. Each has 9 cores, which adds up to 45 cores on that
> machine (which has 40 vcores in total). Still, it works. The cluster has 80
> vcores in total, but the executors use 81, excluding the cores for the
> driver and system. I use AWS EMR EC2 instances, where the resources
> available for each machine type are specified. Maybe I can go beyond that
> limit in the cluster. I just want to make sure I understand correctly that
> allocating vcores means vcores, not threads.
>
> Thanks a lot.
>
> Best
>
>
>
>


Re: pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-24 Thread Pietro Pugni
Thank you!

I tried setting the locale options again in different ways, but they don't
propagate to the JVM. I tested these strategies (alone and all together):
- bin/spark-submit --conf "spark.executor.extraJavaOptions=-Duser.language=en -Duser.region=US -Duser.country=US -Duser.timezone=GMT" test.py
- spark = SparkSession \
      .builder \
      .appName("My app") \
      .config("spark.executor.extraJavaOptions", "-Duser.language=en -Duser.region=US -Duser.country=US -Duser.timezone=GMT") \
      .config("user.country", "US") \
      .config("user.region", "US") \
      .config("user.language", "en") \
      .config("user.timezone", "GMT") \
      .config("-Duser.country", "US") \
      .config("-Duser.region", "US") \
      .config("-Duser.language", "en") \
      .config("-Duser.timezone", "GMT") \
      .getOrCreate()
- export JAVA_OPTS="-Duser.language=en -Duser.region=US -Duser.country=US -Duser.timezone=GMT"
- export LANG="en_US.UTF-8"

After running export LANG="en_US.UTF-8" from the same terminal session I use to
launch spark-submit, the locale command reports the correct values:
LANG="en_US.UTF-8"
LC_COLLATE="en_US.UTF-8"
LC_CTYPE="en_US.UTF-8"
LC_MESSAGES="en_US.UTF-8"
LC_MONETARY="en_US.UTF-8"
LC_NUMERIC="en_US.UTF-8"
LC_TIME="en_US.UTF-8"
LC_ALL=

While my pyspark script is running, the Spark UI shows the locale correctly
set under Environment -> Spark Properties:
- user.country: US
- user.language: en
- user.region: US
- user.timezone: GMT

but Environment -> System Properties still reports the System locale and not 
the session locale I previously set:
- user.country: IT
- user.language: it
- user.timezone: Europe/Rome

Am I wrong, or do the options not propagate to the JVM correctly?





Re: reading info from spark 2.0 application UI

2016-10-24 Thread Sean Owen
If you're really sure that 4 executors are on 1 machine, then it means your
resource manager allowed it. What are you using, YARN? Check that you
really are limited to 40 cores per machine in the YARN config.

On Mon, Oct 24, 2016 at 3:33 PM TheGeorge1918 . 
wrote:

> Hi all,
>
> I'm deeply confused by the executor configuration in Spark. I have two
> machines, each with 40 vcores. By mistake, I assigned 7 executors, each
> with 11 vcores (it ran without any problem). As a result, one machine has 4
> executors and the other has 3 executors + driver. But this means that the
> machine with 4 executors needs 4 x 11 = 44 vcores, which is more than the 40
> vcores available on that machine. Am I missing something here? Thanks a lot.
>
> aws emr cluster:
> 2 x m4.10xlarge machine, each with 40 vcores, 160G memory
>
> spark:
> num executors: 7
> executor memory: 33G
> num cores: 11
> driver memory: 39G
> driver cores: 6
>
>
>


Re: Shortest path with directed and weighted graphs

2016-10-24 Thread Michael Malak
Chapter 6 of my book implements Dijkstra's Algorithm. The source code is 
available to download for free. 
https://www.manning.com/books/spark-graphx-in-action
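
For readers who want the idea before opening the book, here is a minimal single-machine sketch of Dijkstra's algorithm on a directed, weighted graph. It is plain Python rather than GraphX and is not the book's implementation; the adjacency-list format, the function name and the sample graph are all assumptions made for this example. Edge weights must be non-negative.

```python
import heapq


def dijkstra(graph, source):
    """Shortest distances from source.

    graph maps each node to a list of (neighbor, weight) pairs; edges are
    directed and weights must be non-negative.
    """
    dist = {source: 0.0}
    heap = [(0.0, source)]
    while heap:
        d, node = heapq.heappop(heap)
        if d > dist.get(node, float("inf")):
            continue  # stale queue entry, a shorter path was already found
        for neighbor, weight in graph.get(node, []):
            if weight < 0:
                raise ValueError("Dijkstra requires non-negative weights")
            candidate = d + weight
            if candidate < dist.get(neighbor, float("inf")):
                dist[neighbor] = candidate
                heapq.heappush(heap, (candidate, neighbor))
    return dist


graph = {
    "A": [("B", 4.0), ("C", 1.0)],
    "C": [("B", 2.0)],
    "B": [("D", 5.0)],
    "D": [],
}
distances = dijkstra(graph, "A")
print(distances)
```

In the sample graph the direct edge A -> B costs 4, but the route through C costs 3, so the weights change the answer compared with simply counting hops. Nodes that cannot be reached from the source are absent from the result.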



  From: Brian Wilson 
 To: user@spark.apache.org 
 Sent: Monday, October 24, 2016 7:11 AM
 Subject: Shortest path with directed and weighted graphs
   
I have been looking at the ShortestPaths function inbuilt with Spark here.
Am I correct in saying there is no support for weighted graphs with this 
function? By that I mean that it assumes all edges carry a weight = 1
Many thanks
Brian 

   

Re: pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-24 Thread Sean Owen
This is more of an OS-level thing, but I think that if you can manage to
set -Duser.language=en to the JVM, it might do the trick.

I summarized what I think I know about this at
https://issues.apache.org/jira/browse/SPARK-18076 and so we can decide what
to do, if anything, there.

Sean

On Mon, Oct 24, 2016 at 3:08 PM Pietro Pugni  wrote:

> Thank you, I’ll appreciate that. I have no experience with Python, Java
> and Spark, so the question can be translated to: “How can I set the JVM
> locale when using spark-submit and pyspark?”. Probably this is possible
> only by changing the system default locale and not within the Spark session,
> right?
>
> Thank you
>  Pietro
>
> On 24 Oct 2016, at 14:51, Hyukjin Kwon
> wrote:
>
> I am also interested in this issue. I will try to look into this too
> within the coming few days.
>
> 2016-10-24 21:32 GMT+09:00 Sean Owen :
>
> I actually think this is a general problem with usage of DateFormat and
> SimpleDateFormat across the code, in that it relies on the default locale
> of the JVM. I believe this needs to, at least, default consistently to
> Locale.US so that behavior is consistent; otherwise it's possible that
> parsing and formatting of dates could work subtly differently across
> environments.
>
> There's a similar question about some code that formats dates for the UI.
> It's more reasonable to let that use the platform-default locale, but, I'd
> still favor standardizing it I think.
>
> Anyway, let me test it out a bit and possibly open a JIRA with this change
> for discussion.
>
> On Mon, Oct 24, 2016 at 1:03 PM pietrop  wrote:
>
> Hi there,
> I opened a question on StackOverflow at this link:
>
> http://stackoverflow.com/questions/40007972/pyspark-doesnt-recognize-mmm-dateformat-pattern-in-spark-read-load-for-dates?noredirect=1#comment67297930_40007972
>
> I didn’t get any useful answer, so I’m writing here hoping that someone can
> help me.
>
> In short, I’m trying to read a CSV containing date columns stored using the
> pattern “MMMdd”. What doesn’t work for me is “MMM”. I’ve done some
> testing and discovered that it’s a localization issue. As you can read in
> the StackOverflow question, I ran some simple Java code to parse the date
> “1989Dec31” and it works only if I specify Locale.US in the
> SimpleDateFormat() function.
>
> I would like pyspark to work. I tried setting a different locale from the
> console (LANG=“en_US”), but it doesn’t work. I also tried setting it using
> the locale package from Python.
>
> So, is there a way to set the locale in Spark when using pyspark? The issue
> is Java related and not Python related (the function that parses data is
> invoked by spark.read.load(dateFormat=“MMMdd”, …)). I don’t want to use
> other solutions in order to encode data because they are slower (from what
> I’ve seen so far).
>
> Thank you
> Pietro
>
>
>
>
>
>
>


Re: JAVA heap space issue

2016-10-24 Thread Sankar Mittapally
I have a lot of SQL join operations, which were blocking me from writing the
data, and I unpersist the data when it is no longer useful.


Re: JAVA heap space issue

2016-10-24 Thread Mich Talebzadeh
OK so you are disabling broadcasting although it is not obvious how this
helps in this case!

Dr Mich Talebzadeh






On 24 October 2016 at 15:08, Sankar Mittapally <
sankar.mittapa...@creditvidya.com> wrote:

> sc <- sparkR.session(
>   master = "spark://ip-172-31-6-116:7077",
>   sparkConfig = list(
>     spark.executor.memory = "10g",
>     spark.app.name = "Testing",
>     spark.driver.memory = "14g",
>     spark.executor.extraJavaOption = "-Xms2g -Xmx5g -XX:-UseGCOverheadLimit",
>     spark.driver.extraJavaOption = "-Xms2g -Xmx5g -XX:-UseGCOverheadLimit",
>     spark.cores.max = "2",
>     spark.sql.autoBroadcastJoinThreshold = "-1"))
>
> On Mon, Oct 24, 2016 at 7:33 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK so what is your full launch code now? I mean equivalent to spark-submit
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 24 October 2016 at 14:57, Sankar Mittapally <
>> sankar.mittapa...@creditvidya.com> wrote:
>>
>>> Hi Mich,
>>>
>>>  I am able to write the files to storage after adding extra parameter.
>>>
>>> FYI..
>>>
>>> This one I used.
>>>
>>> spark.sql.autoBroadcastJoinThreshold="-1"
>>>
>>>
>>>
>>> On Mon, Oct 24, 2016 at 7:22 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Rather strange as you have plenty of free memory there.
>>>>
>>>> Try reducing driver memory to 2GB and executor memory to 2GB and run it
>>>> again
>>>>
>>>> ${SPARK_HOME}/bin/spark-submit \
>>>>     --driver-memory 2G \
>>>>     --num-executors 2 \
>>>>     --executor-cores 1 \
>>>>     --executor-memory 2G \
>>>>     --master spark://IPAddress:7077 \
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> On 24 October 2016 at 13:15, Sankar Mittapally <
>>>> sankar.mittapa...@creditvidya.com> wrote:
>>>>
>>>>> Hi Mich,
>>>>>
>>>>> Yes, I am using a standalone mode cluster. We have two executors with
>>>>> 10G memory each. We have two workers.
>>>>>
>>>>> FYI..
>>>>>
>>>>> On Mon, Oct 24, 2016 at 5:22 PM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Sounds like you are running in standalone mode.
>>>>>>
>>>>>> Have you checked the UI on port 4040 (default) to see where memory is
>>>>>> going? Why do you need executor memory of 10GB?
>>>>>>
>>>>>> How many executors are running, and how many slaves were started?
>>>>>>
>>>>>> In standalone mode executors run on workers (UI 8080)
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Dr Mich Talebzadeh

Re: JAVA heap space issue

2016-10-24 Thread Sankar Mittapally
sc <- sparkR.session(master = "spark://ip-172-31-6-116:7077",
                     sparkConfig = list(spark.executor.memory = "10g",
                                        spark.app.name = "Testing",
                                        spark.driver.memory = "14g",
                                        spark.executor.extraJavaOption = "-Xms2g -Xmx5g -XX:-UseGCOverheadLimit",
                                        spark.driver.extraJavaOption = "-Xms2g -Xmx5g -XX:-UseGCOverheadLimit",
                                        spark.cores.max = "2",
                                        spark.sql.autoBroadcastJoinThreshold = "-1"))

On Mon, Oct 24, 2016 at 7:33 PM, Mich Talebzadeh 
wrote:

> OK so what is your full launch code now? I mean equivalent to spark-submit
>
>
>
> Dr Mich Talebzadeh
>
>
>
> On 24 October 2016 at 14:57, Sankar Mittapally <sankar.mittapa...@creditvidya.com> wrote:
>
>> Hi Mich,
>>
>>  I am able to write the files to storage after adding extra parameter.
>>
>> FYI..
>>
>> This one I used.
>>
>> spark.sql.autoBroadcastJoinThreshold="-1"
>>
>>
>>
>> On Mon, Oct 24, 2016 at 7:22 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Rather strange, as you have plenty of free memory there.
>>>
>>> Try reducing driver memory to 2GB and executor memory to 2GB and run it
>>> again
>>>
>>> ${SPARK_HOME}/bin/spark-submit \
>>>--driver-memory 2G \
>>> --num-executors 2 \
>>> --executor-cores 1 \
>>> --executor-memory 2G \
>>> --master spark://IPAddress:7077 \
>>>
>>> HTH
>>>
>>>
>>>
>>> Dr Mich Talebzadeh

Re: pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-24 Thread Pietro Pugni
Thank you, I’d appreciate that. I have no experience with Python, Java and 
Spark, so the question can be translated to: “How can I set the JVM locale when 
using spark-submit and pyspark?”. Probably this is possible only by changing 
the system default locale and not within the Spark session, right?

Thank you
 Pietro
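
On the JVM-locale question above: a JVM takes its default locale from the
user.language and user.country system properties at start-up (or from
Locale.setDefault at run time), and a SimpleDateFormat built without an
explicit locale falls back to that default. The plain-Java sketch below (no
Spark involved) only illustrates that mechanism. The class name
DefaultLocaleCheck and the pattern "yyyyMMMdd" are assumptions made for the
example. Whether passing -Duser.language=en -Duser.country=US through
spark.driver.extraJavaOptions and spark.executor.extraJavaOptions changes what
spark.read.load() does is not confirmed anywhere in this thread.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

// Hypothetical helper class, not part of Spark.
public class DefaultLocaleCheck {

    /** Returns true if text parses with pattern under the current JVM default locale. */
    public static boolean parsesWithDefaultLocale(String text, String pattern) {
        try {
            // No Locale argument: SimpleDateFormat falls back to the JVM default.
            new SimpleDateFormat(pattern).parse(text);
            return true;
        } catch (ParseException e) {
            return false;
        }
    }

    /** Temporarily switches the JVM default locale, runs the check, then restores the saved default. */
    public static boolean parsesUnder(Locale locale, String text, String pattern) {
        Locale saved = Locale.getDefault();
        try {
            Locale.setDefault(locale);
            return parsesWithDefaultLocale(text, pattern);
        } finally {
            Locale.setDefault(saved);
        }
    }

    public static void main(String[] args) {
        // "yyyyMMMdd" is an assumed pattern for a date such as 1989Dec31.
        System.out.println("JVM default locale: " + Locale.getDefault());
        System.out.println("Italian default parses: " + parsesUnder(Locale.ITALY, "1989Dec31", "yyyyMMMdd"));
        System.out.println("US default parses: " + parsesUnder(Locale.US, "1989Dec31", "yyyyMMMdd"));
    }
}
```

Running the same class with -Duser.language=en -Duser.country=US on the java
command line should change the first printed line, which is a quick way to
check what a given JVM actually picked up.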

> On 24 Oct 2016, at 14:51, Hyukjin Kwon wrote:
> 
> I am also interested in this issue. I will try to look into this too within 
> the coming few days.
> 
> 2016-10-24 21:32 GMT+09:00 Sean Owen:
> I actually think this is a general problem with usage of DateFormat and 
> SimpleDateFormat across the code, in that it relies on the default locale of 
> the JVM. I believe this needs to, at least, default consistently to Locale.US 
> so that behavior is consistent; otherwise it's possible that parsing and 
> formatting of dates could work subtly differently across environments.
> 
> There's a similar question about some code that formats dates for the UI. 
> It's more reasonable to let that use the platform-default locale, but, I'd 
> still favor standardizing it I think.
> 
> Anyway, let me test it out a bit and possibly open a JIRA with this change 
> for discussion.
> 
> On Mon, Oct 24, 2016 at 1:03 PM pietrop wrote:
> Hi there,
> I opened a question on StackOverflow at this link:
> http://stackoverflow.com/questions/40007972/pyspark-doesnt-recognize-mmm-dateformat-pattern-in-spark-read-load-for-dates?noredirect=1#comment67297930_40007972
> 
> I didn’t get any useful answer, so I’m writing here hoping that someone can
> help me.
> 
> In short, I’m trying to read a CSV containing date columns stored using the
> pattern “MMMdd”. What doesn’t work for me is “MMM”. I’ve done some
> testing and discovered that it’s a localization issue. As you can read in
> the StackOverflow question, I ran some simple Java code to parse the date
> “1989Dec31” and it works only if I specify Locale.US in the
> SimpleDateFormat() constructor.
> 
> I would like pyspark to work. I tried setting a different locale from the console
> (LANG=“en_US”), but it doesn’t work. I also tried setting it using the
> locale package from Python.
> 
> So, is there a way to set the locale in Spark when using pyspark? The issue is
> Java related and not Python related (the function that parses data is
> invoked by spark.read.load(dateFormat=“MMMdd”, …)). I don’t want to use
> other solutions in order to encode data because they are slower (from what
> I’ve seen so far).
> 
> Thank you
> Pietro
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-doesn-t-recognize-MMM-dateFormat-pattern-in-spark-read-load-for-dates-like-1989Dec31-and-31D9-tp27951.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 



Re: JAVA heap space issue

2016-10-24 Thread Mich Talebzadeh
OK, so what is your full launch code now? I mean the equivalent of spark-submit.



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 24 October 2016 at 14:57, Sankar Mittapally <
sankar.mittapa...@creditvidya.com> wrote:

> Hi Mich,
>
>  I am able to write the files to storage after adding extra parameter.
>
> FYI..
>
> This one I used.
>
> spark.sql.autoBroadcastJoinThreshold="-1"
>
>
>
> On Mon, Oct 24, 2016 at 7:22 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Rather strange, as you have plenty of free memory there.
>>
>> Try reducing driver memory to 2GB and executor memory to 2GB and run it
>> again
>>
>> ${SPARK_HOME}/bin/spark-submit \
>>--driver-memory 2G \
>> --num-executors 2 \
>> --executor-cores 1 \
>> --executor-memory 2G \
>> --master spark://IPAddress:7077 \
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> On 24 October 2016 at 13:15, Sankar Mittapally <
>> sankar.mittapa...@creditvidya.com> wrote:
>>
>>> Hi Mich,
>>>
>>>  Yes, I am using standalone mode cluster, We have two executors with 10G
>>> memory each.  We have two workers.
>>>
>>> FYI..

Re: JAVA heap space issue

2016-10-24 Thread Sankar Mittapally
Hi Mich,

I am able to write the files to storage after adding an extra parameter.

FYI, this is the one I used:

spark.sql.autoBroadcastJoinThreshold="-1"



On Mon, Oct 24, 2016 at 7:22 PM, Mich Talebzadeh 
wrote:

> Rather strange, as you have plenty of free memory there.
>
> Try reducing driver memory to 2GB and executor memory to 2GB and run it
> again
>
> ${SPARK_HOME}/bin/spark-submit \
>--driver-memory 2G \
> --num-executors 2 \
> --executor-cores 1 \
> --executor-memory 2G \
> --master spark://IPAddress:7077 \
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> On 24 October 2016 at 13:15, Sankar Mittapally <sankar.mittapa...@creditvidya.com> wrote:
>
>> Hi Mich,
>>
>>  Yes, I am using standalone mode cluster, We have two executors with 10G
>> memory each.  We have two workers.
>>
>> FYI..
>>
>>
>>
>> On Mon, Oct 24, 2016 at 5:22 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Sounds like you are running in standalone mode.
>>>
>>> Have you checked the UI on port 4040 (default) to see where the memory is
>>> going? Why do you need executor memory of 10GB?
>>>
>>> How many executors are running, and how many slaves were started?
>>>
>>> In standalone mode executors run on workers (UI 8080).
>>>
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh

Re: JAVA heap space issue

2016-10-24 Thread Mich Talebzadeh
Rather strange, as you have plenty of free memory there.

Try reducing driver memory to 2GB and executor memory to 2GB and run it
again

${SPARK_HOME}/bin/spark-submit \
    --driver-memory 2G \
    --num-executors 2 \
    --executor-cores 1 \
    --executor-memory 2G \
    --master spark://IPAddress:7077 \

HTH



Dr Mich Talebzadeh






On 24 October 2016 at 13:15, Sankar Mittapally <
sankar.mittapa...@creditvidya.com> wrote:

> Hi Mich,
>
>  Yes, I am using standalone mode cluster, We have two executors with 10G
> memory each.  We have two workers.
>
> FYI..
>
>
>
> On Mon, Oct 24, 2016 at 5:22 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Sounds like you are running in standalone mode.
>>
>> Have you checked the UI on port 4040 (default) to see where the memory is
>> going? Why do you need executor memory of 10GB?
>>
>> How many executors are running, and how many slaves were started?
>>
>> In standalone mode executors run on workers (UI 8080).
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> On 24 October 2016 at 12:19, sankarmittapally <
>> sankar.mittapa...@creditvidya.com> wrote:
>>
>>> Hi,
>>>
>>> I have a three-node cluster with 30G of memory. I am trying to analyze
>>> 200MB of data and am running out of memory every time. This is the
>>> command I am using:
>>>
>>> Driver Memory = 10G
>>> Executor memory=10G
>>>
>>> sc <- sparkR.session(master = "spark://ip-172-31-6-116:7077",
>>>   sparkConfig=list(spark.executor.memory="10g",
>>>     spark.app.name="Testing",
>>>     spark.driver.memory="14g",
>>>     spark.executor.extraJavaOption="-Xms2g -Xmx5g -XX:MaxPermSize=1024M",
>>>     spark.driver.extraJavaOption="-Xms2g -Xmx5g -XX:MaxPermSize=1024M",
>>>     spark.cores.max="2"))
>>>
>>>
>>> [D 16:43:51.437 NotebookApp] 200 GET
>>> /api/contents?type=directory&_=1477289197671 (123.176.38.226) 7.96ms
>>> Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Java heap space
>>>     at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:539)
>>>     at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:803)
>>>     at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:105)
>>>     at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:816)
>>>     at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:812)
>>>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:90)
>>>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:72)
>>>     at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
>>>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
>>>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>>
>>> --
>>> View this message in 

Re: Spark Sql 2.0 throws null pointer exception

2016-10-24 Thread Selvam Raman
Why am I not able to access the SparkSession instance within
foreachPartition? (I created the SparkSession instance in the main function.)
spark.sql("select 1").count, or any other SQL query run within
foreachPartition, throws a NullPointerException.
Please give me some ideas if you have faced this problem before.

Thanks,
Selvam R​

On Mon, Oct 24, 2016 at 10:23 AM, Selvam Raman  wrote:

> Hi All,
>
> Please help me.
>
> I have the data for 10 tables stored as parquet files in S3.
>
> I am reading each one into a Dataset and then registering it as a temp table.
>
> One table drives the whole flow, so I am doing the following. (When I am triggering
> query from
>
>
> Code Base:
>
> SparkSession spark = SparkSession.builder().appName("Test").getOrCreate();
> Dataset<Row> citationDF = spark.read().parquet("s3://...");
> ...
> citationDF.createOrReplaceTempView("citation");
> ...
> cit_num.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
>   private static final long serialVersionUID = 1L;
>
>   @Override
>   public void call(Iterator<Row> iter) {
>     while (iter.hasNext()) {
>       Row record = iter.next();
>       int citation_num = record.getInt(0);
>       // (I can execute this query outside of foreach)
>       String ci_query = "select queries ";
>       // This call to spark.sql(...) is line 124 in the stack trace below
>       System.out.println("citation num:" + citation_num + " count:" + spark.sql(ci_query).count());
>       accum.add(1);
>       System.out.println("accumulator count:" + accum);
>     }
>   }
> });
> ​Error:
>
> 16/10/24 09:08:12 WARN TaskSetManager: Lost task 1.0 in stage 30.0 (TID 83, ip-10-95-36-172.dev): java.lang.NullPointerException
>     at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
>     at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
>     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>     at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
>     at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> 16/10/24 09:08:12 INFO YarnScheduler: Stage 30 was cancelled
> 16/10/24 09:08:12 INFO DAGScheduler: ResultStage 30 (foreachPartition at CitationTest.java:108) failed in 0.421 s
> 16/10/24 09:08:12 INFO DAGScheduler: Job 23 failed: foreachPartition at CitationTest.java:108, took 0.578050 s
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 30.0 failed 4 times, most recent failure: Lost task 6.3 in stage 30.0 (TID 99, ip-dev): java.lang.NullPointerException
>     at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
>     at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
>     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>     at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
>     at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at 

Shortest path with directed and weighted graphs

2016-10-24 Thread Brian Wilson
I have been looking at the ShortestPaths function built into Spark.

Am I correct in saying there is no support for weighted graphs with this 
function? By that I mean that it assumes all edges carry a weight of 1.

Many thanks

Brian 
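
To illustrate the distinction raised in the question above: when every edge
has weight 1, the shortest path is simply the one with the fewest hops, whereas
a weighted, directed graph needs something like Dijkstra's algorithm. The
following is a plain-Java sketch, independent of Spark and not the library's
implementation. The class name WeightedShortestPath, the {from, to, weight}
edge encoding and the sample graph are all hypothetical, and edge weights are
assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration class, not part of Spark or GraphX.
public class WeightedShortestPath {

    /**
     * Dijkstra's algorithm on a directed graph with non-negative edge weights.
     * Each edge is {from, to, weight}; vertices are numbered 0..vertexCount-1.
     * Returns the distance from source to every vertex (infinity if unreachable).
     */
    public static double[] distances(int vertexCount, double[][] edges, int source) {
        List<List<double[]>> adjacency = new ArrayList<>();
        for (int v = 0; v < vertexCount; v++) {
            adjacency.add(new ArrayList<>());
        }
        for (double[] edge : edges) {
            adjacency.get((int) edge[0]).add(edge); // directed: stored only under "from"
        }

        double[] dist = new double[vertexCount];
        Arrays.fill(dist, Double.POSITIVE_INFINITY);
        dist[source] = 0.0;

        // Queue entries are {vertex, distance at insertion time}, ordered by distance.
        PriorityQueue<double[]> queue =
                new PriorityQueue<>((a, b) -> Double.compare(a[1], b[1]));
        queue.add(new double[] {source, 0.0});

        while (!queue.isEmpty()) {
            double[] entry = queue.poll();
            int vertex = (int) entry[0];
            if (entry[1] > dist[vertex]) {
                continue; // stale entry: a shorter path to this vertex was already found
            }
            for (double[] edge : adjacency.get(vertex)) {
                int target = (int) edge[1];
                double candidate = dist[vertex] + edge[2];
                if (candidate < dist[target]) {
                    dist[target] = candidate;
                    queue.add(new double[] {target, candidate});
                }
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        // The direct edge 0 -> 2 costs 10; the two-hop route 0 -> 1 -> 2 costs 1 + 2.
        double[][] edges = { {0, 1, 1.0}, {1, 2, 2.0}, {0, 2, 10.0} };
        System.out.println(Arrays.toString(distances(3, edges, 0)));
    }
}
```

In the sample graph the route with the fewest hops (the direct edge from 0 to
2) is not the cheapest one, which is exactly the case a hop-counting
computation cannot distinguish.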

why is that two stages in apache spark are computing same thing?

2016-10-24 Thread maitraythaker
I have a Spark optimization question that I have posted on StackOverflow; any
guidance on it would be appreciated.
Please follow the link below, where I have explained the problem in depth
with code.
http://stackoverflow.com/questions/40192302/why-is-that-two-stages-in-apache-spark-are-computing-same-thing

  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-is-that-two-stages-in-apache-spark-are-computing-same-thing-tp27953.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-24 Thread Hyukjin Kwon
I am also interested in this issue. I will try to look into this too within
the coming few days.

2016-10-24 21:32 GMT+09:00 Sean Owen:

> I actually think this is a general problem with usage of DateFormat and
> SimpleDateFormat across the code, in that it relies on the default locale
> of the JVM. I believe this needs to, at least, default consistently to
> Locale.US so that behavior is consistent; otherwise it's possible that
> parsing and formatting of dates could work subtly differently across
> environments.
>
> There's a similar question about some code that formats dates for the UI.
> It's more reasonable to let that use the platform-default locale, but, I'd
> still favor standardizing it I think.
>
> Anyway, let me test it out a bit and possibly open a JIRA with this change
> for discussion.
>
> On Mon, Oct 24, 2016 at 1:03 PM pietrop  wrote:
>
> Hi there,
> I opened a question on StackOverflow at this link:
> http://stackoverflow.com/questions/40007972/pyspark-doesnt-recognize-mmm-dateformat-pattern-in-spark-read-load-for-dates?noredirect=1#comment67297930_40007972
>
> I didn’t get any useful answer, so I’m writing here hoping that someone can
> help me.
>
> In short, I’m trying to read a CSV containing date columns stored using the
> pattern “MMMdd”. What doesn’t work for me is “MMM”. I’ve done some
> testing and discovered that it’s a localization issue. As you can read in
> the StackOverflow question, I ran some simple Java code to parse the date
> “1989Dec31” and it works only if I specify Locale.US in the
> SimpleDateFormat() constructor.
>
> I would like pyspark to work. I tried setting a different locale from the
> console
> (LANG=“en_US”), but it doesn’t work. I also tried setting it using the
> locale package from Python.
>
> So, is there a way to set the locale in Spark when using pyspark? The issue is
> Java related and not Python related (the function that parses data is
> invoked by spark.read.load(dateFormat=“MMMdd”, …)). I don’t want to use
> other solutions in order to encode data because they are slower (from what
> I’ve seen so far).
>
> Thank you
> Pietro
>
>
>
>


Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Antoaneta Marinova
Hello,

I am using Spark 2.0 to perform filtering, grouping and counting
operations on event data saved in parquet files. As the event schema has a
deeply nested structure, I wanted to read the events as Scala beans to simplify the
code, but I noticed a severe performance degradation. Below you can find a
simple comparison of the same operation using DataFrame and Dataset.

val data = session.read.parquet("events_data/")

// Using Datasets with custom class

// Case class matching the events schema
case class CustomEvent(event_id: Option[String],
                       event_type: Option[String],
                       context: Option[Context],
                       ….
                       time: Option[BigInt]) extends Serializable {}

scala> val start = System.currentTimeMillis ;
       val count = data.as[CustomEvent].filter(event =>
         eventNames.contains(event.event_type.get)).count ;
       val time = System.currentTimeMillis - start

count: Long = 5545
time: Long = 11262

// Using DataFrames

scala> val start = System.currentTimeMillis ;
       val count = data.filter(col("event_type").isin(eventNames:_*)).count ;
       val time = System.currentTimeMillis - start

count: Long = 5545
time: Long = 147

The schema of the events is something like this:

// events schema

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- context: struct (nullable = true)
 |    |-- environment_1: struct (nullable = true)
 |    |    |-- filed1: integer (nullable = true)
 |    |    |-- filed2: integer (nullable = true)
 |    |    |-- filed3: integer (nullable = true)
 |    |-- environment_2: struct (nullable = true)
 |    |    |-- filed_1: string (nullable = true)

 |    |    |-- filed_n: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- config: string (nullable = true)
 |    |    |    |-- tree: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- path: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |    |    |-- level: integer (nullable = true)
 |-- time: long (nullable = true)

Could you please advise me on the usage of the different abstractions and
help me understand why using Datasets with a user-defined class is so much
slower?

Thank you,
Antoaneta


Re: pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-24 Thread Sean Owen
I actually think this is a general problem with usage of DateFormat and
SimpleDateFormat across the code, in that it relies on the default locale
of the JVM. I believe this needs to, at least, default consistently to
Locale.US so that behavior is consistent; otherwise it's possible that
parsing and formatting of dates could work subtly differently across
environments.

There's a similar question about some code that formats dates for the UI.
It's more reasonable to let that use the platform-default locale, but I
think I'd still favor standardizing it.

Anyway, let me test it out a bit and possibly open a JIRA with this change
for discussion.
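
To illustrate the locale dependence described above, here is a minimal JDK-only
sketch. The class name LocaleDateParse, the helper parseOrNull, and the pattern
"yyyyMMMdd" (chosen to match inputs such as 1989Dec31) are assumptions made for
illustration; this is not the code Spark itself uses. It only shows that passing
an explicit Locale to SimpleDateFormat makes the month-name parsing independent
of the JVM default locale.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;

public class LocaleDateParse {
    // Assumed pattern for inputs such as "1989Dec31": year, short month name, day.
    static final String PATTERN = "yyyyMMMdd";

    // Parse with an explicit locale; returns null when the text does not match
    // the pattern under that locale's month names.
    static Date parseOrNull(String text, Locale locale) {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN, locale);
        format.setLenient(false);
        try {
            return format.parse(text);
        } catch (ParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Explicit Locale.US: "Dec" is recognised regardless of the JVM default.
        Calendar cal = Calendar.getInstance();
        cal.setTime(parseOrNull("1989Dec31", Locale.US));
        System.out.println(cal.get(Calendar.YEAR) + "-"
                + (cal.get(Calendar.MONTH) + 1) + "-"
                + cal.get(Calendar.DAY_OF_MONTH));

        // JVM default locale: the outcome depends on that locale's month names.
        Date withDefault = parseOrNull("1989Dec31", Locale.getDefault());
        System.out.println("parsed with default locale: " + (withDefault != null));
    }
}
```

Running the same class with a different default locale (for example via the
standard -Duser.language and -Duser.country JVM system properties) is one way
to see the second result change while the first stays the same.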

On Mon, Oct 24, 2016 at 1:03 PM pietrop wrote:

Hi there,
I opened a question on StackOverflow at this link:
http://stackoverflow.com/questions/40007972/pyspark-doesnt-recognize-mmm-dateformat-pattern-in-spark-read-load-for-dates?noredirect=1#comment67297930_40007972

I didn’t get any useful answer, so I’m writing here hoping that someone can
help me.

In short, I’m trying to read a CSV containing date columns stored using the
pattern "MMMdd". What doesn’t work for me is "MMM". I’ve done some
testing and discovered that it’s a localization issue. As you can read in
the StackOverflow question, I ran a simple Java program to parse the date
"1989Dec31", and it works only if I specify Locale.US in the
SimpleDateFormat() constructor.

I would like pyspark to work. I tried setting a different locale from the console
(LANG="en_US"), but it doesn’t work. I also tried setting it using the
locale package from Python.

So, is there a way to set the locale in Spark when using pyspark? The issue is
Java related and not Python related (the function that parses the data is
invoked by spark.read.load(dateFormat="MMMdd", …)). I don’t want to use
other solutions to encode the data because they are slower (from what
I’ve seen so far).

Thank you
Pietro



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-doesn-t-recognize-MMM-dateFormat-pattern-in-spark-read-load-for-dates-like-1989Dec31-and-31D9-tp27951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: JAVA heap space issue

2016-10-24 Thread Sankar Mittapally
Hi Mich,

 Yes, I am using a standalone mode cluster. We have two workers and two
executors with 10G memory each.

FYI..



On Mon, Oct 24, 2016 at 5:22 PM, Mich Talebzadeh wrote:

> Sounds like you are running in standalone mode.
>
> Have you checked the UI on port 4040 (default) to see where memory is
> going? Why do you need executor memory of 10GB?
>
> How many executors are running, and how many slaves were started?
>
> In standalone mode executors run on workers (UI on port 8080).
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 24 October 2016 at 12:19, sankarmittapally <sankar.mittapa...@creditvidya.com> wrote:
>
>> Hi,
>>
>>  I have a three node cluster with 30G of memory. I am trying to analyze
>> 200MB of data and am running out of memory every time. This is the command
>> I am using:
>>
>> Driver Memory = 10G
>> Executor memory=10G
>>
>> sc <- sparkR.session(master = "spark://ip-172-31-6-116:7077",
>>   sparkConfig = list(spark.executor.memory = "10g",
>>     spark.app.name = "Testing",
>>     spark.driver.memory = "14g",
>>     spark.executor.extraJavaOption = "-Xms2g -Xmx5g -XX:MaxPermSize=1024M",
>>     spark.driver.extraJavaOption = "-Xms2g -Xmx5g -XX:MaxPermSize=1024M",
>>     spark.cores.max = "2"))
>>
>>
>> [D 16:43:51.437 NotebookApp] 200 GET /api/contents?type=directory&_=1477289197671 (123.176.38.226) 7.96ms
>> Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Java heap space
>>     at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:539)
>>     at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:803)
>>     at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:105)
>>     at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:816)
>>     at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:812)
>>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:90)
>>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:72)
>>     at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
>>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
>>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/JAVA-heap-space-issue-tp27950.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-24 Thread pietrop
Hi there,
I opened a question on StackOverflow at this link:
http://stackoverflow.com/questions/40007972/pyspark-doesnt-recognize-mmm-dateformat-pattern-in-spark-read-load-for-dates?noredirect=1#comment67297930_40007972

I didn’t get any useful answer, so I’m writing here hoping that someone can
help me.

In short, I’m trying to read a CSV containing date columns stored using the
pattern "MMMdd". What doesn’t work for me is "MMM". I’ve done some
testing and discovered that it’s a localization issue. As you can read in
the StackOverflow question, I ran a simple Java program to parse the date
"1989Dec31", and it works only if I specify Locale.US in the
SimpleDateFormat() constructor.

I would like pyspark to work. I tried setting a different locale from the console
(LANG="en_US"), but it doesn’t work. I also tried setting it using the
locale package from Python.

So, is there a way to set the locale in Spark when using pyspark? The issue is
Java related and not Python related (the function that parses the data is
invoked by spark.read.load(dateFormat="MMMdd", …)). I don’t want to use
other solutions to encode the data because they are slower (from what
I’ve seen so far).

Thank you
Pietro



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-doesn-t-recognize-MMM-dateFormat-pattern-in-spark-read-load-for-dates-like-1989Dec31-and-31D9-tp27951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: JAVA heap space issue

2016-10-24 Thread Mich Talebzadeh
Sounds like you are running in standalone mode.

Have you checked the UI on port 4040 (default) to see where memory is
going? Why do you need executor memory of 10GB?

How many executors are running, and how many slaves were started?

In standalone mode executors run on workers (UI on port 8080).


HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 24 October 2016 at 12:19, sankarmittapally <
sankar.mittapa...@creditvidya.com> wrote:

> Hi,
>
>  I have a three node cluster with 30G of memory. I am trying to analyze
> 200MB of data and am running out of memory every time. This is the command
> I am using:
>
> Driver Memory = 10G
> Executor memory=10G
>
> sc <- sparkR.session(master = "spark://ip-172-31-6-116:7077",
>   sparkConfig = list(spark.executor.memory = "10g",
>     spark.app.name = "Testing",
>     spark.driver.memory = "14g",
>     spark.executor.extraJavaOption = "-Xms2g -Xmx5g -XX:MaxPermSize=1024M",
>     spark.driver.extraJavaOption = "-Xms2g -Xmx5g -XX:MaxPermSize=1024M",
>     spark.cores.max = "2"))
>
>
> [D 16:43:51.437 NotebookApp] 200 GET /api/contents?type=directory&_=1477289197671 (123.176.38.226) 7.96ms
> Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Java heap space
>     at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:539)
>     at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:803)
>     at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:105)
>     at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:816)
>     at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:812)
>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:90)
>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:72)
>     at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/JAVA-heap-space-issue-tp27950.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


JAVA heap space issue

2016-10-24 Thread sankarmittapally
Hi,

 I have a three node cluster with 30G of memory. I am trying to analyze
200MB of data and am running out of memory every time. This is the command
I am using:

Driver Memory = 10G
Executor memory=10G

sc <- sparkR.session(master = "spark://ip-172-31-6-116:7077",
  sparkConfig = list(spark.executor.memory = "10g",
    spark.app.name = "Testing",
    spark.driver.memory = "14g",
    spark.executor.extraJavaOption = "-Xms2g -Xmx5g -XX:MaxPermSize=1024M",
    spark.driver.extraJavaOption = "-Xms2g -Xmx5g -XX:MaxPermSize=1024M",
    spark.cores.max = "2"))


[D 16:43:51.437 NotebookApp] 200 GET /api/contents?type=directory&_=1477289197671 (123.176.38.226) 7.96ms
Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:539)
    at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:803)
    at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:105)
    at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:816)
    at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:812)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:90)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:72)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JAVA-heap-space-issue-tp27950.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Issues with reading gz files with Spark Streaming

2016-10-24 Thread Steve Loughran

On 22 Oct 2016, at 20:58, Nkechi Achara wrote:

I do not use rename, and the files are written to, and then moved to a 
directory on HDFS in gz format.

In that case there's nothing obvious to me.

Try logging this class at trace/debug level:
org.apache.spark.sql.execution.streaming.FileStreamSource


On 22 October 2016 at 15:14, Steve Loughran wrote:

> On 21 Oct 2016, at 15:53, Nkechi Achara wrote:
>
> Hi,
>
> I am using Spark 1.5.0 to read gz files with textFileStream, but new
> files dropped into the specified directory are not picked up. I know this is
> only the case with gz files, because when I extract the file into the specified
> directory the files are read on the next window and processed.
>
> My code is here:
>
> val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](
>     "file:///tmp/", (f: Path) => true, newFilesOnly = false).
>   map(pair => pair._2.toString)
> comments.foreachRDD(i => i.foreach(m => println(m)))
>
> Any idea why the gz files are not being recognized?
>
> Thanks in advance,
>
> K

Are the files being written in the directory or renamed in? You should be 
using rename() against a filesystem (not an object store) to make sure that the 
file isn't picked up while it is still being written.




Spark Sql 2.0 throws null pointer exception

2016-10-24 Thread Selvam Raman
Hi All,

Please help me.

I have 10 parquet files (table data) in S3.

I am reading them and storing them as Datasets, then registering them as temp tables.

One table drives the whole flow, so I am doing the following. (When I am triggering
the query from


Code Base:

SparkSession spark = SparkSession.builder().appName("Test").getOrCreate();

Dataset<Row> citationDF = spark.read().parquet("s3://...");
...
...
citationDF.createOrReplaceTempView("citation");
...

cit_num.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>()
{
  private static final long serialVersionUID = 1L;

  @Override
  public void call(Iterator<Row> iter)
  {
    while (iter.hasNext())
    {
      Row record = iter.next();
      int citation_num = record.getInt(0);
      String ci_query = "select queries "; // (I can execute this query outside of foreach)
      System.out.println("citation num:" + citation_num + " count:" + spark.sql(ci_query).count());
      accum.add(1);
      System.out.println("accumulator count:" + accum);
    }
  }
});

Error:

16/10/24 09:08:12 WARN TaskSetManager: Lost task 1.0 in stage 30.0 (TID 83, ip-10-95-36-172.dev): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
    at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
    at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

16/10/24 09:08:12 INFO YarnScheduler: Stage 30 was cancelled
16/10/24 09:08:12 INFO DAGScheduler: ResultStage 30 (foreachPartition at CitationTest.java:108) failed in 0.421 s
16/10/24 09:08:12 INFO DAGScheduler: Job 23 failed: foreachPartition at CitationTest.java:108, took 0.578050 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 30.0 failed 4 times, most recent failure: Lost task 6.3 in stage 30.0 (TID 99, ip-dev): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
    at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
    at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at

Using a Custom Data Store with Spark 2.0

2016-10-24 Thread Sachith Withana
Hi all,

I have a requirement to integrate a custom data store with Spark
(v2.0.1). It consists of structured data in tables along with the schemas.

Then I want to run SparkSQL queries on the data and provide the data back
to the data service.

I'm wondering what would be the best way to do this.
Should I extend DataFrame and wrap my data store as DataFrames,
or extend DataFrameReader?

It would be ideal if I could make minimal changes to the Spark code and write
something like an external client to submit jobs using my data to a Spark
cluster.

I'm completely new to the Spark world. Any help would be much appreciated.

-- 
Thanks,
Sachith Withana


spark streaming with kinesis

2016-10-24 Thread Shushant Arora
Does the Spark Streaming consumer for Kinesis use the Kinesis Client Library, and
does it mandate checkpointing the sequence numbers of shards in DynamoDB?

Will it lead to data loss if consumed data records are not yet processed,
Kinesis has checkpointed the consumed sequence numbers in DynamoDB, and the Spark
worker crashes? Spark would then launch the worker on another node, but it would
start consuming from DynamoDB's checkpointed sequence number, which is ahead of
the processed sequence number.

Is there a way to checkpoint the sequence numbers ourselves in Kinesis, as
with the Kafka low-level consumer?

Thanks


Re: LIMIT issue of SparkSQL

2016-10-24 Thread Mich Talebzadeh
This is an interesting point.

As far as I know, in any database (practically all RDBMSs: Oracle, SAP, etc.),
the LIMIT affects the collection part of the result set.

The query, which may involve multiple joins on multiple underlying tables, is
carried out fully to produce the result set.

Applying LIMIT to each underlying table of the actual query does not make
sense and would not be industry standard AFAIK.

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 24 October 2016 at 06:48, Michael Armbrust wrote:

> - dev + user
>
> Can you give more info about the query?  Maybe a full explain()?  Are you
> using a datasource like JDBC?  The API does not currently push down limits,
> but the documentation talks about how you can use a query instead of a
> table if that is what you are looking to do.
>
> On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai wrote:
>
>> Hi all,
>>
>> Let me clarify the problem:
>>
>> Suppose we have a simple table `A` with 100 000 000 records.
>>
>> Problem:
>> When we execute the SQL query `select * from A limit 500`,
>> it scans through all 100 000 000 records.
>> The expected behaviour is that once 500 records are found, the engine stops
>> scanning.
>>
>> Detailed observation:
>> We found that there are “GlobalLimit / LocalLimit” physical operators:
>> https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
>> But during query plan generation, GlobalLimit / LocalLimit is not applied
>> to the query plan.
>>
>> Could you please help us look into the LIMIT problem?
>> Thanks.
>>
>> Best,
>> Liz
>>
>> On 23 Oct 2016, at 10:11 PM, Xiao Li wrote:
>>
>> Hi, Liz,
>>
>> CollectLimit means `Take the first `limit` elements and collect them to a
>> single partition.`
>>
>> Thanks,
>>
>> Xiao
>>
>> 2016-10-23 5:21 GMT-07:00 Ran Bai:
>>
>>> Hi all,
>>>
>>> I found that the runtime for a query with or without the “LIMIT” keyword is
>>> the same. We looked into it and found that there is actually “GlobalLimit /
>>> LocalLimit” in the logical plan, but no corresponding operator in the
>>> physical plan. Is this a bug or something else? Attached are the logical
>>> and physical plans when running "SELECT * FROM seq LIMIT 1".
>>>
>>>
>>> More specifically, we expected an early stop upon getting adequate
>>> results.
>>> Thanks so much.
>>>
>>> Best,
>>> Liz
>>>
>>>
>>>
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>
>>
>>
>
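
On Michael Armbrust's remark in the quoted text that a query can be used in
place of a table with a JDBC source, the following is a minimal sketch of how
such options might be assembled. The class name JdbcLimitOptions, its helper
methods, the alias "t" and the connection URL are all hypothetical; LIMIT and
the "AS" alias syntax also vary between databases, so the generated string is
only an example. The only Spark-side assumption is the JDBC data source's
"dbtable" option, which accepts a parenthesised subquery.

```java
import java.util.HashMap;
import java.util.Map;

public class JdbcLimitOptions {
    // Wrap a SQL query as a derived table so it can stand in for a table name.
    static String asDerivedTable(String query, String alias) {
        return "(" + query + ") AS " + alias;
    }

    // Build JDBC reader options in which the limit is part of the query sent
    // to the database, rather than a LIMIT applied by Spark after the scan.
    static Map<String, String> limitedReadOptions(String url, String table, int limit) {
        Map<String, String> options = new HashMap<>();
        options.put("url", url);
        options.put("dbtable",
                asDerivedTable("SELECT * FROM " + table + " LIMIT " + limit, "t"));
        return options;
    }

    public static void main(String[] args) {
        // Placeholder URL; replace with a real JDBC connection string.
        Map<String, String> options =
                limitedReadOptions("jdbc:postgresql://dbhost/db", "A", 500);
        System.out.println(options.get("dbtable"));
        // With a SparkSession named spark, the options would be passed as:
        //   spark.read().format("jdbc").options(options).load();
    }
}
```

This only applies when the source is a JDBC database; it does not change how
Spark plans LIMIT for other sources such as Parquet files.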