Re: [pyspark 2.4] broadcasting DataFrame throws error

2020-09-21 Thread Rishi Shah
Thanks Amit, I was referring to dynamic partition pruning (
https://issues.apache.org/jira/browse/SPARK-11150) & adaptive query
execution (https://issues.apache.org/jira/browse/SPARK-31412) in Spark 3,
where Spark figures out the right partitions & pushes the filters down to the
input before applying the join.
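
For reference, both features ship as plain SQL configs in Spark 3.x, so a minimal check is just to confirm they are on (DPP is on by default in 3.0; AQE only became the default in later 3.x releases). The session below is only a sketch, with an illustrative app name:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("broadcast-agg-join")
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
         .getOrCreate())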

On Sat, Sep 19, 2020 at 1:31 AM Amit Joshi 
wrote:

> Hi Rishi,
>
> Maybe you have already done these steps.
> Can you check the size of the dataframe you are trying to broadcast using
> logInfo(SizeEstimator.estimate(df))
> and adjust the driver memory accordingly?
>
> There is one more issue I found in Spark 2:
> broadcast does not work on cached data. It is possible this is not your
> issue, but you can check for the same problem at your end.
>
>
> https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219
>
> And can you please tell which Spark 3 fix you are
> referring to?
>
> Regards
> Amit
>
>
> On Saturday, September 19, 2020, Rishi Shah 
> wrote:
>
>> Thanks Amit. I have tried increasing driver memory, and also tried
>> increasing the max result size returned to the driver. Nothing works; I
>> believe Spark is not able to determine that the result to be broadcast is
>> small enough because the input data is huge. When I tried this in 2 stages
>> (write out the grouped data and use that to join using broadcast), Spark
>> had no issues broadcasting it.
>>
>> When I was checking Spark 3 documentation, it seems like this issue may
>> have been addressed in Spark 3 but not in earlier version?
>>
>> On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi 
>> wrote:
>>
>>> Hi,
>>>
>>> I think the problem lies with driver memory. Broadcast in Spark works by
>>> collecting all the data to the driver and then the driver broadcasting it
>>> to all the executors. A different strategy could be employed for the
>>> transfer, like BitTorrent, though.
>>>
>>> Please try increasing the driver memory. See if it works.
>>>
>>> Regards,
>>> Amit
>>>
>>>
>>> On Thursday, September 17, 2020, Rishi Shah 
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> Hope this email finds you well. I have a dataframe of size 8TB (parquet
>>>> snappy compressed), however I group it by a column and get a much smaller
>>>> aggregated dataframe of size 700 rows (just two columns, key and count).
>>>> When I use it like below to broadcast this aggregated result, it throws
>>>> dataframe can not be broadcasted error.
>>>>
>>>> df_agg = df.groupBy('column1').count().cache()
>>>> # df_agg.count()
>>>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>>>> df_join.write.parquet('PATH')
>>>>
>>>> The same code works with input df size of 3TB without any
>>>> modifications.
>>>>
>>>> Any suggestions?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: [pyspark 2.4] broadcasting DataFrame throws error

2020-09-18 Thread Rishi Shah
Thanks Amit. I have tried increasing driver memory, and also tried increasing
the max result size returned to the driver. Nothing works; I believe Spark is
not able to determine that the result to be broadcast is small enough because
the input data is huge. When I tried this in 2 stages (write out the grouped
data and use that to join using broadcast), Spark had no issues broadcasting
it.
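
For anyone hitting the same thing, the 2-stage version I mean is roughly the below (AGG_PATH / OUT_PATH are placeholders); materializing the aggregate first lets Spark size the broadcast from the tiny parquet output instead of the 8TB input:

from pyspark.sql.functions import broadcast

# stage 1: materialize the small aggregate
df.groupBy('column1').count().write.mode('overwrite').parquet(AGG_PATH)

# stage 2: re-read it (now ~700 rows of parquet) and broadcast for the join
df_agg = spark.read.parquet(AGG_PATH)
df.join(broadcast(df_agg), 'column1', 'left_outer').write.parquet(OUT_PATH)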

When I was checking Spark 3 documentation, it seems like this issue may
have been addressed in Spark 3 but not in earlier version?

On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi 
wrote:

> Hi,
>
> I think the problem lies with driver memory. Broadcast in Spark works by
> collecting all the data to the driver and then the driver broadcasting it
> to all the executors. A different strategy could be employed for the
> transfer, like BitTorrent, though.
>
> Please try increasing the driver memory. See if it works.
>
> Regards,
> Amit
>
>
> On Thursday, September 17, 2020, Rishi Shah 
> wrote:
>
>> Hello All,
>>
>> Hope this email finds you well. I have a dataframe of size 8TB (parquet
>> snappy compressed), however I group it by a column and get a much smaller
>> aggregated dataframe of size 700 rows (just two columns, key and count).
>> When I use it like below to broadcast this aggregated result, it throws
>> dataframe can not be broadcasted error.
>>
>> df_agg = df.groupBy('column1').count().cache()
>> # df_agg.count()
>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>> df_join.write.parquet('PATH')
>>
>> The same code works with input df size of 3TB without any modifications.
>>
>> Any suggestions?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


[pyspark 2.4] broadcasting DataFrame throws error

2020-09-16 Thread Rishi Shah
Hello All,

Hope this email finds you well. I have a dataframe of size 8TB (parquet
snappy compressed), however I group it by a column and get a much smaller
aggregated dataframe of size 700 rows (just two columns, key and count).
When I use it like below to broadcast this aggregated result, it throws
dataframe can not be broadcasted error.

df_agg = df.groupBy('column1').count().cache()
# df_agg.count()
df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
df_join.write.parquet('PATH')

The same code works with input df size of 3TB without any modifications.

Any suggestions?

-- 
Regards,

Rishi Shah


Re: [pyspark 2.3+] read/write huge data with smaller block size (128MB per block)

2020-06-19 Thread Rishi Shah
Thanks Sean! To combat the skew I do have another column I partition by, and
that has worked well (like below). However, in the image I attached in my
original email it looks like 2 tasks processed nothing; am I
reading the Spark UI task table right? All 4 dates have data - 2 dates have
~200MB & the other 2 have ~800MB... This was just a test run to check the
behavior. Shouldn't I see all 4 tasks with some output rows?

df.repartition('file_date',
'part_col').write.partitionBy('file_date').parquet(PATH)
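
On the original block-size question, one knob that does apply to the DataFrame reader is spark.sql.files.maxPartitionBytes, which controls how much input each read task is handed (128MB by default). A quick sketch, value purely illustrative:

# pack roughly four 128MB blocks into each read task
spark.conf.set("spark.sql.files.maxPartitionBytes", str(512 * 1024 * 1024))
df = spark.read.parquet(PATH)
print(df.rdd.getNumPartitions())  # should drop roughly 4x vs the default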


On Fri, Jun 19, 2020 at 9:38 AM Sean Owen  wrote:

> Yes you'll generally get 1 partition per block, and 1 task per partition.
> The amount of RAM isn't directly relevant; it's not loaded into memory.
> But you may nevertheless get some improvement with larger partitions /
> tasks, though typically only if your tasks are very small and very fast
> right now (completing in a few seconds)
> You can use minSplitSize to encourage some RDD APIs to choose larger
> partitions, but not in the DF API.
> Instead you can try coalescing to a smaller number of partitions, without
> a shuffle (the shuffle will probably negate any benefit)
>
> However what I see here is different still -- you have serious data skew
> because you partitioned by date, and I suppose some dates have lots of
> data, some have almost none.
>
>
> On Fri, Jun 19, 2020 at 12:17 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have about 10TB of parquet data on S3, where data files have 128MB
>> sized blocks. Spark would by default pick up one block per task, even
>> though every task within executor has atleast 1.8GB memory. Isn't that
>> wasteful? Is there any way to speed up this processing? Is there a way to
>> force tasks to pick up more files which sum up to a certain block size? or
>> Spark would always entertain block per task? Basically is there an override
>> to make sure spark tasks reads larger block(s)?
>>
>> Also as seen in the image here - while writing 4 files (partitionby
>> file_date), one file per partition.. Somehow 4 threads are active but two
>> threads seem to be doing nothing. and other 2 threads have taken over the
>> writing for all 4 files. Shouldn't all 4 tasks pick up one task each?
>>
>> For this example, assume df has 4 file_dates worth data.
>>
>> df.repartition('file_date').write.partitionBy('file_date').parquet(PATH)
>>
>> [attachment: Screen Shot 2020-06-18 at 2.01.53 PM.png (126K)]
>>
>> Any suggestions/feedback helps, appreciate it!
>> --
>> Regards,
>>
>> Rishi Shah
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Regards,

Rishi Shah


[pyspark 2.3+] Add scala library to pyspark app and use to derive columns

2020-06-06 Thread Rishi Shah
Hi All,

I have a use case where I need to utilize Java/Scala for regex mapping (as
lookbehinds are not well supported in Python). However, our entire codebase
is Python based, so I was wondering if there's a suggested way of creating a
Scala/Java lib and using it within PySpark.

I came across this,
https://diogoalexandrefranco.github.io/scala-code-in-pyspark/ - will try it
out but my colleague ran into some issues with serialization before while
trying to use java lib with pyspark.

Typical use case is to use library functions to derive columns.
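
In case it helps, the least-friction route I know of is to expose the Scala/Java code as a Java UDF and register it from PySpark (spark.udf.registerJavaFunction is available in PySpark 2.3+). The class name below is hypothetical and the jar would be shipped with --jars:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# com.example.RegexTag would implement org.apache.spark.sql.api.java.UDF1<String, String>
spark.udf.registerJavaFunction("regex_tag", "com.example.RegexTag", StringType())
df = df.withColumn("tag", F.expr("regex_tag(description)"))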

Any input helps, appreciate it!

-- 
Regards,

Rishi Shah


Re: [PySpark] Tagging descriptions

2020-06-04 Thread Rishi Shah
Thanks everyone. While working on tagging I stumbled upon another setback.
There are about 5000 regexes I am dealing with, out of which a couple of
hundred have variable-length lookbehind (originally these worked in a
JVM). In order to use these with a Python/PySpark UDF, we need to either
modify these regex rules so that they work with Python or move this to a
Scala/Java based implementation.

Does anyone have any experience with variable length lookbehind
(quantifiers/alternations of variable length) in python/pyspark? Any
suggestions are much appreciated!
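
One avenue I am evaluating (not yet tested at scale): the third-party "regex" package on PyPI supports variable-length lookbehind, unlike the stdlib re module, so the rules could stay as-is inside a plain UDF as long as the package is installed on every executor. Pattern and column names below are made up:

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

PATTERN = r"(?<=paid\s+to\s+\w{1,20}\s)amazon"  # variable-length lookbehind

@F.udf(BooleanType())
def matches(text):
    import regex  # pip install regex on all nodes
    return bool(regex.search(PATTERN, text, regex.IGNORECASE)) if text else False

df = df.withColumn("is_amazon", matches(F.col("description")))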

Thanks,
-Rishi

On Thu, May 14, 2020 at 2:57 PM Netanel Malka  wrote:

> For elasticsearch you can use the elastic official connector.
> https://www.elastic.co/what-is/elasticsearch-hadoop
>
> Elastic spark connector docs:
> https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
>
>
>
> On Thu, May 14, 2020, 21:14 Amol Umbarkar  wrote:
>
>> Check out sparkNLP for tokenization. I am not sure about solar or elastic
>> search though
>>
>> On Thu, May 14, 2020 at 9:02 PM Rishi Shah 
>> wrote:
>>
>>> This is great, thanks you Zhang & Amol !!
>>>
>>> Yes we can have multiple tags per row and multiple regex applied to
>>> single row as well. Would you have any example of working with spark &
>>> search engines like Solar, ElasticSearch? Does Spark ML provide
>>> tokenization support as expected (I am yet to try SparkML, still a
>>> beginner)?
>>>
>>> Any other reference material you found useful while working on similar
>>> problem? appreciate all the help!
>>>
>>> Thanks,
>>> -Rishi
>>>
>>>
>>> On Thu, May 14, 2020 at 6:11 AM Amol Umbarkar 
>>> wrote:
>>>
>>>> Rishi,
>>>> Just adding to zhang's questions.
>>>>
>>>> Are you expecting multiple tags per row?
>>>> Do you check multiple regex for a single tag?
>>>>
>>>> Let's say you had only one tag, then theoretically you should be able to
>>>> do this -
>>>>
>>>> 1 Remove stop words or any irrelevant stuff
>>>> 2 split text into equal sized chunk column (eg - if max length is
>>>> 1000chars, split into 20 columns of 50 chars)
>>>> 3 distribute work for each column that would result in binary
>>>> (true/false) for a single tag
>>>> 4 merge the 20 resulting columns
>>>> 5 repeat for other tags or do them in parallel 3 and 4 for them
>>>>
>>>> Note on 3: If you expect single tag per row, then you can repeat 3
>>>> column by column and skip rows that have got tags in prior step.
>>>>
>>>> Secondly, if you expect similarity in text (of some kind) then you
>>>> could jus work on unique text values (might require shuffle, hence
>>>> expensive) and then join the end result back to the original data.  You
>>>> could use hash of some kind to join back. Though I would go for this
>>>> approach only if the chances of similarity in text are very high (it could
>>>> be in your case for being transactional data).
>>>>
>>>> Not the full answer to your question but hope this helps you brainstorm
>>>> more.
>>>>
>>>> Thanks,
>>>> Amol
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, May 13, 2020 at 10:17 AM Rishi Shah 
>>>> wrote:
>>>>
>>>>> Thanks ZHANG! Please find details below:
>>>>>
>>>>> # of rows: ~25B, row size would be somewhere around ~3-5MB (it's a
>>>>> parquet formatted data so, need to worry about only the columns to be
>>>>> tagged)
>>>>>
>>>>> avg length of the text to be parsed : ~300
>>>>>
>>>>> Unfortunately don't have sample data or regex which I can share
>>>>> freely. However about data being parsed - assume these are purchases made
>>>>> online and we are trying to parse the transaction details. Like purchases
>>>>> made on amazon can be tagged to amazon as well as other vendors etc.
>>>>>
>>>>> Appreciate your response!
>>>>>
>>>>>
>>>>>
>>>>> On Tue, May 12, 2020 at 6:23 AM ZHANG Wei  wrote:
>>>>>
>>>>>> May I get some requirement details?
>>>>>>
>>>>>> Such as:
>>>>>> 1. The row count and one row data size
>>>>>> 2. The avg length of text to be parsed by RegEx
>>>>>> 3. The sample format of text to be parsed
>>>>>> 4. The sample of current RegEx
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> -z
>>>>>>
>>>>>> On Mon, 11 May 2020 18:40:49 -0400
>>>>>> Rishi Shah  wrote:
>>>>>>
>>>>>> > Hi All,
>>>>>> >
>>>>>> > I have a tagging problem at hand where we currently use regular
>>>>>> expressions
>>>>>> > to tag records. Is there a recommended way to distribute & tag?
>>>>>> Data is
>>>>>> > about 10TB large.
>>>>>> >
>>>>>> > --
>>>>>> > Regards,
>>>>>> >
>>>>>> > Rishi Shah
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
>>>>>
>>>>
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>

-- 
Regards,

Rishi Shah


Re: [PySpark 2.3+] Reading parquet entire path vs a set of file paths

2020-06-03 Thread Rishi Shah
Hi All,

Just following up on below to see if anyone has any suggestions. Appreciate
your help in advance.

Thanks,
Rishi

On Mon, Jun 1, 2020 at 9:33 AM Rishi Shah  wrote:

> Hi All,
>
> I use the following to read a set of parquet file paths when files are
> scattered across many many partitions.
>
> paths = ['p1', 'p2', ... 'p1']
> df = spark.read.parquet(*paths)
>
> The above method feels like it is sequentially reading those files & not
> really parallelizing the read operation - is that correct?
>
> If I put all these files in a single path and read like below - works
> faster:
>
> path = 'consolidated_path'
> df = spark.read.parquet(path)
>
> Is my observation correct? If so, is there a way to optimize reads from
> multiple/specific paths ?
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


[PySpark 2.3+] Reading parquet entire path vs a set of file paths

2020-06-01 Thread Rishi Shah
Hi All,

I use the following to read a set of parquet file paths when files are
scattered across many many partitions.

paths = ['p1', 'p2', ... 'p1']
df = spark.read.parquet(*paths)

The above method feels like it is sequentially reading those files & not
really parallelizing the read operation - is that correct?

If I put all these files in a single path and read like below - works
faster:

path = 'consolidated_path'
df = spark.read.parquet(path)

Is my observation correct? If so, is there a way to optimize reads from
multiple/specific paths ?
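
If the files sit under a common root laid out as key=value directories, one thing worth comparing (a sketch, with hypothetical names) is reading the root once and filtering on the partition column, so the driver does a single parallel file listing and partition pruning drops the unwanted directories:

from pyspark.sql import functions as F

df = spark.read.parquet('consolidated_path')            # common root
df = df.where(F.col('date_col').isin('2020-05-01', '2020-05-02'))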

-- 
Regards,

Rishi Shah


[pyspark 2.3+] Dedupe records

2020-05-29 Thread Rishi Shah
Hi All,

I have around 100B records where I get new, update & delete records.
Update/delete records are not that frequent. I would like to get some
advice on the below:

1) Should I use RDD + reduceByKey or a DataFrame window operation for data of
this size? Which one would outperform the other? Which is more reliable and
lower maintenance?
2) Also, how would you suggest we do incremental deduplication? Currently we
do full processing once a week and no dedupe during weekdays to avoid
heavy processing. However I would like to explore an incremental dedupe
option (rough sketch below) and weigh the pros/cons.
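
For (2), the rough shape I have in mind is to union the daily delta with the current snapshot and keep only the newest version of each key; 'current', 'delta', the column names and the 'op' delete marker are all hypothetical:

from pyspark.sql import functions as F, Window

w = Window.partitionBy('id').orderBy(F.col('updated_at').desc())

merged = (current.unionByName(delta)
          .withColumn('rn', F.row_number().over(w))
          .where('rn = 1')
          .where(F.col('op') != 'delete')   # drop keys whose latest record is a delete
          .drop('rn', 'op'))
merged.write.mode('overwrite').parquet(SNAPSHOT_PATH)   # placeholder path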

Any input is highly appreciated!

-- 
Regards,

Rishi Shah


Re: [PySpark] Tagging descriptions

2020-05-14 Thread Rishi Shah
This is great, thanks you Zhang & Amol !!

Yes, we can have multiple tags per row and multiple regexes applied to a
single row as well. Would you have any example of working with Spark & search
engines like Solr, Elasticsearch? Does Spark ML provide tokenization
support as expected (I am yet to try Spark ML, still a beginner)?

Any other reference material you found useful while working on similar
problem? appreciate all the help!

Thanks,
-Rishi


On Thu, May 14, 2020 at 6:11 AM Amol Umbarkar 
wrote:

> Rishi,
> Just adding to zhang's questions.
>
> Are you expecting multiple tags per row?
> Do you check multiple regex for a single tag?
>
> Let's say you had only one tag, then theoretically you should be able to do this -
>
> 1 Remove stop words or any irrelevant stuff
> 2 split text into equal sized chunk column (eg - if max length is
> 1000chars, split into 20 columns of 50 chars)
> 3 distribute work for each column that would result in binary (true/false)
> for a single tag
> 4 merge the 20 resulting columns
> 5 repeat for other tags or do them in parallel 3 and 4 for them
>
> Note on 3: If you expect single tag per row, then you can repeat 3 column
> by column and skip rows that have got tags in prior step.
>
> Secondly, if you expect similarity in text (of some kind) then you could
> jus work on unique text values (might require shuffle, hence expensive) and
> then join the end result back to the original data.  You could use hash of
> some kind to join back. Though I would go for this approach only if the
> chances of similarity in text are very high (it could be in your case for
> being transactional data).
>
> Not the full answer to your question but hope this helps you brainstorm
> more.
>
> Thanks,
> Amol
>
>
>
>
>
> On Wed, May 13, 2020 at 10:17 AM Rishi Shah 
> wrote:
>
>> Thanks ZHANG! Please find details below:
>>
>> # of rows: ~25B, row size would be somewhere around ~3-5MB (it's a
>> parquet formatted data so, need to worry about only the columns to be
>> tagged)
>>
>> avg length of the text to be parsed : ~300
>>
>> Unfortunately don't have sample data or regex which I can share freely.
>> However about data being parsed - assume these are purchases made online
>> and we are trying to parse the transaction details. Like purchases made on
>> amazon can be tagged to amazon as well as other vendors etc.
>>
>> Appreciate your response!
>>
>>
>>
>> On Tue, May 12, 2020 at 6:23 AM ZHANG Wei  wrote:
>>
>>> May I get some requirement details?
>>>
>>> Such as:
>>> 1. The row count and one row data size
>>> 2. The avg length of text to be parsed by RegEx
>>> 3. The sample format of text to be parsed
>>> 4. The sample of current RegEx
>>>
>>> --
>>> Cheers,
>>> -z
>>>
>>> On Mon, 11 May 2020 18:40:49 -0400
>>> Rishi Shah  wrote:
>>>
>>> > Hi All,
>>> >
>>> > I have a tagging problem at hand where we currently use regular
>>> expressions
>>> > to tag records. Is there a recommended way to distribute & tag? Data is
>>> > about 10TB large.
>>> >
>>> > --
>>> > Regards,
>>> >
>>> > Rishi Shah
>>>
>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: [PySpark] Tagging descriptions

2020-05-12 Thread Rishi Shah
Thanks ZHANG! Please find details below:

# of rows: ~25B, row size would be somewhere around ~3-5MB (it's
parquet-formatted data, so we only need to worry about the columns to be tagged)

avg length of the text to be parsed : ~300

Unfortunately don't have sample data or regex which I can share freely.
However about data being parsed - assume these are purchases made online
and we are trying to parse the transaction details. Like purchases made on
amazon can be tagged to amazon as well as other vendors etc.

Appreciate your response!



On Tue, May 12, 2020 at 6:23 AM ZHANG Wei  wrote:

> May I get some requirement details?
>
> Such as:
> 1. The row count and one row data size
> 2. The avg length of text to be parsed by RegEx
> 3. The sample format of text to be parsed
> 4. The sample of current RegEx
>
> --
> Cheers,
> -z
>
> On Mon, 11 May 2020 18:40:49 -0400
> Rishi Shah  wrote:
>
> > Hi All,
> >
> > I have a tagging problem at hand where we currently use regular
> expressions
> > to tag records. Is there a recommended way to distribute & tag? Data is
> > about 10TB large.
> >
> > --
> > Regards,
> >
> > Rishi Shah
>


-- 
Regards,

Rishi Shah


[PySpark] Tagging descriptions

2020-05-11 Thread Rishi Shah
Hi All,

I have a tagging problem at hand where we currently use regular expressions
to tag records. Is there a recommended way to distribute & tag? Data is
about 10TB large.

-- 
Regards,

Rishi Shah


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Thanks Burak! Appreciate it. This makes sense.

How do you suggest we make sure the resulting data doesn't end up as tiny
files, given we are not on Databricks yet and cannot leverage Delta Lake
features? Also, for the checkpointing feature, do you have an active
blog/article I can take a look at to try out an example?
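
For context, the shape I am experimenting with is a file-source stream run with Trigger.Once, so each invocation only picks up files the checkpoint has not seen yet (paths and input_schema below are placeholders). For the tiny-files side I am assuming a separate periodic compaction job (read, coalesce, overwrite) since we do not have Delta:

stream = (spark.readStream.schema(input_schema).parquet(IN_PATH)
          .writeStream
          .trigger(once=True)
          .option("checkpointLocation", CHECKPOINT_PATH)
          .format("parquet")
          .outputMode("append")
          .start(OUT_PATH))
stream.awaitTermination()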

On Fri, May 1, 2020 at 7:22 PM Burak Yavuz  wrote:

> Hi Rishi,
>
> That is exactly why Trigger.Once was created for Structured Streaming. The
> way we look at streaming is that it doesn't have to be always real time, or
> 24-7 always on. We see streaming as a workflow that you have to repeat
> indefinitely. See this blog post for more details!
>
> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>
> Best,
> Burak
>
> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I recently started playing with spark streaming, and checkpoint location
>> feature looks very promising. I wonder if anyone has an opinion about using
>> spark streaming with checkpoint location option as a slow batch processing
>> solution. What would be the pros and cons of utilizing streaming with
>> checkpoint location feature to achieve fault tolerance in batch processing
>> application?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


[spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Hi All,

I recently started playing with spark streaming, and checkpoint location
feature looks very promising. I wonder if anyone has an opinion about using
spark streaming with checkpoint location option as a slow batch processing
solution. What would be the pros and cons of utilizing streaming with
checkpoint location feature to achieve fault tolerance in batch processing
application?

-- 
Regards,

Rishi Shah


Re: [pyspark 2.4+] BucketBy SortBy doesn't retain sort order

2020-03-03 Thread Rishi Shah
Hi All,

Just checking in to see if anyone has any advice on this.

Thanks,
Rishi

On Mon, Mar 2, 2020 at 9:21 PM Rishi Shah  wrote:

> Hi All,
>
> I have 2 large tables (~1TB), I used the following to save both the
> tables. Then when I try to join both tables with join_column, it still does
> shuffle & sort before the join. Could someone please help?
>
> df.repartition(2000).write.bucketBy(1,
> join_column).sortBy(join_column).saveAsTable(tablename)
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


[pyspark 2.4+] BucketBy SortBy doesn't retain sort order

2020-03-02 Thread Rishi Shah
Hi All,

I have 2 large tables (~1TB), I used the following to save both the tables.
Then when I try to join both tables with join_column, it still does shuffle
& sort before the join. Could someone please help?

df.repartition(2000).write.bucketBy(1,
join_column).sortBy(join_column).saveAsTable(tablename)
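
For completeness, my understanding (please correct me) is that the Exchange only disappears when both sides are bucketed with the same bucket count on the join key, and that bucketBy(1, ...) collapses everything into a single bucket, which is probably not intended. A sketch with hypothetical frames/table names:

for name, d in [('t1', df1), ('t2', df2)]:
    (d.write.bucketBy(200, 'join_column').sortBy('join_column')
      .mode('overwrite').saveAsTable(name))

spark.table('t1').join(spark.table('t2'), 'join_column').explain()
# expect SortMergeJoin with no Exchange; a Sort may remain if buckets span multiple files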

-- 
Regards,

Rishi Shah


Re: High level explanation of dropDuplicates

2020-01-11 Thread Rishi Shah
Thanks everyone for your contribution on this topic, I wanted to check-in
to see if anyone has discovered a different or have an opinion on better
approach to deduplicating data using pyspark. Would really appreciate any
further insight on this.

Thanks,
-Rishi

On Wed, Jun 12, 2019 at 4:21 PM Yeikel  wrote:

> Nicholas , thank you for your explanation.
>
> I am also interested in the example that Rishi is asking for.  I am sure
> mapPartitions may work , but as Vladimir suggests it may not be the best
> option in terms of performance.
>
> @Vladimir Prus , are you aware of any example about writing a  "custom
> physical exec operator"?
>
> If anyone needs a further explanation for the follow up  question Rishi
> posted , please see the example below :
>
>
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
>
>
> val someData = Seq(
>   Row(1, 10),
>   Row(1, 20),
>   Row(1, 11)
> )
>
> val schema = List(
>   StructField("id", IntegerType, true),
>   StructField("score", IntegerType, true)
> )
>
> val df = spark.createDataFrame(
>   spark.sparkContext.parallelize(someData),
>   StructType(schema)
> )
>
> // Goal : Drop duplicates using the "id" as the primary key and keep the
> highest "score".
>
> df.sort($"score".desc).dropDuplicates("id").show
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
> +- Exchange hashpartitioning(id#191, 200)
>+- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192,
> false)])
>   +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>  +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
> +- Scan ExistingRDD[id#191,score#192]
>
> This seems to work , but I don't know what are the implications if we use
> this approach with a bigger dataset or what are the alternatives. From the
> explain output I can see the two Exchanges , so it may not be the best
> approach?
>
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Regards,

Rishi Shah


Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

2020-01-06 Thread Rishi Shah
Thank you Hemant and Enrico. Much appreciated.

your input really got me closer to the issue, I realized every task didn't
get enough memory and hence tasks with large partitions kept failing. I
increased executor memory and at the same time increased number of
partitions as well. This made the job succeed with flying colors. Really
appreciate the help here.
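
For the archives, what worked was, roughly, more executor memory plus many more, smaller partitions; in code form the partition side looks like the sketch below (2000 and the column names are purely illustrative):

from pyspark.sql import functions as F, Window

spark.conf.set("spark.sql.shuffle.partitions", "2000")

w = Window.partitionBy('account_id')                    # hypothetical key
df = (df.repartition(2000, 'account_id')
        .withColumn('running_total', F.sum('amount').over(w)))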

I do have one more question: when do you recommend using RDDs over data
frames? At times using windows may get a bit complicated, but there's
always some way or another to use windows on data frames. I always get
confused as to when to fall back on the RDD approach. Does any use case in
your experience warrant RDD use for better performance?

Thanks,
Rishi

On Mon, Jan 6, 2020 at 4:18 AM Enrico Minack  wrote:

> Note that repartitioning helps to increase the number of partitions (and
> hence to reduce the size of partitions and required executor memory), but
> subsequent transformations like join will repartition data again with the
> configured number of partitions (spark.sql.shuffle.partitions), virtually
> undoing the repartitioning, e.g.:
>
> data// may have any number of partitions
>   .repartition(1000)// has 1000 partitions
>   .join(table)  // has spark.sql.shuffle.partitions partitions
>
> If you use RDDs, you need to configure spark.default.parallelism rather
> than spark.sql.shuffle.partitions.
>
> Given you have 700GB of data, the default of 200 partitions mean that each
> partition is 3,5 GB (equivalent of input data) in size. Since increasing
> executor memory is limited by the available memory, executor memory does
> not scale for big data. Increasing the number of partitions is the natural
> way of scaling in Spark land.
>
> Having hundreds of tasks that fail is an indication that you do not suffer
> from skewed data but from large partitions. Skewed data usually has a few
> tasks that keep failing.
>
> It is easy to check for skewed data in the Spark UI. Open a stage that has
> failing tasks and look at the Summary Metrics, e.g.:
> If the Max number of Shuffle Read Size is way higher than the 75th
> percentile, than this indicates a poor distribution of the data (or more
> precise the partitioning key) of this stage.
>
> You can also sort the tasks by the "Shuffle Read Size / Records" column
> and see if numbers are evenly distributed (ideally).
>
> I hope this helped.
>
> Enrico
>
>
>
> Am 06.01.20 um 06:27 schrieb hemant singh:
>
> You can try repartitioning the data, if it’s a skewed data then you may
> need to salt the keys for better partitioning.
> Are you using a coalesce or any other fn which brings the data to lesser
> nodes. Window function also incurs shuffling that could be an issue.
>
> On Mon, 6 Jan 2020 at 9:49 AM, Rishi Shah 
> wrote:
>
>> Thanks Hemant, underlying data volume increased from 550GB to 690GB and
>> now the same job doesn't succeed. I tried incrementing executor memory to
>> 20G as well, still fails. I am running this in Databricks and start cluster
>> with 20G assigned to spark.executor.memory property.
>>
>> Also some more information on the job, I have about 4 window functions on
>> this dataset before it gets written out.
>>
>> Any other ideas?
>>
>> Thanks,
>> -Shraddha
>>
>> On Sun, Jan 5, 2020 at 11:06 PM hemant singh 
>> wrote:
>>
>>> You can try increasing the executor memory, generally this error comes
>>> when there is not enough memory in individual executors.
>>> Job is getting completed may be because when tasks are re-scheduled it
>>> would be going through.
>>>
>>> Thanks.
>>>
>>> On Mon, 6 Jan 2020 at 5:47 AM, Rishi Shah 
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> One of my jobs, keep getting into this situation where 100s of tasks
>>>> keep failing with below error but job eventually completes.
>>>>
>>>> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384
>>>> bytes of memory
>>>>
>>>> Could someone advice?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>

-- 
Regards,

Rishi Shah


Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

2020-01-05 Thread Rishi Shah
Thanks Hemant, underlying data volume increased from 550GB to 690GB and now
the same job doesn't succeed. I tried incrementing executor memory to 20G
as well, still fails. I am running this in Databricks and start cluster
with 20G assigned to spark.executor.memory property.

Also some more information on the job, I have about 4 window functions on
this dataset before it gets written out.

Any other ideas?

Thanks,
-Shraddha

On Sun, Jan 5, 2020 at 11:06 PM hemant singh  wrote:

> You can try increasing the executor memory, generally this error comes
> when there is not enough memory in individual executors.
> Job is getting completed may be because when tasks are re-scheduled it
> would be going through.
>
> Thanks.
>
> On Mon, 6 Jan 2020 at 5:47 AM, Rishi Shah 
> wrote:
>
>> Hello All,
>>
>> One of my jobs, keep getting into this situation where 100s of tasks keep
>> failing with below error but job eventually completes.
>>
>> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384
>> bytes of memory
>>
>> Could someone advice?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


[pyspark2.4+] A lot of tasks failed, but job eventually completes

2020-01-05 Thread Rishi Shah
Hello All,

One of my jobs keeps getting into this situation where 100s of tasks keep
failing with the below error, but the job eventually completes.

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384
bytes of memory

Could someone advice?

-- 
Regards,

Rishi Shah


Re: [Pyspark 2.3+] Timeseries with Spark

2019-12-29 Thread Rishi Shah
Hi All,

Checking in to see if anyone had input around time series libraries using
Spark. I am interested in financial forecasting models & regression, mainly
at this point. The input is a bunch of pricing data points.

I have read a lot about the spark-timeseries and Flint libraries, but I am not
sure of the best ways/use cases to use these libraries for, or if there's any
other preferred way of tackling time series problems at scale.

Thanks,
-Shraddha

On Sun, Jun 16, 2019 at 9:17 AM Rishi Shah  wrote:

> Thanks Jorn. I am interested in timeseries forecasting for now but in
> general I was unable to find a good way to work with different time series
> methods using spark..
>
> On Fri, Jun 14, 2019 at 1:55 AM Jörn Franke  wrote:
>
>> Time series can mean a lot of different things and algorithms. Can you
>> describe more what you mean by time series use case, ie what is the input,
>> what do you like to do with the input and what is the output?
>>
>> > Am 14.06.2019 um 06:01 schrieb Rishi Shah :
>> >
>> > Hi All,
>> >
>> > I have a time series use case which I would like to implement in
>> Spark... What would be the best way to do so? Any built in libraries?
>> >
>> > --
>> > Regards,
>> >
>> > Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


[pyspark 2.3+] broadcast timeout

2019-12-09 Thread Rishi Shah
Hi All,

All of a sudden we recently discovered that all of our auto broadcasts have
been timing out. This started happening in our static Cloudera cluster as
well as Databricks, and the data has not changed much. Has anyone seen
anything like this before? Any suggestions other than increasing the timeout
period or shutting off broadcast completely by setting the auto broadcast
property to -1?
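
For reference, one middle ground we are considering (not a full shut-off): disable the size-based automatic broadcast so a mis-estimated big side can no longer time out, but keep explicit broadcast() hints on joins where the small side is known to be tiny. The frames below are placeholders:

from pyspark.sql.functions import broadcast

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
joined = big_df.join(broadcast(small_dim), 'key')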

-- 
Regards,

Rishi Shah


[pyspark 2.4.0] write with overwrite mode fails

2019-12-07 Thread Rishi Shah
Hi All,

df = spark.read.csv(PATH)
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
df.repartition(col1,
col2).write.mode('overwrite').partitionBy('col1').parquet(OUT_PATH)

works fine and overwrites the partitioned directory as expected.

However this doesn't overwrite when previous run was abruptly interrupted
and the partitioned directory only has _started flag file & no _SUCCESS or
_committed. In this case, second run doesn't overwrite, causing partition
to have duplicated files. Could someone please help?

-- 
Regards,

Rishi Shah


[pyspark 2.4] maxrecordsperfile option

2019-11-23 Thread Rishi Shah
Hi All,

Version 2.2 introduced the maxRecordsPerFile option for writing data. Could
someone help me understand the performance impact of using maxRecordsPerFile
(a single pass at writing data with this option) vs repartitioning (a 2-stage
process where we write the data down and then consolidate later)?
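
A minimal side-by-side of the two approaches (numbers are illustrative); my understanding is that maxRecordsPerFile only splits an oversized task's output into more files and never merges small ones, so it adds no shuffle, while repartition controls both file count and size at the cost of a full shuffle:

# option 1: single pass, files capped by record count
df.write.option("maxRecordsPerFile", 5000000).parquet(OUT_PATH)

# option 2: shuffle first, then one file per partition
df.repartition(200).write.parquet(OUT_PATH)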

-- 
Regards,

Rishi Shah


Re: [pyspark 2.3.0] Task was denied committing errors

2019-11-10 Thread Rishi Shah
Hi Team,

I could really use your insight here, any help is appreciated!

Thanks,
Rishi


On Wed, Nov 6, 2019 at 8:27 PM Rishi Shah  wrote:

> Any suggestions?
>
> On Wed, Nov 6, 2019 at 7:30 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have two relatively big tables and join on them keeps throwing
>> TaskCommitErrors, eventually job succeeds but I was wondering what these
>> errors are and if there's any solution?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


Re: [pyspark 2.3.0] Task was denied committing errors

2019-11-06 Thread Rishi Shah
Any suggestions?

On Wed, Nov 6, 2019 at 7:30 AM Rishi Shah  wrote:

> Hi All,
>
> I have two relatively big tables and join on them keeps throwing
> TaskCommitErrors, eventually job succeeds but I was wondering what these
> errors are and if there's any solution?
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


[pyspark 2.3.0] Task was denied committing errors

2019-11-06 Thread Rishi Shah
Hi All,

I have two relatively big tables and join on them keeps throwing
TaskCommitErrors, eventually job succeeds but I was wondering what these
errors are and if there's any solution?

-- 
Regards,

Rishi Shah


Re: [pyspark 2.4.3] nested windows function performance

2019-10-21 Thread Rishi Shah
Hi All,

Any suggestions?

Thanks,
-Rishi

On Sun, Oct 20, 2019 at 12:56 AM Rishi Shah 
wrote:

> Hi All,
>
> I have a use case where I need to perform nested windowing functions on a
> data frame to get final set of columns. Example:
>
> w1 = Window.partitionBy('col1')
> df = df.withColumn('sum1', F.sum('val').over(w1))
>
> w2 = Window.partitionBy('col1', 'col2')
> df = df.withColumn('sum2', F.sum('val').over(w2))
>
> w3 = Window.partitionBy('col1', 'col2', 'col3')
> df = df.withColumn('sum3', F.sum('val').over(w3))
>
> These 3 partitions are not huge at all, however the data size is 2T
> parquet snappy compressed. This throws a lot of outofmemory errors.
>
> I would like to get some advice around whether nested window functions is
> a good idea in pyspark? I wanted to avoid using multiple filter + joins to
> get to the final state, as join can create crazy shuffle.
>
> Any suggestions would be appreciated!
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


[pyspark 2.4.3] nested windows function performance

2019-10-19 Thread Rishi Shah
Hi All,

I have a use case where I need to perform nested windowing functions on a
data frame to get final set of columns. Example:

w1 = Window.partitionBy('col1')
df = df.withColumn('sum1', F.sum('val').over(w1))

w2 = Window.partitionBy('col1', 'col2')
df = df.withColumn('sum2', F.sum('val').over(w2))

w3 = Window.partitionBy('col1', 'col2', 'col3')
df = df.withColumn('sum3', F.sum('val').over(w3))

These 3 partitions are not huge at all, however the data size is 2T parquet
snappy compressed. This throws a lot of outofmemory errors.

I would like to get some advice around whether nested window functions is a
good idea in pyspark? I wanted to avoid using multiple filter + joins to
get to the final state, as join can create crazy shuffle.

Any suggestions would be appreciated!

-- 
Regards,

Rishi Shah


[pyspark 2.4.3] small input csv ~3.4GB gets 40K tasks created

2019-08-29 Thread Rishi Shah
Hi All,

I am scratching my head against this weird behavior, where df (read from
.csv) of size ~3.4GB gets cross joined with itself and creates 50K tasks!
How to correlate input size with number of tasks in this case?

-- 
Regards,

Rishi Shah


[python 2.4.3] correlation matrix

2019-08-28 Thread Rishi Shah
Hi All,

What is the best way to calculate correlation matrix?
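
In case a concrete starting point helps, pyspark.ml.stat.Correlation (Spark 2.2+) computes a Pearson or Spearman matrix over an assembled vector column; the column list below is hypothetical:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

num_cols = ['col1', 'col2', 'col3']
vec_df = VectorAssembler(inputCols=num_cols, outputCol='features').transform(df)
corr = Correlation.corr(vec_df, 'features', 'pearson').head()[0]
print(corr.toArray())   # dense matrix, num_cols x num_cols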

-- 
Regards,

Rishi Shah


Re: [Pyspark 2.4] not able to partition the data frame by dates

2019-07-31 Thread Rishi Shah
Thanks for your prompt reply Gourav. I am using Spark 2.4.0 (Cloudera
distribution). The job consistently threw this error, so I narrowed down
the dataset by adding a date filter (date range: 2018-01-01 to 2018-06-30).
However it's still throwing the same error!

*command*: spark2-submit --master yarn --deploy-mode client
--executor-memory 15G --executor-cores 5 samplerestage.py
cluster: 4 nodes, 32 cores each 256GB RAM

This is the only job running, with 20 executors...

I would really like to know the best practice around creating a partitioned
table using PySpark - every time I need to partition a huge dataset, I run
into such issues. Appreciate your help!


On Wed, Jul 31, 2019 at 10:58 PM Gourav Sengupta 
wrote:

> Hi Rishi,
>
> there is no version as 2.4 :), can you please specify the exact SPARK
> version you are using? How are you starting the SPARK session? And what is
> the environment?
>
> I know this issue occurs intermittently over large writes in S3 and has to
> do with S3 eventual consistency issues. Just restarting the job sometimes
> helps.
>
>
> Regards,
> Gourav Sengupta
>
> On Thu, Aug 1, 2019 at 3:55 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have a dataframe of size 2.7T (parquet) which I need to partition by
>> date, however below spark program doesn't help - keeps failing due to *file
>> already exists exception..*
>>
>> df = spark.read.parquet(INPUT_PATH)
>>
>> df.repartition('date_field').write.partitionBy('date_field').mode('overwrite').parquet(PATH)
>>
>> I did notice that couple of tasks failed and probably that's why it tried
>> spinning up new ones which write to the same .staging directory?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


[Pyspark 2.4] not able to partition the data frame by dates

2019-07-31 Thread Rishi Shah
Hi All,

I have a dataframe of size 2.7T (parquet) which I need to partition by
date, however below spark program doesn't help - keeps failing due to *file
already exists exception..*

df = spark.read.parquet(INPUT_PATH)
df.repartition('date_field').write.partitionBy('date_field').mode('overwrite').parquet(PATH)

I did notice that couple of tasks failed and probably that's why it tried
spinning up new ones which write to the same .staging directory?

-- 
Regards,

Rishi Shah


[Pyspark 2.4] Large number of row groups in parquet files created using spark

2019-07-24 Thread Rishi Shah
Hi All,

I have the following code which produces one 600MB parquet file as expected;
however, within this parquet file there are 42 row groups! I would expect it
to create at most 6 row groups. Could someone please shed some light on this?
Is there any config setting which I can enable while submitting the
application using spark-submit?

df = spark.read.parquet(INPUT_PATH)
df.coalesce(1).write.parquet(OUT_PATH)

I did try --conf spark.parquet.block.size & spark.dfs.blocksize, but that
makes no difference.
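
One thing I still plan to try (so treat this as a sketch, not a confirmed fix): the row-group size is the Hadoop/Parquet property parquet.block.size rather than a spark.* config, so it seems it has to go onto the Hadoop configuration, e.g.:

# 256 MB row groups, illustrative value
spark.sparkContext._jsc.hadoopConfiguration().setInt("parquet.block.size", 256 * 1024 * 1024)

df = spark.read.parquet(INPUT_PATH)
df.coalesce(1).write.parquet(OUT_PATH)

On spark-submit the equivalent form should be --conf spark.hadoop.parquet.block.size=... .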

-- 
Regards,

Rishi Shah


[pyspark 2.4.0] write with partitionBy fails due to file already exits

2019-07-01 Thread Rishi Shah
Hi All,

I have a simple partition write like below:

df = spark.read.parquet('read-location')
df.write.partitionBy('col1').mode('overwrite').parquet('write-location')

this fails after an hr with "file already exists (in .staging directory)"
error. Not sure what am I doing wrong here..

-- 
Regards,

Rishi Shah


Re: [pyspark 2.3+] CountDistinct

2019-06-29 Thread Rishi Shah
Thanks Abdeali! Please find details below:

df.agg(countDistinct(col('col1'))).show() --> 450089
df.agg(countDistinct(col('col1'))).show() --> 450076
df.filter(col('col1').isNull()).count() --> 0
df.filter(col('col1').isNotNull()).count() --> 450063

col1 is a string
Spark version 2.4.0
datasize: ~ 500GB


On Sat, Jun 29, 2019 at 5:33 AM Abdeali Kothari 
wrote:

> How large is the data frame and what data type are you counting distinct
> for?
> I use count distinct quite a bit and haven't noticed any thing peculiar.
>
> Also, which exact version in 2.3.x?
> And, are performing any operations on the DF before the countDistinct?
>
> I recall there was a bug when I did countDistinct(PythonUDF(x)) in the
> same query which was resolved in one of the minor versions in 2.3.x
>
> On Sat, Jun 29, 2019, 10:32 Rishi Shah  wrote:
>
>> Hi All,
>>
>> Just wanted to check in to see if anyone has any insight about this
>> behavior. Any pointers would help.
>>
>> Thanks,
>> Rishi
>>
>> On Fri, Jun 14, 2019 at 7:05 AM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> Recently we noticed that countDistinct on a larger dataframe doesn't
>>> always return the same value. Any idea? If this is the case then what is
>>> the difference between countDistinct & approx_count_distinct?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: [pyspark 2.3+] CountDistinct

2019-06-28 Thread Rishi Shah
Hi All,

Just wanted to check in to see if anyone has any insight about this
behavior. Any pointers would help.

Thanks,
Rishi

On Fri, Jun 14, 2019 at 7:05 AM Rishi Shah  wrote:

> Hi All,
>
> Recently we noticed that countDistinct on a larger dataframe doesn't
> always return the same value. Any idea? If this is the case then what is
> the difference between countDistinct & approx_count_distinct?
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


Re: [Pyspark 2.3+] Timeseries with Spark

2019-06-16 Thread Rishi Shah
Thanks Jorn. I am interested in timeseries forecasting for now but in
general I was unable to find a good way to work with different time series
methods using spark..

On Fri, Jun 14, 2019 at 1:55 AM Jörn Franke  wrote:

> Time series can mean a lot of different things and algorithms. Can you
> describe more what you mean by time series use case, ie what is the input,
> what do you like to do with the input and what is the output?
>
> > Am 14.06.2019 um 06:01 schrieb Rishi Shah :
> >
> > Hi All,
> >
> > I have a time series use case which I would like to implement in
> Spark... What would be the best way to do so? Any built in libraries?
> >
> > --
> > Regards,
> >
> > Rishi Shah
>


-- 
Regards,

Rishi Shah


[pyspark 2.3+] CountDistinct

2019-06-14 Thread Rishi Shah
Hi All,

Recently we noticed that countDistinct on a larger dataframe doesn't always
return the same value. Any idea? If this is the case then what is the
difference between countDistinct & approx_count_distinct?

-- 
Regards,

Rishi Shah


[Pyspark 2.3+] Timeseries with Spark

2019-06-13 Thread Rishi Shah
Hi All,

I have a time series use case which I would like to implement in Spark...
What would be the best way to do so? Any built in libraries?

-- 
Regards,

Rishi Shah


[pyspark 2.3+] count distinct returns different value every time it is run on the same dataset

2019-06-11 Thread Rishi Shah
Hi All,

countDistinct on a dataframe returns different results every time it is run.
I would expect that when approxCountDistinct is used, but for
countDistinct() too? Is there a way to get an accurate, deterministic count
using PySpark?
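
For what it's worth, countDistinct itself is an exact aggregate; approx_count_distinct is the HyperLogLog-based one with a tunable relative error (rsd). A deterministic sanity check on a fixed snapshot of the data would be something like:

from pyspark.sql import functions as F

exact = df.select('col1').distinct().count()
approx = df.agg(F.approx_count_distinct('col1', rsd=0.01)).first()[0]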

-- 
Regards,

Rishi Shah


Re: [Pyspark 2.4] Best way to define activity within different time window

2019-06-10 Thread Rishi Shah
Thank you both for your input!

To calculate a moving average of active users, could you comment on whether
to go for an RDD based implementation or a dataframe? If a dataframe, will a
window function work here?

In general, how would Spark behave when working with a dataframe with date,
week, month, quarter, and year columns and grouping by each one, one by one?


On Sun, Jun 9, 2019 at 1:17 PM Jörn Franke  wrote:

> Depending on what accuracy is needed, hyperloglogs can be an interesting
> alternative
> https://en.m.wikipedia.org/wiki/HyperLogLog
>
> Am 09.06.2019 um 15:59 schrieb big data :
>
> In my opinion, a bitmap is the best solution for active-user calculation.
> Other solutions are almost all based on a count(distinct) calculation
> process, which is slower.
>
> If you 've implemented Bitmap solution including how to build Bitmap, how
> to load Bitmap, then Bitmap is the best choice.
> 在 2019/6/5 下午6:49, Rishi Shah 写道:
>
> Hi All,
>
> Is there a best practice around calculating daily, weekly, monthly,
> quarterly, yearly active users?
>
> One approach is to create a window of daily bitmap and aggregate it based
> on period later. However I was wondering if anyone has a better approach to
> tackling this problem..
>
> --
> Regards,
>
> Rishi Shah
>
>

-- 
Regards,

Rishi Shah


Re: High level explanation of dropDuplicates

2019-06-09 Thread Rishi Shah
Hi All,

Just wanted to check back regarding the best way to perform deduplication. Is
using dropDuplicates the optimal way to get rid of duplicates? Would it be
better if we ran the operations on the RDD directly?

Also, what about if we want to keep the last value of the group while
performing deduplication (based on some sorting criteria)?
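
On the keep-last part, one shuffle-only pattern I have been meaning to benchmark (column names hypothetical) is taking the max of a struct whose first field is the ordering column, which avoids a global sort entirely:

from pyspark.sql import functions as F

other_cols = [c for c in df.columns if c not in ('id', 'updated_at')]
latest = (df.groupBy('id')
            .agg(F.max(F.struct('updated_at', *other_cols)).alias('r'))
            .select('id', 'r.*'))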

Thanks,
Rishi

On Mon, May 20, 2019 at 3:33 PM Nicholas Hakobian <
nicholas.hakob...@rallyhealth.com> wrote:

> From doing some searching around in the spark codebase, I found the
> following:
>
>
> https://github.com/apache/spark/blob/163a6e298213f216f74f4764e241ee6298ea30b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1452-L1474
>
> So it appears there is no direct operation called dropDuplicates or
> Deduplicate, but there is an optimizer rule that converts this logical
> operation to a physical operation that is equivalent to grouping by all the
> columns you want to deduplicate across (or all columns if you are doing
> something like distinct), and taking the First() value. So (using a pySpark
> code example):
>
> df = input_df.dropDuplicates(['col1', 'col2'])
>
> Is effectively shorthand for saying something like:
>
> df = input_df.groupBy('col1',
> 'col2').agg(first(struct(input_df.columns)).alias('data')).select('data.*')
>
> Except I assume that it has some internal optimization so it doesn't need
> to pack/unpack the column data, and just returns the whole Row.
>
> Nicholas Szandor Hakobian, Ph.D.
> Principal Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com
>
>
>
> On Mon, May 20, 2019 at 11:38 AM Yeikel  wrote:
>
>> Hi ,
>>
>> I am looking for a high level explanation(overview) on how
>> dropDuplicates[1]
>> works.
>>
>> [1]
>>
>> https://github.com/apache/spark/blob/db24b04cad421ed508413d397c6beec01f723aee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2326
>>
>> Could someone please explain?
>>
>> Thank you
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

-- 
Regards,

Rishi Shah


[pyspark 2.3+] Querying non-partitioned @TB data table is too slow

2019-06-09 Thread Rishi Shah
Hi All,

I have a table with 3TB of data, stored as snappy-compressed parquet with 100
columns. I am filtering the DataFrame on a date column (date between
20190501-20190530), selecting only 20 columns, and counting - and this
operation takes about 45 mins!!

Shouldn't parquet do the predicate pushdown and filtering without scanning
the entire dataset?
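
My current understanding (corrections welcome): parquet pushdown only skips row groups whose min/max statistics rule the predicate out, so on an unpartitioned, unsorted table most row groups still get opened. Rewriting the table partitioned by the date column (hypothetical name below) turns the same filter into directory-level pruning:

(df.repartition('date_col')
   .write.partitionBy('date_col')
   .mode('overwrite')
   .parquet(PARTITIONED_PATH))

q = spark.read.parquet(PARTITIONED_PATH).where("date_col between '20190501' and '20190530'")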

-- 
Regards,

Rishi Shah


[Pyspark 2.4] Best way to define activity within different time window

2019-06-05 Thread Rishi Shah
Hi All,

Is there a best practice around calculating daily, weekly, monthly,
quarterly, yearly active users?

One approach is to create a window of daily bitmap and aggregate it based
on period later. However I was wondering if anyone has a better approach to
tackling this problem..

-- 
Regards,

Rishi Shah


Re: [pyspark 2.3+] Bucketing with sort - incremental data load?

2019-05-31 Thread Rishi Shah
Thanks much for your input Gourav, Silvio.

I have about 10TB of data, which gets stored daily. There's no qualifying
column for partitioning, which makes querying this table super slow. So I
wanted to sort the results before storing them daily. This is why I was
thinking to use bucketing and sorting ... Do you think sorting data based
on a column or two before saving would help query performance on this
table?

My concern is, data will be sorted on daily basis and not globally. Would
that help with performance? I can compact files every month as well and
sort before saving. Just not sure if this is going to help with performance
issues on this table.
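
Concretely, the daily write I am considering looks like the sketch below ('query_col' is a placeholder for the column we filter on; repartitionByRange is available in PySpark from 2.4): range-partition so each output file covers a narrow slice of the column, then sort within partitions so the parquet row-group min/max stats stay selective even though each day is sorted independently.

(daily_df.repartitionByRange(200, 'query_col')
         .sortWithinPartitions('query_col')
         .write.mode('append')
         .parquet(TABLE_PATH))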

Would be great to get your advice on this.









On Fri, May 31, 2019 at 10:42 AM Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Spark does allow appending new files to bucketed tables. When the data is
> read in, Spark will combine the multiple files belonging to the same
> buckets into the same partitions.
>
>
>
> Having said that, you need to be very careful with bucketing especially as
> you’re appending to avoid generating lots of small files. So, you may need
> to consider periodically running a compaction job.
>
>
>
> If you’re simply appending daily snapshots, then you could just consider
> using date partitions, instead?
>
>
>
> *From: *Rishi Shah 
> *Date: *Thursday, May 30, 2019 at 10:43 PM
> *To: *"user @spark" 
> *Subject: *[pyspark 2.3+] Bucketing with sort - incremental data load?
>
>
>
> Hi All,
>
>
>
> Can we use bucketing with sorting functionality to save data incrementally
> (say daily) ? I understand bucketing is supported in Spark only with
> saveAsTable, however can this be used with mode "append" instead of
> "overwrite"?
>
>
>
> My understanding around bucketing was, you need to rewrite entire table
> every time, can someone help advice?
>
>
>
> --
>
> Regards,
>
>
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


[pyspark 2.3+] Bucketing with sort - incremental data load?

2019-05-30 Thread Rishi Shah
Hi All,

Can we use bucketing with sorting functionality to save data incrementally
(say daily) ? I understand bucketing is supported in Spark only with
saveAsTable, however can this be used with mode "append" instead of
"overwrite"?

My understanding around bucketing was, you need to rewrite entire table
every time, can someone help advice?

-- 
Regards,

Rishi Shah


Re: [pyspark 2.3+] how to dynamically determine DataFrame partitions while writing

2019-05-22 Thread Rishi Shah
Hi All,

Any idea about this?

Thanks,
Rishi

On Tue, May 21, 2019 at 11:29 PM Rishi Shah 
wrote:

> Hi All,
>
> What is the best way to determine partitions of a dataframe dynamically
> before writing to disk?
>
> 1) statically determine based on data and use coalesce or repartition
> while writing
> 2) somehow determine count of records for entire dataframe and divide that
> number to determine partition - however how to determine total count
> without having to risk computing dataframe twice (if dataframe is not
> cached, and count() is used)
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


[pyspark 2.3+] repartition followed by window function

2019-05-22 Thread Rishi Shah
Hi All,

If a dataframe is repartitioned in memory by the (date, id) columns and I then
use multiple window functions whose partition-by clause uses the (date, id)
columns, we can avoid shuffling/sorting again, I believe. Can someone
confirm this?

However, what happens when the dataframe repartition was done using the
(date, id) columns, but the window function which follows the repartition
needs a partition-by clause with the (date, id, col3, col4) columns? Would
Spark reshuffle the data, or would it know to utilize the data already
partitioned/shuffled by date/id (as date & id are the common partition keys)?
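
A cheap way to answer this empirically is to look for an extra Exchange in the plan. My reading of the distribution rules is that hash partitioning on (date, id) already co-locates every row sharing (date, id, col3, col4), so the finer window should add only a Sort, not a new shuffle; explain() will confirm either way:

from pyspark.sql import functions as F, Window

df = df.repartition('date', 'id')
w_fine = Window.partitionBy('date', 'id', 'col3', 'col4')
df.withColumn('cnt', F.count(F.lit(1)).over(w_fine)).explain()
# expect a Sort for the window but no second Exchange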

-- 
Regards,

Rishi Shah


[pyspark 2.3+] how to dynamically determine DataFrame partitions while writing

2019-05-21 Thread Rishi Shah
Hi All,

What is the best way to determine partitions of a dataframe dynamically
before writing to disk?

1) statically determine it based on the data and use coalesce or repartition
while writing
2) somehow determine the record count for the entire dataframe and divide that
number by a target records-per-partition to get the partition count (rough
sketch below) - but how do we determine the total count without risking
computing the dataframe twice (if the dataframe is not cached and count() is
used)?
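
A sketch of option 2 that only pays for the lineage once (the target rows per file and OUT_PATH are illustrative):

import math

df = df.persist()
n = df.count()                                      # materializes the persisted data
num_parts = max(1, int(math.ceil(n / 1000000.0)))   # ~1M rows per output file
df.repartition(num_parts).write.parquet(OUT_PATH)
df.unpersist()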

-- 
Regards,

Rishi Shah


[pyspark 2.3] count followed by write on dataframe

2019-05-20 Thread Rishi Shah
Hi All,

Just wanted to confirm my understanding around actions on a dataframe. If the
dataframe is not persisted at any point and count() is called on it, followed
by a write action, this would trigger the dataframe computation twice (which
could be a performance hit for a larger dataframe). Could anyone please help
confirm?
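
If that is the case, the minimal guard would be, as a sketch:

df = df.cache()      # or persist(StorageLevel.DISK_ONLY) if it won't fit in memory
n = df.count()       # first action materializes the cache
df.write.parquet(OUT_PATH)
df.unpersist()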

-- 
Regards,

Rishi Shah


Re: Spark job gets hung on cloudera cluster

2019-05-17 Thread Rishi Shah
Yes, that's exactly what happens, but I would think that a data node being
unavailable (or data being unavailable on one of the nodes) should not cause
an indefinite wait. Are there any properties we can set to avoid an
indefinite/non-deterministic outcome for a Spark application?
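
For reference, the settings I am looking at (values are placeholders, not recommendations) would go in at session start, e.g.:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.network.timeout", "300s")     # bound RPC / shuffle-fetch waits
         .config("spark.task.maxFailures", "4")       # abort after repeated task failures (default 4)
         .config("spark.speculation", "true")         # re-launch slow tasks elsewhere
         .config("spark.blacklist.enabled", "true")   # stop scheduling on misbehaving executors/nodes
         .getOrCreate())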

On Thu, May 16, 2019 at 9:49 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> One of the reason that any jobs running on YARN (Spark, MR, Hive, etc) can
> get stuck is if there is data unavailability issue with HDFS.
> This can arise if either the Namenode is not reachable or if the
> particular data block is unavailable due to node failures.
>
> Can you check if your YARN service can communicate with Name node service?
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, May 16, 2019 at 4:27 PM Rishi Shah 
> wrote:
>
>> on yarn
>>
>> On Thu, May 16, 2019 at 1:36 AM Akshay Bhardwaj <
>> akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Rishi,
>>>
>>> Are you running spark on YARN or spark's master-slave cluster?
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Thu, May 16, 2019 at 7:15 AM Rishi Shah 
>>> wrote:
>>>
>>>> Any one please?
>>>>
>>>> On Tue, May 14, 2019 at 11:51 PM Rishi Shah 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> At times when there's a data node failure, running spark job doesn't
>>>>> fail - it gets stuck and doesn't return. Any setting can help here? I 
>>>>> would
>>>>> ideally like to get the job terminated or executors running on those data
>>>>> nodes fail...
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: Spark job gets hung on cloudera cluster

2019-05-16 Thread Rishi Shah
on yarn

On Thu, May 16, 2019 at 1:36 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi Rishi,
>
> Are you running spark on YARN or spark's master-slave cluster?
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, May 16, 2019 at 7:15 AM Rishi Shah 
> wrote:
>
>> Any one please?
>>
>> On Tue, May 14, 2019 at 11:51 PM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> At times when there's a data node failure, running spark job doesn't
>>> fail - it gets stuck and doesn't return. Any setting can help here? I would
>>> ideally like to get the job terminated or executors running on those data
>>> nodes fail...
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: Databricks - number of executors, shuffle.partitions etc

2019-05-16 Thread Rishi Shah
Thanks Ayan, I wasn't aware of such a user group specifically for Databricks.
Thanks for the input, much appreciated!
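
For reference, a minimal sketch of the two levels involved (the conf keys
below are standard Spark ones and only illustrative): session/SQL-level
settings can be changed from the notebook, while executor sizing belongs in
the cluster's Spark config.

# In a notebook: session-level SQL settings take effect immediately
spark.conf.set("spark.sql.shuffle.partitions", "64")

# Cluster-level settings (executor memory/cores, number of workers) are not
# picked up via spark.conf.set at runtime; they go into the cluster's Spark
# config as space-separated key value pairs, e.g.:
#   spark.executor.memory 8g
#   spark.executor.cores 4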

On Wed, May 15, 2019 at 10:07 PM ayan guha  wrote:

> Well, it's a Databricks question, so it would be better asked in their forum.
>
> You can set up cluster-level params when you create a new cluster or add
> them later. Go to the cluster page, open one cluster, expand the additional
> config section and add your param there as a key-value pair separated by a space.
>
> On Thu, 16 May 2019 at 11:46 am, Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> Any idea?
>>
>> Thanks,
>> -Rishi
>>
>> On Tue, May 14, 2019 at 11:52 PM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> How can we set spark conf parameter in databricks notebook? My cluster
>>> doesn't take into account any spark.conf.set properties... it creates 8
>>> worker nodes (i.e. executors) but doesn't honor the supplied conf
>>> parameters. Any idea?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
> --
> Best Regards,
> Ayan Guha
>


-- 
Regards,

Rishi Shah


Re: Databricks - number of executors, shuffle.partitions etc

2019-05-15 Thread Rishi Shah
Hi All,

Any idea?

Thanks,
-Rishi

On Tue, May 14, 2019 at 11:52 PM Rishi Shah 
wrote:

> Hi All,
>
> How can we set spark conf parameter in databricks notebook? My cluster
> doesn't take into account any spark.conf.set properties... it creates 8
> worker nodes (i.e. executors) but doesn't honor the supplied conf
> parameters. Any idea?
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


Re: Spark job gets hung on cloudera cluster

2019-05-15 Thread Rishi Shah
Any one please?

On Tue, May 14, 2019 at 11:51 PM Rishi Shah 
wrote:

> Hi All,
>
> At times when there's a data node failure, running spark job doesn't fail
> - it gets stuck and doesn't return. Any setting can help here? I would
> ideally like to get the job terminated or executors running on those data
> nodes fail...
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


Databricks - number of executors, shuffle.partitions etc

2019-05-14 Thread Rishi Shah
Hi All,

How can we set spark conf parameter in databricks notebook? My cluster
doesn't take into account any spark.conf.set properties... it creates 8
worker nodes (i.e. executors) but doesn't honor the supplied conf
parameters. Any idea?

-- 
Regards,

Rishi Shah


Spark job gets hung on cloudera cluster

2019-05-14 Thread Rishi Shah
Hi All,

At times when there's a data node failure, a running Spark job doesn't fail -
it gets stuck and doesn't return. Is there any setting that can help here? I
would ideally like the job to be terminated, or the executors running on
those data nodes to fail...

-- 
Regards,

Rishi Shah


[pyspark 2.3] drop_duplicates, keep first record based on sorted records

2019-05-13 Thread Rishi Shah
Hi All,

Is there a better way to drop duplicates and keep the first record based on a
sorted column?

Simple sorting on the dataframe followed by dropping duplicates is quite slow!
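
One alternative sketch, in case it helps anyone comparing approaches: a
window with row_number instead of a global sort plus dropDuplicates
('key_col' and 'sort_col' are placeholder names):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('key_col').orderBy(F.col('sort_col').asc())

deduped = (df
           .withColumn('rn', F.row_number().over(w))   # number rows per key
           .filter(F.col('rn') == 1)                   # keep the first per key
           .drop('rn'))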

-- 
Regards,

Rishi Shah


[Pyspark 2.3] Logical operators (and/or) in pyspark

2019-05-13 Thread Rishi Shah
Hi All,

I am using the or operator "|" in a withColumn clause on a DataFrame in
PySpark. However, it looks like it always evaluates all the conditions
regardless of the first condition being true. Please find a sample below:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

contains = udf(lambda s, arr: s in arr, BooleanType())

df.withColumn('match_flag', (col('list_names').isNull()) |
              (contains(col('name'), col('list_names'))))

Here, where list_names is null, it throws an error: NoneType is not
iterable.

Any idea?
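
In case it helps anyone hitting the same error: Spark does not guarantee
short-circuiting (or even the evaluation order) of column expressions, so the
safer fix is a null-safe UDF rather than relying on the isNull() guard. A
rough sketch:

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# Handle a missing list explicitly inside the UDF instead of expecting the
# isNull() branch to be evaluated first.
contains = F.udf(lambda s, arr: False if arr is None else s in arr,
                 BooleanType())

df = df.withColumn('match_flag',
                   F.col('list_names').isNull() |
                   contains(F.col('name'), F.col('list_names')))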

-- 
Regards,

Rishi Shah


Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-04 Thread Rishi Shah
Thanks Patrick! I tried to package it according to these instructions; it got
distributed on the cluster, however the same Spark program that takes 5 mins
without a pandas UDF has started to take 25 mins...

Have you experienced anything like this? Also, is PyArrow 0.12 supported
with Spark 2.3 (according to the documentation, it should be fine)?

On Tue, Apr 30, 2019 at 9:35 AM Patrick McCarthy 
wrote:

> Hi Rishi,
>
> I've had success using the approach outlined here:
> https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html
>
> Does this work for you?
>
> On Tue, Apr 30, 2019 at 12:32 AM Rishi Shah 
> wrote:
>
>> modified the subject & would like to clarify that I am looking to create
>> an anaconda parcel with pyarrow and other libraries, so that I can
>> distribute it on the cloudera cluster..
>>
>> On Tue, Apr 30, 2019 at 12:21 AM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> I have been trying to figure out a way to build anaconda parcel with
>>> pyarrow included for my cloudera managed server for distribution but this
>>> doesn't seem to work right. Could someone please help?
>>>
>>> I have tried to install anaconda on one of the management nodes on
>>> cloudera cluster... tarred the directory, but this directory doesn't
>>> include all the packages to form a proper parcel for distribution.
>>>
>>> Any help is much appreciated!
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
>
>
> *Patrick McCarthy  *
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>


-- 
Regards,

Rishi Shah


Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-04-29 Thread Rishi Shah
modified the subject & would like to clarify that I am looking to create an
anaconda parcel with pyarrow and other libraries, so that I can distribute
it on the cloudera cluster..

On Tue, Apr 30, 2019 at 12:21 AM Rishi Shah 
wrote:

> Hi All,
>
> I have been trying to figure out a way to build anaconda parcel with
> pyarrow included for my cloudera managed server for distribution but this
> doesn't seem to work right. Could someone please help?
>
> I have tried to install anaconda on one of the management nodes on
> cloudera cluster... tarred the directory, but this directory doesn't
> include all the packages to form a proper parcel for distribution.
>
> Any help is much appreciated!
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Regards,

Rishi Shah


Anaconda installation with Pyspark on cloudera managed server

2019-04-29 Thread Rishi Shah
Hi All,

I have been trying to figure out a way to build anaconda parcel with
pyarrow included for my cloudera managed server for distribution but this
doesn't seem to work right. Could someone please help?

I have tried to install anaconda on one of the management nodes on cloudera
cluster... tarred the directory, but this directory doesn't include all the
packages to form a proper parcel for distribution.

Any help is much appreciated!

-- 
Regards,

Rishi Shah


[pyspark] Use output of one aggregated function for another aggregated function within the same groupby

2019-04-24 Thread Rishi Shah
Hi All,

[PySpark 2.3, python 2.7]

I would like to achieve something like this; could you please suggest the
best way to implement it (and perhaps highlight pros & cons of the approach
in terms of performance)?

df = df.groupby('grp_col').agg(max('date').alias('max_date'),
                               count(when(col('file_date') == col('max_date'), True)))

Please note 'max_date' is the result of the aggregate function max inside the
groupBy agg. I can definitely use multiple groupBys to achieve this, but is
there a better way, perhaps better performance-wise?

Appreciate your help!
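
One possible single-pass sketch, computing the group max with a window first
so the comparison column exists before the aggregation (column names follow
the snippet above; 'rows_at_max_date' is just an illustrative alias):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('grp_col')

result = (df
          .withColumn('max_date', F.max('date').over(w))   # per-row group max
          .groupBy('grp_col')
          .agg(F.first('max_date').alias('max_date'),
               F.count(F.when(F.col('file_date') == F.col('max_date'), True))
                   .alias('rows_at_max_date')))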

-- 
Regards,

Rishi Shah


RDD vs Dataframe & when to persist

2019-04-24 Thread Rishi Shah
Hello All,

I run into situations where I ask myself whether I should write a
mapPartitions function on an RDD or use the dataframe all the way (withColumn
+ groupBy approach). I am using PySpark 2.3 (Python 2.7). I understand we
should be utilizing dataframes as much as possible, but at times it feels
like an RDD function would provide more flexible code. Could you please
advise when to prefer one approach over the other? (Keeping pandas UDFs in
mind, which approach makes more sense in what scenarios?)

Also, how does it affect performance - that is, using the dataframe all the
way vs an RDD mapPartitions function?

Another question that always arises is when to persist a dataframe. Should
we repartition before a groupBy? If so, will it affect performance without a
persist?
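
To make the persist question concrete, a small sketch of the pattern I have
in mind; persisting only pays off when the result feeds more than one action,
and whether the explicit repartition before the groupBy helps is worth
checking with explain() (column name and path are placeholders):

from pyspark import StorageLevel
from pyspark.sql import functions as F

grouped = (df
           .repartition('group_col')               # optional: co-locate keys
           .groupBy('group_col')
           .agg(F.count(F.lit(1)).alias('cnt'))
           .persist(StorageLevel.MEMORY_AND_DISK))

grouped.count()                        # materialize once
grouped.write.parquet('OUTPUT_PATH')   # second action reuses the persisted result
grouped.unpersist()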

Any help is much appreciated.

Thanks,
-Rishi


Use derived column for other derived column in the same statement

2019-04-21 Thread Rishi Shah
Hello All,

How can we use a derived column (derived1) to derive another column in the
same dataframe operation statement?

Something like:

from pyspark.sql.functions import lit, col

df = (df.withColumn('derived1', lit('something'))
        .withColumn('derived2', col('derived1') == 'something'))

-- 
Regards,

Rishi Shah