Re: Spark dataframe hdfs vs s3

2020-05-29 Thread Jörn Franke
Maybe some AWS network-optimized instances with higher bandwidth will improve
the situation.

> On 27.05.2020 at 19:51, Dark Crusader wrote:
> 
> 
> Hi Jörn,
> 
> Thanks for the reply. I will try to create an easier example to reproduce the 
> issue.
> 
> I will also try your suggestion to look into the UI. Can you guide on what I 
> should be looking for? 
> 
> I was already using the s3a protocol to compare the times.
> 
> My hunch is that multiple reads from S3 are required because of improper 
> caching of intermediate data. And maybe hdfs is doing a better job at this. 
> Does this make sense?
> 
> I would also like to add that we built an extra layer on S3 which might be 
> adding to even slower times.
> 
> Thanks for your help.
> 
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>> Have you looked in the Spark UI to see why this is the case?
>> Reading from S3 can take more time - it also depends on which S3 URL scheme
>> you are using: s3a vs s3n vs s3.
>>
>> It could help to persist in-memory or on HDFS after some calculation. You
>> can also initially load from S3, store on HDFS, and work from there.
>>
>> HDFS offers data locality for the tasks, i.e. the tasks start on the nodes
>> where the data is. Depending on which S3 "protocol" you are using, you might
>> also take a bigger performance hit.
>>
>> Try s3a as a protocol (replace all s3n with s3a).
>>
>> You can also use the s3 URL, but this requires a special bucket configuration
>> (a dedicated empty bucket), and it lacks some interoperability with other AWS
>> services.
>>
>> Nevertheless, it could also be something else in the code. Can you post an
>> example reproducing the issue?
>> 
>> > On 27.05.2020 at 18:18, Dark Crusader wrote:
>> > 
>> > 
>> > Hi all,
>> > 
>> > I am reading data from hdfs in the form of parquet files (around 3 GB) and 
>> > running an algorithm from the spark ml library.
>> > 
>> > If I create the same spark dataframe by reading data from S3, the same 
>> > algorithm takes considerably more time.
>> > 
>> > I don't understand why this is happening. Is this a chance occurrence, or
>> > are the Spark dataframes created differently?
>> > 
>> > I don't understand how the data store would affect the algorithm's
>> > performance.
>> > 
>> > Any help would be appreciated. Thanks a lot.


Re: [pyspark 2.3+] Dedupe records

2020-05-29 Thread Sonal Goyal
Hi Rishi,

1. Dataframes are RDDs under the cover. If you have unstructured data, or if
you know something about the data that lets you optimize the computation
yourself, you can go with RDDs. Otherwise, Dataframes, which are optimized
by Spark SQL, should be fine.
2. For incremental deduplication, I guess you can hash your data based on
some particular values and then only compare the new records against the
ones which have the same hash. That should reduce the order of comparisons
drastically provided you can come up with a good indexing/hashing scheme as
per your dataset.
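
A very rough sketch of that blocking idea with the DataFrame API (shown in
Java; the same functions exist in pyspark, and the paths, column names and
choice of hash below are purely illustrative assumptions):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IncrementalDedupeSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("incremental-dedupe").getOrCreate();

    // Already de-duplicated master data and the newly arrived batch.
    Dataset<Row> master = spark.read().parquet("hdfs:///dedupe/master/");
    Dataset<Row> incoming = spark.read().parquet("hdfs:///dedupe/incoming/");

    // Blocking key: a hash over a few stable attributes. Only records that
    // share the same hash are ever compared with each other.
    Column blockKey = sha2(concat_ws("|", lower(col("name")), col("zip")), 256);

    Dataset<Row> masterKeyed = master.withColumn("block", blockKey);
    Dataset<Row> incomingKeyed = incoming.withColumn("block", blockKey);

    // Candidate pairs: each new record is joined only against the old records
    // in its block, which keeps the comparison count far below N x M.
    Dataset<Row> candidates = incomingKeyed.alias("n").join(masterKeyed.alias("o"), "block");

    // The actual match/merge logic (exact or fuzzy) would run on 'candidates'.
    candidates.show(20, false);

    spark.stop();
  }
}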

Thanks,
Sonal
Nube Technologies

On Sat, May 30, 2020 at 8:17 AM Rishi Shah  wrote:

> Hi All,
>
> I have around 100B records where I get new, update & delete records.
> Update/delete records are not that frequent. I would like to get some
> advice on the below:
>
> 1) Should I use RDD + reduceByKey or a DataFrame window operation for data of
> this size? Which one would outperform the other? Which is more reliable and
> low maintenance?
> 2) Also, how would you suggest we do incremental deduplication? Currently
> we do full processing once a week and no dedupe during weekdays to avoid
> heavy processing. However, I would like to explore the incremental dedupe
> option and weigh the pros/cons.
>
> Any input is highly appreciated!
>
> --
> Regards,
>
> Rishi Shah
>


[pyspark 2.3+] Dedupe records

2020-05-29 Thread Rishi Shah
Hi All,

I have around 100B records where I get new, update & delete records.
Update/delete records are not that frequent. I would like to get some
advice on the below:

1) Should I use RDD + reduceByKey or a DataFrame window operation for data of
this size? Which one would outperform the other? Which is more reliable and
low maintenance?
2) Also, how would you suggest we do incremental deduplication? Currently we
do full processing once a week and no dedupe during weekdays to avoid
heavy processing. However, I would like to explore the incremental dedupe
option and weigh the pros/cons.
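
To make (1) a bit more concrete, the DataFrame window operation I have in mind
is roughly the following (a sketch with the Java Dataset API; the paths and the
record_id / updated_at / op column names are made-up assumptions):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class WindowDedupeSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("window-dedupe").getOrCreate();

    Dataset<Row> records = spark.read().parquet("hdfs:///events/landing/");

    // Rank the versions of each record id so the newest one comes first.
    WindowSpec latestFirst = Window.partitionBy("record_id")
        .orderBy(col("updated_at").desc());

    Dataset<Row> current = records
        .withColumn("rn", row_number().over(latestFirst))
        .filter(col("rn").equalTo(1))
        .drop("rn")
        // If the latest operation for an id is a delete, drop the record.
        .filter(not(col("op").equalTo("delete")));

    current.write().mode("overwrite").parquet("hdfs:///events/current/");
    spark.stop();
  }
}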

Any input is highly appreciated!

-- 
Regards,

Rishi Shah


Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
I mean... I don't see any reference to 'accumulator' in your class
*definition*. How can you access it in the class if it's not in your class
definition:

public class StateUpdateTask implements MapGroupsWithStateFunction<*String,
InputEventModel, ModelStateInfo, ModelUpdate*> {  *--> I was expecting to
see 'accumulator' here in the definition.*

@Override
public ModelUpdate call(String productId, Iterator<InputEventModel>
eventsIterator, GroupState<ModelStateInfo> state) {
}
}

On Fri, May 29, 2020 at 1:08 PM Srinivas V  wrote:

>
> Yes, accumulators are updated in the call method of StateUpdateTask. Like
> when state times out or when the data is pushed to next Kafka topic etc.
>
> On Fri, May 29, 2020 at 11:55 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Thanks! I will take a look at the link. Just one question, you seem to be
>> passing 'accumulators' in the constructor but where do you use it in the
>> StateUpdateTask class? I am still missing that connection. Sorry, if my
>> question is dumb. I must be missing something. Thanks for your help so far.
>> It's been useful.
>>
>>
>> On Fri, May 29, 2020 at 6:51 AM Srinivas V  wrote:
>>
>>> Yes it is application specific class. This is how java Spark Functions
>>> work.
>>> You can refer to this code in the documentation:
>>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>>
>>> public class StateUpdateTask implements
>>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo,
>>> ModelUpdate> {
>>>
>>> @Override
>>> public ModelUpdate call(String productId, Iterator<InputEventModel>
>>> eventsIterator, GroupState<ModelStateInfo> state) {
>>> }
>>> }
>>>
>>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
 I am assuming StateUpdateTask is your application specific class. Does
 it have 'updateState' method or something? I googled but couldn't find any
 documentation about doing it this way. Can you please direct me to some
 documentation. Thanks.

 On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:

> yes, I am using stateful structured streaming. Yes similar to what you
> do. This is in Java
> I do it this way:
> Dataset<ModelUpdate> productUpdates = watermarkedDS
> .groupByKey(
> (MapFunction<InputEventModel, String>) event
> -> event.getId(), Encoders.STRING())
> .mapGroupsWithState(
> new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
> Encoders.bean(ModelStateInfo.class),
> Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Yes, that's exactly how I am creating them.
>>
>> Question... Are you using 'Stateful Structured Streaming' in which
>> you've something like this?
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>
>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>> experiencing this only under 'Stateful Structured Streaming'. In other 
>> streaming applications it works as expected.
>>
>>
>>
>> On Wed, May 27, 2020 at 9:01 AM Srinivas V 
>> wrote:
>>
>>> Yes, I am talking about Application specific Accumulators. Actually
>>> I am getting the values printed in my driver log as well as sent to
>>> Grafana. Not sure where and when I saw 0 before. My deploy mode is 
>>> “client”
>>> on a yarn cluster(not local Mac) where I submit from master node. It 
>>> should
>>> work the same for cluster mode as well.
>>> Create accumulators like this:
>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
 Hmm... how would they go to Graphana if they are not getting
 computed in your code? I am talking about the Application Specific
 Accumulators. The other standard counters such as
 'event.progress.inputRowsPerSecond' are getting populated correctly!

 On Mon, May 25, 2020 at 8:39 PM Srinivas V 
 wrote:

> Hello,
> Even for me it comes as 0 when I print in OnQueryProgress. I use
> LongAccumulator as well. Yes, it prints on my local but not on 
> cluster.
> But one consolation is that when I send metrics to Graphana, the
> values are coming there.
>
> On Tue, May 26, 2020 at 3:10 AM Something Something <
> mailinglist...@gmail.com> wrote:

Re: Spark dataframe hdfs vs s3

2020-05-29 Thread randy clinton
HDFS is simply a better place to make performant reads, and on top of that
the data is closer to your Spark job. The Databricks link from above
shows that they find a 6x read throughput difference between the
two.

If your HDFS is part of the same Spark cluster, then it should be an
incredibly fast read vs. reaching out to S3 for the data.

They are different types of storage solving different things.

Something I have seen in workflows, and which other people have suggested
above, is a stage where you load data from S3 into HDFS, then move on to
your other work with it, and maybe finally persist outside of HDFS.
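
As a rough sketch, that staging step can be as small as the following (the
paths and the SparkSession setup are assumptions for illustration, not taken
from this thread):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class StageS3ToHdfs {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("stage-s3-to-hdfs").getOrCreate();

    // 1. Pull the data down from S3 once, via the s3a connector.
    Dataset<Row> fromS3 = spark.read().parquet("s3a://my-bucket/input/");

    // 2. Stage it on HDFS so the later stages get data locality.
    fromS3.write().mode(SaveMode.Overwrite).parquet("hdfs:///staging/input/");

    // 3. Run the iterative/ML work against the HDFS copy, cached if it fits.
    Dataset<Row> working = spark.read().parquet("hdfs:///staging/input/").cache();
    System.out.println("Staged rows: " + working.count());

    spark.stop();
  }
}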

On Fri, May 29, 2020 at 2:09 PM Bin Fan  wrote:

> Try to deploy Alluxio as a caching layer on top of S3, providing Spark a
> similar HDFS interface?
> Like in this article:
>
> https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/
>
>
> On Wed, May 27, 2020 at 6:52 PM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi Randy,
>>
>> Yes, I'm using parquet on both S3 and hdfs.
>>
>> On Thu, 28 May, 2020, 2:38 am randy clinton, 
>> wrote:
>>
>>> Is the file Parquet on S3 or is it some other file format?
>>>
>>> In general I would assume that HDFS read/writes are more performant for
>>> spark jobs.
>>>
>>> For instance, consider how well partitioned your HDFS file is vs the S3
>>> file.
>>>
>>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>>> relinquisheddra...@gmail.com> wrote:
>>>
 Hi Jörn,

 Thanks for the reply. I will try to create an easier example to
 reproduce the issue.

 I will also try your suggestion to look into the UI. Can you guide on
 what I should be looking for?

 I was already using the s3a protocol to compare the times.

 My hunch is that multiple reads from S3 are required because of
 improper caching of intermediate data. And maybe hdfs is doing a better job
 at this. Does this make sense?

 I would also like to add that we built an extra layer on S3 which might
 be adding to even slower times.

 Thanks for your help.

 On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
 wrote:

> Have you looked in Spark UI why this is the case ?
> S3 Reading can take more time - it depends also what s3 url you are
> using : s3a vs s3n vs S3.
>
> It could help after some calculation to persist in-memory or on HDFS.
> You can also initially load from S3 and store on HDFS and work from there 
> .
>
> HDFS offers data locality for the tasks, i.e. the tasks start on the
> nodes where the data is. Depending on what S3 "protocol" you are using you
> might be also more punished with performance.
>
> Try s3a as a protocol (replace all s3n with s3a).
>
> You can also use s3 url but this requires a special bucket
> configuration, a dedicated empty bucket and it lacks some interoperability
> with other AWS services.
>
> Nevertheless, it could be also something else with the code. Can you
> post an example reproducing the issue?
>
> > On 27.05.2020 at 18:18, Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
> >
> > 
> > Hi all,
> >
> > I am reading data from hdfs in the form of parquet files (around 3
> GB) and running an algorithm from the spark ml library.
> >
> > If I create the same spark dataframe by reading data from S3, the
> same algorithm takes considerably more time.
> >
> > I don't understand why this is happening. Is this a chance occurrence
> or are the spark dataframes created differently?
> >
> > I don't understand how the data store would affect the algorithm
> performance.
> >
> > Any help would be appreciated. Thanks a lot.
>

>>>
>>> --
>>> I appreciate your time,
>>>
>>> ~Randy
>>>
>>

-- 
I appreciate your time,

~Randy


Re: Spark Security

2020-05-29 Thread Anwar AliKhan
What is the size of your .tsv file, sir?
What is the size of your local hard drive, sir?


Regards


Wali Ahaad


On Fri, 29 May 2020, 16:21 ,  wrote:

> Hello,
>
> I plan to load in a local .tsv file from my hard drive using sparklyr (an
> R package). I have figured out how to do this already on small files.
>
> When I decide to receive my client’s large .tsv file, can I be confident
> that loading in data this way will be secure? I know that this creates a
> Spark connection to help process the data more quickly, but I want to
> verify that the data will be secure after loading it with the Spark
> connection and sparklyr.
>
>
> Thanks,
>
> Wilbert J. Seoane
>
> Sent from iPhone
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
Yes, accumulators are updated in the call method of StateUpdateTask - for
example, when the state times out or when the data is pushed to the next
Kafka topic.
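
For reference, a minimal sketch of how that wiring can look (this is only an
illustration, not my actual code: InputEventModel, ModelStateInfo and
ModelUpdate are the application classes from this thread, and a single
LongAccumulator stands in here for the 'accumulators' wrapper object):

import java.util.Iterator;

import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.util.LongAccumulator;

public class StateUpdateTask
    implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

  // Created on the driver (e.g. sparkContext.longAccumulator("eventsReceived"))
  // and handed in through the constructor. The task is serialized to the
  // executors, and updates made there are merged back into the driver's copy.
  private final LongAccumulator eventsReceived;

  public StateUpdateTask(long stateTimeoutMs, LongAccumulator eventsReceived) {
    this.eventsReceived = eventsReceived;
  }

  @Override
  public ModelUpdate call(String productId, Iterator<InputEventModel> events,
                          GroupState<ModelStateInfo> state) {
    while (events.hasNext()) {
      events.next();
      eventsReceived.add(1L);  // the accumulator is used here, inside call()
    }
    // Real state handling (timeouts, publishing to Kafka, building the
    // ModelUpdate) would go here.
    return null;
  }
}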

On Fri, May 29, 2020 at 11:55 PM Something Something <
mailinglist...@gmail.com> wrote:

> Thanks! I will take a look at the link. Just one question, you seem to be
> passing 'accumulators' in the constructor but where do you use it in the
> StateUpdateTask class? I am still missing that connection. Sorry, if my
> question is dumb. I must be missing something. Thanks for your help so far.
> It's been useful.
>
>
> On Fri, May 29, 2020 at 6:51 AM Srinivas V  wrote:
>
>> Yes it is application specific class. This is how java Spark Functions
>> work.
>> You can refer to this code in the documentation:
>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>
>> public class StateUpdateTask implements
>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo,
>> ModelUpdate> {
>>
>> @Override
>> public ModelUpdate call(String productId, Iterator<InputEventModel>
>> eventsIterator, GroupState<ModelStateInfo> state) {
>> }
>> }
>>
>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> I am assuming StateUpdateTask is your application specific class. Does
>>> it have 'updateState' method or something? I googled but couldn't find any
>>> documentation about doing it this way. Can you please direct me to some
>>> documentation. Thanks.
>>>
>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:
>>>
 yes, I am using stateful structured streaming. Yes similar to what you
 do. This is in Java
 I do it this way:
 Dataset<ModelUpdate> productUpdates = watermarkedDS
 .groupByKey(
 (MapFunction<InputEventModel, String>) event ->
 event.getId(), Encoders.STRING())
 .mapGroupsWithState(
 new
 StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
 appConfig, accumulators),
 Encoders.bean(ModelStateInfo.class),
 Encoders.bean(ModelUpdate.class),
 GroupStateTimeout.ProcessingTimeTimeout());

 StateUpdateTask contains the update method.

 On Thu, May 28, 2020 at 4:41 AM Something Something <
 mailinglist...@gmail.com> wrote:

> Yes, that's exactly how I am creating them.
>
> Question... Are you using 'Stateful Structured Streaming' in which
> you've something like this?
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
> And updating the Accumulator inside 'updateAcrossEvents'? We're 
> experiencing this only under 'Stateful Structured Streaming'. In other 
> streaming applications it works as expected.
>
>
>
> On Wed, May 27, 2020 at 9:01 AM Srinivas V 
> wrote:
>
>> Yes, I am talking about Application specific Accumulators. Actually I
>> am getting the values printed in my driver log as well as sent to 
>> Grafana.
>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>> yarn cluster(not local Mac) where I submit from master node. It should 
>> work
>> the same for cluster mode as well.
>> Create accumulators like this:
>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>
>>
>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Hmm... how would they go to Graphana if they are not getting
>>> computed in your code? I am talking about the Application Specific
>>> Accumulators. The other standard counters such as
>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>
>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
>>> wrote:
>>>
 Hello,
 Even for me it comes as 0 when I print in OnQueryProgress. I use
 LongAccumulator as well. Yes, it prints on my local but not on cluster.
 But one consolation is that when I send metrics to Graphana, the
 values are coming there.

 On Tue, May 26, 2020 at 3:10 AM Something Something <
 mailinglist...@gmail.com> wrote:

> No this is not working even if I use LongAccumulator.
>
> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>
>> There is a restriction in AccumulatorV2 API [1], the OUT type
>> should be atomic or thread safe. I'm wondering if the implementation 
>> for
>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>> replace
>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>> LongAccumulator[3]
>> and test if the StreamingListener and other codes are able to work?
>>
>> ---
>> Cheers,

Re: Spark Security

2020-05-29 Thread Sean Owen
If you load a file on your computer, that is unrelated to Spark.
Whatever you load via Spark APIs will at some point live in memory on the
Spark cluster, or the storage you back it with if you store it.
Whether the cluster and storage are secure (like, ACLs / auth enabled) is
up to whoever runs the cluster.

On Fri, May 29, 2020 at 1:54 PM  wrote:

> Hi Sean
>
> I mean that I won’t be opening up my client for any data breaches or
> anything like that by connecting to Spark and loading in their data using
> sparklyr in R studio.
>
> Connecting with spark and loading in a tsv file on my local computer is
> secure correct?
>
>
> Thanks
>
> Wilbert J. Seoane
>
> Sent from iPhone
>
> On May 29, 2020, at 11:25 AM, Sean Owen  wrote:
>
> 
> What do you mean by secure here?
>
> On Fri, May 29, 2020 at 10:21 AM  wrote:
>
>> Hello,
>>
>> I plan to load in a local .tsv file from my hard drive using sparklyr (an
>> R package). I have figured out how to do this already on small files.
>>
>> When I decide to receive my client’s large .tsv file, can I be confident
>> that loading in data this way will be secure? I know that this creates a
>> Spark connection to help process the data more quickly, but I want to
>> verify that the data will be secure after loading it with the Spark
>> connection and sparklyr.
>>
>>
>> Thanks,
>>
>> Wilbert J. Seoane
>>
>> Sent from iPhone
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
Thanks! I will take a look at the link. Just one question: you seem to be
passing 'accumulators' in the constructor, but where do you use it in the
StateUpdateTask class? I am still missing that connection. Sorry if my
question is dumb. I must be missing something. Thanks for your help so far.
It's been useful.


On Fri, May 29, 2020 at 6:51 AM Srinivas V  wrote:

> Yes it is application specific class. This is how java Spark Functions
> work.
> You can refer to this code in the documentation:
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String,
> InputEventModel, ModelStateInfo, ModelUpdate> {
>
> @Override
> public ModelUpdate call(String productId, Iterator<InputEventModel>
> eventsIterator, GroupState<ModelStateInfo> state) {
> }
> }
>
> On Thu, May 28, 2020 at 10:59 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I am assuming StateUpdateTask is your application specific class. Does it
>> have 'updateState' method or something? I googled but couldn't find any
>> documentation about doing it this way. Can you please direct me to some
>> documentation. Thanks.
>>
>> On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:
>>
>>> yes, I am using stateful structured streaming. Yes similar to what you
>>> do. This is in Java
>>> I do it this way:
>>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>>> .groupByKey(
>>> (MapFunction<InputEventModel, String>) event ->
>>> event.getId(), Encoders.STRING())
>>> .mapGroupsWithState(
>>> new
>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>> appConfig, accumulators),
>>> Encoders.bean(ModelStateInfo.class),
>>> Encoders.bean(ModelUpdate.class),
>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> StateUpdateTask contains the update method.
>>>
>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
 Yes, that's exactly how I am creating them.

 Question... Are you using 'Stateful Structured Streaming' in which
 you've something like this?

 .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
 updateAcrossEvents
   )

 And updating the Accumulator inside 'updateAcrossEvents'? We're 
 experiencing this only under 'Stateful Structured Streaming'. In other 
 streaming applications it works as expected.



 On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:

> Yes, I am talking about Application specific Accumulators. Actually I
> am getting the values printed in my driver log as well as sent to Grafana.
> Not sure where and when I saw 0 before. My deploy mode is “client” on a
> yarn cluster(not local Mac) where I submit from master node. It should 
> work
> the same for cluster mode as well.
> Create accumulators like this:
> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>
>
> On Tue, May 26, 2020 at 8:42 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Hmm... how would they go to Graphana if they are not getting computed
>> in your code? I am talking about the Application Specific Accumulators. 
>> The
>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>> getting populated correctly!
>>
>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
>> wrote:
>>
>>> Hello,
>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>> But one consolation is that when I send metrics to Graphana, the
>>> values are coming there.
>>>
>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
 No this is not working even if I use LongAccumulator.

 On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:

> There is a restriction in AccumulatorV2 API [1], the OUT type
> should be atomic or thread safe. I'm wondering if the implementation 
> for
> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
> replace
> CollectionLongAccumulator by CollectionAccumulator[2] or 
> LongAccumulator[3]
> and test if the StreamingListener and other codes are able to work?
>
> ---
> Cheers,
> -z
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> 

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
Did you try this on the Cluster? Note: This works just fine under 'Local'
mode.

On Thu, May 28, 2020 at 9:12 PM ZHANG Wei  wrote:

> I can't reproduce the issue with my simple code:
> ```scala
> spark.streams.addListener(new StreamingQueryListener {
>   override def onQueryProgress(event:
> StreamingQueryListener.QueryProgressEvent): Unit = {
> println(event.progress.id + " is on progress")
> println(s"My accu is ${myAcc.value} on query progress")
>   }
> ...
> })
>
> def mappingFunc(key: Long, values: Iterator[String], state:
> GroupState[Long]): ... = {
>   myAcc.add(1)
>   println(s">>> key: $key => state: ${state}")
> ...
> }
>
> val wordCounts = words
>   .groupByKey(v => ...)
>   .mapGroupsWithState(timeoutConf =
> GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>
> val query = wordCounts.writeStream
>   .outputMode(OutputMode.Update)
> ...
> ```
>
> I'm wondering if any errors can be found in the driver logs? The
> micro-batch
> exceptions won't terminate the running streaming job.
>
> For the following code, we have to make sure that `StateUpdateTask` is
> started:
> > .mapGroupsWithState(
> > new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > appConfig, accumulators),
> > Encoders.bean(ModelStateInfo.class),
> > Encoders.bean(ModelUpdate.class),
> > GroupStateTimeout.ProcessingTimeTimeout());
>
> --
> Cheers,
> -z
>
> On Thu, 28 May 2020 19:59:31 +0530
> Srinivas V  wrote:
>
> > Giving the code below:
> > //accumulators is a class level variable in driver.
> >
> >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > @Override
> > public void onQueryStarted(QueryStartedEvent queryStarted) {
> > logger.info("Query started: " + queryStarted.id());
> > }
> > @Override
> > public void onQueryTerminated(QueryTerminatedEvent
> > queryTerminated) {
> > logger.info("Query terminated: " +
> queryTerminated.id());
> > }
> > @Override
> > public void onQueryProgress(QueryProgressEvent
> queryProgress) {
> >
> > accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > long eventsReceived = 0;
> > long eventsExpired = 0;
> > long eventSentSuccess = 0;
> > try {
> > eventsReceived =
> > accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > eventsExpired =
> > accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > eventSentSuccess =
> > accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > } catch (MissingKeyException e) {
> > logger.error("Accumulator key not found due to
> > Exception {}", e.getMessage());
> > }
> > logger.info("Events Received:{}", eventsReceived);
> > logger.info("Events State Expired:{}", eventsExpired);
> > logger.info("Events Sent Success:{}", eventSentSuccess);
> > logger.info("Query made progress - batchId: {}
> > numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> > durationMs:{}" ,
> > queryProgress.progress().batchId(),
> > queryProgress.progress().numInputRows(),
> > queryProgress.progress().inputRowsPerSecond(),
> >
>  queryProgress.progress().processedRowsPerSecond(),
> > queryProgress.progress().durationMs());
> >
> >
> > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei  wrote:
> >
> > > May I ask how the accumulator is accessed in the method
> > > `onQueryProgress()`?
> > >
> > > AFAICT, the accumulator is incremented well. There is a way to verify
> that
> > > in cluster like this:
> > > ```
> > > // Add the following while loop before invoking awaitTermination
> > > while (true) {
> > >   println("My acc: " + myAcc.value)
> > >   Thread.sleep(5 * 1000)
> > > }
> > >
> > > //query.awaitTermination()
> > > ```
> > >
> > > And the accumulator value updated can be found from driver stdout.
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 17:12:48 +0530
> > > Srinivas V  wrote:
> > >
> > > > yes, I am using stateful structured streaming. Yes similar to what
> you
> > > do.
> > > > This is in Java
> > > > I do it this way:
> > > > Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > > .groupByKey(
> > > > (MapFunction<InputEventModel, String>) event ->
> > > > event.getId(), Encoders.STRING())
> > > > .mapGroupsWithState(
> > > > new
> > > >
> > >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > 

Re: Spark dataframe hdfs vs s3

2020-05-29 Thread Bin Fan
Try to deploy Alluxio as a caching layer on top of S3, providing Spark a
similar HDFS interface?
Like in this article:
https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/


On Wed, May 27, 2020 at 6:52 PM Dark Crusader 
wrote:

> Hi Randy,
>
> Yes, I'm using parquet on both S3 and hdfs.
>
> On Thu, 28 May, 2020, 2:38 am randy clinton, 
> wrote:
>
>> Is the file Parquet on S3 or is it some other file format?
>>
>> In general I would assume that HDFS read/writes are more performant for
>> spark jobs.
>>
>> For instance, consider how well partitioned your HDFS file is vs the S3
>> file.
>>
>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi Jörn,
>>>
>>> Thanks for the reply. I will try to create an easier example to reproduce
>>> the issue.
>>>
>>> I will also try your suggestion to look into the UI. Can you guide on
>>> what I should be looking for?
>>>
>>> I was already using the s3a protocol to compare the times.
>>>
>>> My hunch is that multiple reads from S3 are required because of improper
>>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>>> Does this make sense?
>>>
>>> I would also like to add that we built an extra layer on S3 which might
>>> be adding to even slower times.
>>>
>>> Thanks for your help.
>>>
>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
>>> wrote:
>>>
 Have you looked in Spark UI why this is the case ?
 S3 Reading can take more time - it depends also what s3 url you are
 using : s3a vs s3n vs S3.

 It could help after some calculation to persist in-memory or on HDFS.
 You can also initially load from S3 and store on HDFS and work from there .

 HDFS offers data locality for the tasks, i.e. the tasks start on the
 nodes where the data is. Depending on what S3 "protocol" you are using you
 might be also more punished with performance.

 Try s3a as a protocol (replace all s3n with s3a).

 You can also use s3 url but this requires a special bucket
 configuration, a dedicated empty bucket and it lacks some interoperability
 with other AWS services.

 Nevertheless, it could be also something else with the code. Can you
 post an example reproducing the issue?

 > On 27.05.2020 at 18:18, Dark Crusader <
 relinquisheddra...@gmail.com> wrote:
 >
 > 
 > Hi all,
 >
 > I am reading data from hdfs in the form of parquet files (around 3
 GB) and running an algorithm from the spark ml library.
 >
 > If I create the same spark dataframe by reading data from S3, the
 same algorithm takes considerably more time.
 >
 > I don't understand why this is happening. Is this a chance occurrence
 or are the spark dataframes created differently?
 >
 > I don't understand how the data store would affect the algorithm
 performance.
 >
 > Any help would be appreciated. Thanks a lot.

>>>
>>
>> --
>> I appreciate your time,
>>
>> ~Randy
>>
>


Re: Spark Security

2020-05-29 Thread Sean Owen
What do you mean by secure here?

On Fri, May 29, 2020 at 10:21 AM  wrote:

> Hello,
>
> I plan to load in a local .tsv file from my hard drive using sparklyr (an
> R package). I have figured out how to do this already on small files.
>
> When I decide to receive my client’s large .tsv file, can I be confident
> that loading in data this way will be secure? I know that this creates a
> Spark connection to help process the data more quickly, but I want to
> verify that the data will be secure after loading it with the Spark
> connection and sparklyr.
>
>
> Thanks,
>
> Wilbert J. Seoane
>
> Sent from iPhone
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark Security

2020-05-29 Thread wilbertseoane
Hello,

I plan to load in a local .tsv file from my hard drive using sparklyr (an R 
package). I have figured out how to do this already on small files. 

When I decide to receive my client’s large .tsv file, can I be confident that 
loading in data this way will be secure? I know that this creates a Spark 
connection to help process the data more quickly, but I want to verify that the 
data will be secure after loading it with the Spark connection and sparklyr. 


Thanks,

Wilbert J. Seoane

Sent from iPhone
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
Yes, it is an application-specific class. This is how Java Spark Functions work.
You can refer to this code in the documentation:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

public class StateUpdateTask implements MapGroupsWithStateFunction<String,
InputEventModel, ModelStateInfo, ModelUpdate> {

@Override
public ModelUpdate call(String productId, Iterator<InputEventModel>
eventsIterator, GroupState<ModelStateInfo> state) {
}
}

On Thu, May 28, 2020 at 10:59 PM Something Something <
mailinglist...@gmail.com> wrote:

> I am assuming StateUpdateTask is your application specific class. Does it
> have 'updateState' method or something? I googled but couldn't find any
> documentation about doing it this way. Can you please direct me to some
> documentation. Thanks.
>
> On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:
>
>> yes, I am using stateful structured streaming. Yes similar to what you
>> do. This is in Java
>> I do it this way:
>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>> .groupByKey(
>> (MapFunction<InputEventModel, String>) event ->
>> event.getId(), Encoders.STRING())
>> .mapGroupsWithState(
>> new
>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> appConfig, accumulators),
>> Encoders.bean(ModelStateInfo.class),
>> Encoders.bean(ModelUpdate.class),
>> GroupStateTimeout.ProcessingTimeTimeout());
>>
>> StateUpdateTask contains the update method.
>>
>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Yes, that's exactly how I am creating them.
>>>
>>> Question... Are you using 'Stateful Structured Streaming' in which
>>> you've something like this?
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>
>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>> streaming applications it works as expected.
>>>
>>>
>>>
>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>>>
 Yes, I am talking about Application specific Accumulators. Actually I
 am getting the values printed in my driver log as well as sent to Grafana.
 Not sure where and when I saw 0 before. My deploy mode is “client” on a
 yarn cluster(not local Mac) where I submit from master node. It should work
 the same for cluster mode as well.
 Create accumulators like this:
 AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


 On Tue, May 26, 2020 at 8:42 PM Something Something <
 mailinglist...@gmail.com> wrote:

> Hmm... how would they go to Graphana if they are not getting computed
> in your code? I am talking about the Application Specific Accumulators. 
> The
> other standard counters such as 'event.progress.inputRowsPerSecond' are
> getting populated correctly!
>
> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
> wrote:
>
>> Hello,
>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>> But one consolation is that when I send metrics to Graphana, the
>> values are coming there.
>>
>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> No this is not working even if I use LongAccumulator.
>>>
>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>
 There is a restriction in AccumulatorV2 API [1], the OUT type
 should be atomic or thread safe. I'm wondering if the implementation 
 for
 `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
 replace
 CollectionLongAccumulator by CollectionAccumulator[2] or 
 LongAccumulator[3]
 and test if the StreamingListener and other codes are able to work?

 ---
 Cheers,
 -z
 [1]
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
 [2]
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
 [3]
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

 
 From: Something Something 
 Sent: Saturday, May 16, 2020 0:38
 To: spark-user
 Subject: Re: Using Spark Accumulators with Structured Streaming

 Can someone from Spark Development team tell me if this
 functionality is supported and tested? I've spent a lot of time on 
 this but
 can't get it to work. Just to add more context, we've our own 
 Accumulator
 

Re: Different execution results with wholestage codegen on and off

2020-05-29 Thread Pasha Finkelshteyn
Here[1] it is, please review

[1] https://issues.apache.org/jira/browse/SPARK-31854
On 20/05/27 10:21PM, Xiao Li wrote:
> Thanks for reporting it. Please open a JIRA with a test case.
> 
> Cheers,
> 
> Xiao
> 
> On Wed, May 27, 2020 at 1:42 PM Pasha Finkelshteyn <
> pavel.finkelsht...@gmail.com> wrote:
> 
> > Hi folks,
> >
> > I'm implementing Kotlin bindings for Spark and faced a strange problem. In
> > one corner case, Spark works differently when whole-stage codegen is on or
> > off.
> >
> > Does it look like a bug or expected behavior?
> > --
> > Regards,
> > Pasha
> >
> > Big Data Tools @ JetBrains
> >
> 
> 
> -- 
> 

-- 
Regards,
Pasha

Big Data Tools @ JetBrains




Re: CSV parsing issue

2020-05-29 Thread elango vaidyanathan
Thanks Sean, got it.
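
For the archives, one workaround along those lines is to skip the CSV parser
for that column entirely and pull the three fields apart with a regex, rather
than escaping the inner quotes and reparsing. A sketch only (Java Dataset API,
assuming one record per line and the same layout as the sample below):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParseEmbeddedJsonCsv {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("parse-embedded-json-csv").getOrCreate();

    // Everything before the first ," is column1, everything after the last ",
    // is column3, and whatever sits between the two is the JSON blob.
    String pattern = "^([^,]*),\"(.*)\",([^,]*)$";

    Dataset<Row> raw = spark.read().text("/FileStore/tables/sample_file_structure.csv");

    Dataset<Row> parsed = raw
        .filter(not(col("value").startsWith("column1")))  // drop the header row
        .select(
            regexp_extract(col("value"), pattern, 1).alias("column1"),
            regexp_extract(col("value"), pattern, 2).alias("column2"),
            regexp_extract(col("value"), pattern, 3).alias("column3"));

    parsed.show(5, false);
    spark.stop();
  }
}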

Thanks,
Elango

On Thu, May 28, 2020, 9:04 PM Sean Owen  wrote:

> I don't think so, that data is inherently ambiguous and incorrectly
> formatted. If you know something about the structure, maybe you can rewrite
> the middle column manually to escape the inner quotes and then reparse.
>
> On Thu, May 28, 2020 at 10:25 AM elango vaidyanathan 
> wrote:
>
>> Is there any way I can handle it in code?
>>
>> Thanks,
>> Elango
>>
>> On Thu, May 28, 2020, 8:52 PM Sean Owen  wrote:
>>
>>> Your data doesn't escape double-quotes.
>>>
>>> On Thu, May 28, 2020 at 10:21 AM elango vaidyanathan <
>>> elango...@gmail.com> wrote:
>>>

 Hi team,

 I am loading a CSV. One column contains a JSON value. I am unable to
 parse that column properly. Below are the details. Can you please check
 once?



 val df1 = spark.read.option("inferSchema", "true")
   .option("header", "true")
   .option("quote", "\"")
   .option("escape", "\"")
   .csv("/FileStore/tables/sample_file_structure.csv")



 sample data:

 column1,column2,column3
 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "abcdef",   "language" : "en" }",11
 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "ghi, jkl",   "language" : "en" }",12
 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "mno, pqr",   "language" : "en" }",13



 output:

 +---------+--------------------+-------------+
 |  column1|             column2|      column3|
 +---------+--------------------+-------------+
 |123456789|"{ "moveId" : "...  | "dob" : null|
 |123456789|"{ "moveId" : "...  | "dob" : null|
 +---------+--------------------+-------------+



 Thanks,
 Elango

>>>