Re: Different execution results with wholestage codegen on and off

2020-05-27 Thread Xiao Li
Thanks for reporting it. Please open a JIRA with a test case.

Cheers,

Xiao

On Wed, May 27, 2020 at 1:42 PM Pasha Finkelshteyn <
pavel.finkelsht...@gmail.com> wrote:

> Hi folks,
>
> I'm implementing Kotlin bindings for Spark and faced a strange problem. In
> one corner case Spark works differently when wholestage codegen is on or
> off.
>
> Does this look like a bug or expected behavior?
> --
> Regards,
> Pasha
>
> Big Data Tools @ JetBrains
>


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi Randy,

Yes, I'm using parquet on both S3 and hdfs.

On Thu, 28 May, 2020, 2:38 am randy clinton,  wrote:

> Is the file Parquet on S3 or is it some other file format?
>
> In general I would assume that HDFS read/writes are more performant for
> spark jobs.
>
> For instance, consider how well partitioned your HDFS file is vs the S3
> file.
>
> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi Jörn,
>>
>> Thanks for the reply. I will try to create an easier example to reproduce
>> the issue.
>>
>> I will also try your suggestion to look into the UI. Can you guide me on
>> what I should be looking for?
>>
>> I was already using the s3a protocol to compare the times.
>>
>> My hunch is that multiple reads from S3 are required because of improper
>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>> Does this make sense?
>>
>> I would also like to add that we built an extra layer on S3 which might
>> be contributing to the slower times.
>>
>> Thanks for your help.
>>
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>>
>>> Have you looked in the Spark UI to see why this is the case?
>>> Reading from S3 can take more time - it also depends on which s3 URL you
>>> are using: s3a vs s3n vs s3.
>>>
>>> It could help to persist in-memory or on HDFS after some calculation.
>>> You can also initially load from S3, store on HDFS, and work from there.
>>>
>>> HDFS offers data locality for the tasks, i.e. the tasks start on the
>>> nodes where the data is. Depending on which s3 „protocol“ you are using
>>> you might also be penalized on performance.
>>>
>>> Try s3a as the protocol (replace all s3n with s3a).
>>>
>>> You can also use the plain s3 URL, but this requires a special bucket
>>> configuration, a dedicated empty bucket, and it lacks some
>>> interoperability with other AWS services.
>>>
>>> Nevertheless, it could also be something else in the code. Can you
>>> post an example reproducing the issue?
>>>
>>> > On 27.05.2020 at 18:18, Dark Crusader <
>>> relinquisheddra...@gmail.com> wrote:
>>> >
>>> > 
>>> > Hi all,
>>> >
>>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>>> and running an algorithm from the spark ml library.
>>> >
>>> > If I create the same spark dataframe by reading data from S3, the same
>>> algorithm takes considerably more time.
>>> >
>>> > I don't understand why this is happening. Is this a chance occurrence
>>> or are the spark dataframes created differently?
>>> >
>>> > I don't understand how the data store would affect the algorithm
>>> performance.
>>> >
>>> > Any help would be appreciated. Thanks a lot.
>>>
>>
>
> --
> I appreciate your time,
>
> ~Randy
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Something Something
Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming', in which you have
something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )

And updating the Accumulator inside 'updateAcrossEvents'? We're
experiencing this only under 'Stateful Structured Streaming'. In other
streaming applications it works as expected.



On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:

> Yes, I am talking about Application specific Accumulators. Actually I am
> getting the values printed in my driver log as well as sent to Grafana. Not
> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> cluster (not a local Mac) where I submit from the master node. It should work the
> same for cluster mode as well.
> Create accumulators like this:
> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>
>
> On Tue, May 26, 2020 at 8:42 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Hmm... how would they go to Grafana if they are not getting computed in
>> your code? I am talking about the Application Specific Accumulators. The
>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>> getting populated correctly!
>>
>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>>
>>> Hello,
>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>> But one consolation is that when I send metrics to Grafana, the values
>>> are coming there.
>>>
>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
 No this is not working even if I use LongAccumulator.

 On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:

> There is a restriction in AccumulatorV2 API [1], the OUT type should
> be atomic or thread safe. I'm wondering if the implementation for
> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
> replace
> CollectionLongAccumulator by CollectionAccumulator[2] or 
> LongAccumulator[3]
> and test if the StreamingListener and other codes are able to work?
>
> ---
> Cheers,
> -z
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>
> 
> From: Something Something 
> Sent: Saturday, May 16, 2020 0:38
> To: spark-user
> Subject: Re: Using Spark Accumulators with Structured Streaming
>
> Can someone from Spark Development team tell me if this functionality
> is supported and tested? I've spent a lot of time on this but can't get it
> to work. Just to add more context, we have our own Accumulator class that
> extends from AccumulatorV2. In this class we keep track of one or more
> accumulators. Here's the definition:
>
>
> class CollectionLongAccumulator[T]
> extends AccumulatorV2[T, java.util.Map[T, Long]]
>
> When the job begins we register an instance of this class:
>
> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>
> Is this working under Structured Streaming?
>
> I will keep looking for alternate approaches but any help would be
> greatly appreciated. Thanks.
>
>
>
> On Thu, May 14, 2020 at 2:36 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them 
> in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
>
> The accumulators get incremented in 'updateAcrossEvents'. I have a
> StreamingListener which writes values of the accumulators in
> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see
> that these accumulators are getting incremented as expected.
>
> This only happens when I run in the 'Cluster' mode. In Local mode it
> works fine which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform
> an "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>
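A plain-Python sketch may help illustrate the semantics ZHANG Wei describes above: the accumulator's OUT type must be safely mergeable, because each task mutates its own serialized copy and the driver only sees the combined value after copies are merged back. This is a toy stand-in, not Spark's actual implementation; the class and variable names are made up, and no Spark is required to run it.

```python
from collections import defaultdict

class MapLongAccumulator:
    """Toy stand-in for a CollectionLongAccumulator[T]
    (i.e. AccumulatorV2[T, java.util.Map[T, Long]])."""

    def __init__(self):
        self._counts = defaultdict(int)

    def add(self, key):
        # In Spark this runs on executors, against a per-task copy.
        self._counts[key] += 1

    def merge(self, other):
        # In Spark this runs on the driver after a task completes.
        for k, v in other._counts.items():
            self._counts[k] += v

    def value(self):
        return dict(self._counts)

driver_acc = MapLongAccumulator()

# Simulate two tasks, each with its own deserialized copy of the accumulator.
task1, task2 = MapLongAccumulator(), MapLongAccumulator()
for event in ["a", "b", "a"]:
    task1.add(event)
task2.add("b")

print(driver_acc.value())   # nothing merged yet: {}
driver_acc.merge(task1)
driver_acc.merge(task2)
print(driver_acc.value())   # after merge: {'a': 2, 'b': 2}
```

Reading the driver-side value before the merges have happened shows an empty (or zero) result, which is consistent with the "always zero in onQueryProgress on a cluster" symptom discussed in this thread.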


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread randy clinton
Is the file Parquet on S3 or is it some other file format?

In general I would assume that HDFS read/writes are more performant for
spark jobs.

For instance, consider how well partitioned your HDFS file is vs the S3
file.

On Wed, May 27, 2020 at 1:51 PM Dark Crusader 
wrote:

> Hi Jörn,
>
> Thanks for the reply. I will try to create an easier example to reproduce
> the issue.
>
> I will also try your suggestion to look into the UI. Can you guide me on
> what I should be looking for?
>
> I was already using the s3a protocol to compare the times.
>
> My hunch is that multiple reads from S3 are required because of improper
> caching of intermediate data. And maybe hdfs is doing a better job at this.
> Does this make sense?
>
> I would also like to add that we built an extra layer on S3 which might be
> contributing to the slower times.
>
> Thanks for your help.
>
> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>
>> Have you looked in the Spark UI to see why this is the case?
>> Reading from S3 can take more time - it also depends on which s3 URL you
>> are using: s3a vs s3n vs s3.
>>
>> It could help to persist in-memory or on HDFS after some calculation. You
>> can also initially load from S3, store on HDFS, and work from there.
>>
>> HDFS offers data locality for the tasks, i.e. the tasks start on the nodes
>> where the data is. Depending on which s3 „protocol“ you are using you might
>> also be penalized on performance.
>>
>> Try s3a as the protocol (replace all s3n with s3a).
>>
>> You can also use the plain s3 URL, but this requires a special bucket
>> configuration, a dedicated empty bucket, and it lacks some interoperability
>> with other AWS services.
>>
>> Nevertheless, it could also be something else in the code. Can you post
>> an example reproducing the issue?
>>
>> > On 27.05.2020 at 18:18, Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>> >
>> > 
>> > Hi all,
>> >
>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>> and running an algorithm from the spark ml library.
>> >
>> > If I create the same spark dataframe by reading data from S3, the same
>> algorithm takes considerably more time.
>> >
>> > I don't understand why this is happening. Is this a chance occurrence or
>> are the spark dataframes created differently?
>> >
>> > I don't understand how the data store would affect the algorithm
>> performance.
>> >
>> > Any help would be appreciated. Thanks a lot.
>>
>

-- 
I appreciate your time,

~Randy


Different execution results with wholestage codegen on and off

2020-05-27 Thread Pasha Finkelshteyn
Hi folks,

I'm implementing Kotlin bindings for Spark and faced a strange problem. In
one corner case Spark works differently when wholestage codegen is on or
off.

Does this look like a bug or expected behavior?
-- 
Regards,
Pasha

Big Data Tools @ JetBrains
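For anyone trying to pin this kind of discrepancy down: whole-stage codegen can be disabled per session so the two code paths can be compared directly. The flag below is an internal SQL config, so its exact name and behavior may vary across Spark versions:

```
spark.sql.codegen.wholeStage=false
```

It can be set e.g. via spark.conf.set("spark.sql.codegen.wholeStage", "false") before running the query, or passed with --conf at submit time.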




Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi Jörn,

Thanks for the reply. I will try to create an easier example to reproduce
the issue.

I will also try your suggestion to look into the UI. Can you guide me on
what I should be looking for?

I was already using the s3a protocol to compare the times.

My hunch is that multiple reads from S3 are required because of improper
caching of intermediate data. And maybe hdfs is doing a better job at this.
Does this make sense?

I would also like to add that we built an extra layer on S3 which might be
contributing to the slower times.

Thanks for your help.

On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:

> Have you looked in the Spark UI to see why this is the case?
> Reading from S3 can take more time - it also depends on which s3 URL you
> are using: s3a vs s3n vs s3.
>
> It could help to persist in-memory or on HDFS after some calculation. You
> can also initially load from S3, store on HDFS, and work from there.
>
> HDFS offers data locality for the tasks, i.e. the tasks start on the nodes
> where the data is. Depending on which s3 „protocol“ you are using you might
> also be penalized on performance.
>
> Try s3a as the protocol (replace all s3n with s3a).
>
> You can also use the plain s3 URL, but this requires a special bucket
> configuration, a dedicated empty bucket, and it lacks some interoperability
> with other AWS services.
>
> Nevertheless, it could also be something else in the code. Can you post
> an example reproducing the issue?
>
> > On 27.05.2020 at 18:18, Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
> >
> > 
> > Hi all,
> >
> > I am reading data from hdfs in the form of parquet files (around 3 GB)
> and running an algorithm from the spark ml library.
> >
> > If I create the same spark dataframe by reading data from S3, the same
> algorithm takes considerably more time.
> >
> > I don't understand why this is happening. Is this a chance occurrence or
> are the spark dataframes created differently?
> >
> > I don't understand how the data store would affect the algorithm
> performance.
> >
> > Any help would be appreciated. Thanks a lot.
>


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Jörn Franke
Have you looked in the Spark UI to see why this is the case?
Reading from S3 can take more time - it also depends on which s3 URL you are
using: s3a vs s3n vs s3.

It could help to persist in-memory or on HDFS after some calculation. You can
also initially load from S3, store on HDFS, and work from there.

HDFS offers data locality for the tasks, i.e. the tasks start on the nodes
where the data is. Depending on which s3 „protocol“ you are using you might
also be penalized on performance.

Try s3a as the protocol (replace all s3n with s3a).

You can also use the plain s3 URL, but this requires a special bucket
configuration, a dedicated empty bucket, and it lacks some interoperability
with other AWS services.

Nevertheless, it could also be something else in the code. Can you post an
example reproducing the issue?

> On 27.05.2020 at 18:18, Dark Crusader  wrote:
> 
> 
> Hi all,
> 
> I am reading data from hdfs in the form of parquet files (around 3 GB) and 
> running an algorithm from the spark ml library.
> 
> If I create the same spark dataframe by reading data from S3, the same 
> algorithm takes considerably more time.
> 
> I don't understand why this is happening. Is this a chance occurrence or are
> the spark dataframes created differently?
>
> I don't understand how the data store would affect the algorithm performance.
> 
> Any help would be appreciated. Thanks a lot.
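The "replace all s3n with s3a" advice above amounts to a simple URI-scheme rewrite before reading. A minimal sketch (plain Python, no Spark required; the bucket and key names are made up for illustration):

```python
def to_s3a(path: str) -> str:
    """Rewrite s3:// or s3n:// URIs to the s3a:// scheme,
    leaving other schemes (e.g. hdfs://) untouched."""
    for old in ("s3n://", "s3://"):
        if path.startswith(old):
            return "s3a://" + path[len(old):]
    return path

paths = ["s3n://my-bucket/data/part-0.parquet", "hdfs:///warehouse/t"]
print([to_s3a(p) for p in paths])
# → ['s3a://my-bucket/data/part-0.parquet', 'hdfs:///warehouse/t']
```

In a Spark job the rewritten paths would simply be passed to the reader (e.g. spark.read.parquet) so the s3a connector is used consistently.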




Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi all,

I am reading data from hdfs in the form of parquet files (around 3 GB) and
running an algorithm from the spark ml library.

If I create the same spark dataframe by reading data from S3, the same
algorithm takes considerably more time.

I don't understand why this is happening. Is this a chance occurrence or are
the spark dataframes created differently?

I don't understand how the data store would affect the algorithm
performance.

Any help would be appreciated. Thanks a lot.


Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Srinivas V
Yes, I am talking about Application specific Accumulators. Actually I am
getting the values printed in my driver log as well as sent to Grafana. Not
sure where and when I saw 0 before. My deploy mode is “client” on a yarn
cluster (not a local Mac) where I submit from the master node. It should work the
same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <
mailinglist...@gmail.com> wrote:

> Hmm... how would they go to Grafana if they are not getting computed in
> your code? I am talking about the Application Specific Accumulators. The
> other standard counters such as 'event.progress.inputRowsPerSecond' are
> getting populated correctly!
>
> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>
>> Hello,
>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>> But one consolation is that when I send metrics to Grafana, the values
>> are coming there.
>>
>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> No this is not working even if I use LongAccumulator.
>>>
>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>
 There is a restriction in AccumulatorV2 API [1], the OUT type should be
 atomic or thread safe. I'm wondering if the implementation for
 `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
 CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
 and test if the StreamingListener and other codes are able to work?

 ---
 Cheers,
 -z
 [1]
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
 [2]
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
 [3]
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

 
 From: Something Something 
 Sent: Saturday, May 16, 2020 0:38
 To: spark-user
 Subject: Re: Using Spark Accumulators with Structured Streaming

 Can someone from Spark Development team tell me if this functionality
 is supported and tested? I've spent a lot of time on this but can't get it
 to work. Just to add more context, we have our own Accumulator class that
 extends from AccumulatorV2. In this class we keep track of one or more
 accumulators. Here's the definition:


 class CollectionLongAccumulator[T]
 extends AccumulatorV2[T, java.util.Map[T, Long]]

 When the job begins we register an instance of this class:

 spark.sparkContext.register(myAccumulator, "MyAccumulator")

 Is this working under Structured Streaming?

 I will keep looking for alternate approaches but any help would be
 greatly appreciated. Thanks.



 On Thu, May 14, 2020 at 2:36 PM Something Something <
 mailinglist...@gmail.com> wrote:

 In my structured streaming job I am updating Spark Accumulators in the
 updateAcrossEvents method but they are always 0 when I try to print them in
 my StreamingListener. Here's the code:

 .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
 updateAcrossEvents
   )


 The accumulators get incremented in 'updateAcrossEvents'. I have a
 StreamingListener which writes values of the accumulators in
 'onQueryProgress' method but in this method the Accumulators are ALWAYS
 ZERO!

 When I added log statements in the updateAcrossEvents, I could see that
 these accumulators are getting incremented as expected.

 This only happens when I run in the 'Cluster' mode. In Local mode it
 works fine which implies that the Accumulators are not getting distributed
 correctly - or something like that!

 Note: I've seen quite a few answers on the Web that tell me to perform
 an "Action". That's not a solution here. This is a 'Stateful Structured
 Streaming' job. Yes, I am also 'registering' them in SparkContext.






Re: Regarding Spark 3.0 GA

2020-05-27 Thread Sean Owen
No firm dates; it always depends on RC voting. Another RC is coming soon.
It is however looking pretty close to done.

On Wed, May 27, 2020 at 3:54 AM ARNAV NEGI SOFTWARE ARCHITECT <
negi.ar...@gmail.com> wrote:

> Hi,
>
> I am working on Spark 3.0 preview release for large Spark jobs on
> Kubernetes and preview looks promising.
>
> Can I understand when the Spark 3.0 GA is expected? Definitive dates will
> help us plan our roadmap with Spark 3.0.
>
>
> Arnav Negi / Technical Architect | Web Technology Enthusiast
> negi.ar...@gmail.com / +91-7045018844
>
>
>
>
>


Re: Regarding Spark 3.0 GA

2020-05-27 Thread Gaetano Fabiano
I have no idea. 

I have built a Docker image, available on Docker Hub, that you can experiment
with by composing a cluster.

https://hub.docker.com/r/gaetanofabiano/spark

Let me know if you will have news about release

Regards 

Sent from my iPhone

> On 27 May 2020, at 10:54, ARNAV NEGI SOFTWARE ARCHITECT
>  wrote:
> 
> 
> Hi,
> 
> I am working on Spark 3.0 preview release for large Spark jobs on Kubernetes 
> and preview looks promising.
> 
> Can I understand when the Spark 3.0 GA is expected? Definitive dates will 
> help us plan our roadmap with Spark 3.0.
> 
> 
> 
> Arnav Negi / Technical Architect | Web Technology Enthusiast 
> negi.ar...@gmail.com / +91-7045018844
> 
>
> 


Regarding Spark 3.0 GA

2020-05-27 Thread ARNAV NEGI SOFTWARE ARCHITECT
Hi,

I am working on Spark 3.0 preview release for large Spark jobs on
Kubernetes and preview looks promising.

Can I understand when the Spark 3.0 GA is expected? Definitive dates will
help us plan our roadmap with Spark 3.0.


Arnav Negi / Technical Architect | Web Technology Enthusiast
negi.ar...@gmail.com / +91-7045018844





Spark on kubernetes memory spike and spark.kubernetes.memoryOverheadFactor not working

2020-05-27 Thread Maiti, Mousam
Hi Team,
We are using spark on Kubernetes, through spark-on-k8s-operator. Our 
application deals with multiple updateStateByKey operations. Upon 
investigation, we found that the spark application consumes a higher volume of 
memory. As spark-on-k8s-operator doesn't give the option to segregate spark JVM 
memory and pod memory, pod memory usage reaches 100 %. Container OS kernel 
kills the executor because of OOM.
So, we are trying to limit the memory overhead using:
spark.kubernetes.memoryOverheadFactor=0.1
This sets the Memory Overhead Factor that will allocate memory to non-JVM 
memory, which includes off-heap memory allocations, non-JVM tasks, and various 
systems processes. For JVM-based jobs this value will default to 0.10 and 0.40 
for non-JVM jobs. This is done as non-JVM tasks need more non-JVM heap space 
and such tasks commonly fail with "Memory Overhead Exceeded" errors. This 
preempts this error with a higher default.

But to no avail. Is there something we are missing? Is there any way around
this problem?
Thanks,
Mousam
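As a back-of-envelope aid for sizing such pods: the pod's memory request is roughly the executor memory plus an overhead, where the overhead is the overhead factor times the executor memory, subject to a minimum floor (384 MiB in current Spark docs). The sketch below is plain Python and paraphrases that sizing rule, which may differ across Spark versions:

```python
# Rough sketch of how the executor pod's memory request is derived.
# The rule (overhead = max(factor * executor_memory, 384 MiB floor)) is
# paraphrased from Spark's resource sizing and may vary by version.
MIN_OVERHEAD_MIB = 384

def pod_memory_request_mib(executor_memory_mib: int,
                           overhead_factor: float = 0.1) -> int:
    overhead = max(int(executor_memory_mib * overhead_factor), MIN_OVERHEAD_MIB)
    return executor_memory_mib + overhead

# With 8 GiB executors and the default 0.1 factor the pod asks for ~8.8 GiB;
# state-heavy jobs (e.g. updateStateByKey) may need a larger factor so the
# off-heap/native usage fits before the kernel OOM-kills the container.
print(pod_memory_request_mib(8192))        # 8192 + 819  = 9011
print(pod_memory_request_mib(8192, 0.4))   # 8192 + 3276 = 11468
```

If the pod still hits its limit, raising the factor (or spark.executor.memoryOverhead directly) grows the non-JVM headroom without changing the JVM heap.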



Re: How to enable hive support on an existing Spark session?

2020-05-27 Thread HARSH TAKKAR
Hi Kun,

You can use following spark property instead while launching the app
instead of manually enabling it in the code.

spark.sql.catalogImplementation=hive


Kind Regards
Harsh

On Tue, May 26, 2020 at 9:55 PM Kun Huang (COSMOS)
 wrote:

>
> Hi Spark experts,
>
> I am seeking for an approach to enable hive support manually on an
> existing Spark session.
>
> Currently, HiveContext seems the best way for my scenario. However, this
> class has already been marked as deprecated and it is recommended to use
> SparkSession.builder.enableHiveSupport(). This should be called before
> creating Spark session.
>
> I wonder if there are other workaround?
>
> Thanks,
> Kun
>
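As a footnote to Harsh's suggestion: enableHiveSupport() is essentially a shortcut for setting that same config on the session builder. A plain-Python sketch of the equivalence (no pyspark needed; the dict stands in for the builder's config map):

```python
# Sketch of what SparkSession.builder.enableHiveSupport() boils down to:
# it sets spark.sql.catalogImplementation to "hive" in the session config.
def enable_hive_support(conf: dict) -> dict:
    out = dict(conf)
    out["spark.sql.catalogImplementation"] = "hive"
    return out

conf = {"spark.app.name": "my-app"}
print(enable_hive_support(conf))
# → {'spark.app.name': 'my-app', 'spark.sql.catalogImplementation': 'hive'}
```

This is also why passing spark.sql.catalogImplementation=hive at launch time works: the catalog implementation is read when the session is created, which is exactly why it cannot be flipped on an already-existing session.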