Re: Does explode lead to more usage of memory

2020-01-18 Thread Jörn Franke
Why not use two tables and then join them? That would be the standard way. It depends
on what your full use case is, what volumes / order counts you expect on average, and
what your aggregations and filters look like. The example below only shows a select-all
on the table.
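
For illustration, a minimal sketch of the two-table layout (column names follow the
schema quoted below; `df` stands for the nested DataFrame from the original question):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}

// A sketch only: `df` is assumed to be the DataFrame with the nested `orders` array.
def splitIntoTables(df: DataFrame): (DataFrame, DataFrame) = {
  // One row per user, carrying only the user-level attributes.
  val users = df.select(col("user"), col("language")).distinct()
  // One row per order, keyed by user.
  val orders = df
    .select(col("user"), explode(col("orders")).as("order"))
    .select(col("user"), col("order.id").as("id"), col("order.amount").as("amount"))
  (users, orders)
}

// Join only in the queries that actually need both order and user attributes, e.g.:
//   val (users, orders) = splitIntoTables(df)
//   val joined = orders.join(users, Seq("user"))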

> On 19.01.2020 at 01:50, V0lleyBallJunki3 wrote:
> 
> I am using a dataframe that has a structure like this:
> 
> root
> |-- orders: array (nullable = true)
> ||-- element: struct (containsNull = true) 
> |||-- amount: double (nullable = true)
> |||-- id: string (nullable = true)
> |-- user: string (nullable = true)
> |-- language: string (nullable = true)
> 
> Each user has multiple orders. Now if I explode orders like this:
> 
> df.select($"user", explode($"orders").as("order")). Each order element will
> become a row with duplicated user and language values. I was wondering whether Spark
> actually converts each order element into a separate row in memory, or whether it is
> just logical. If a single user has 1000 orders, wouldn't that lead to a lot more memory
> consumption, since user and language are duplicated 1000 times (once for each order)
> in memory?
> 
> 
> 




Re: Does explode lead to more usage of memory

2020-01-18 Thread Chris Teoh
I think it does mean more memory usage, but consider how big your arrays are and
whether arrays make sense for your use case requirements. It may be preferable to
explode if the arrays are very large: exploding makes the data more splittable.
Keeping the array has the benefit of avoiding a join and of colocating the child items
with their parent, but it does imply more memory pressure on each executor, which has
to read every record with its whole array, and so can require denser nodes.
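
For example, one quick way to gauge that (a sketch, assuming `df` is the DataFrame
from the question below):

import org.apache.spark.sql.functions.{col, size}

// Rough distribution of orders per user; a large maximum is a hint that exploding
// (or splitting into a separate table) keeps individual records small and splittable.
df.select(size(col("orders")).as("orders_per_user"))
  .summary("min", "25%", "50%", "75%", "max")
  .show()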

I hope that helps.

On Sun, 19 Jan 2020, 7:50 am V0lleyBallJunki3 wrote:

> I am using a dataframe that has a structure like this:
>
> root
>  |-- orders: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- amount: double (nullable = true)
>  |||-- id: string (nullable = true)
>  |-- user: string (nullable = true)
>  |-- language: string (nullable = true)
>
> Each user has multiple orders. Now if I explode orders like this:
>
> df.select($"user", explode($"orders").as("order")). Each order element will
> become a row with duplicated user and language values. I was wondering whether Spark
> actually converts each order element into a separate row in memory, or whether it is
> just logical. If a single user has 1000 orders, wouldn't that lead to a lot more memory
> consumption, since user and language are duplicated 1000 times (once for each order)
> in memory?
>
>
>


Does explode lead to more usage of memory

2020-01-18 Thread V0lleyBallJunki3
I am using a dataframe that has a structure like this:

root
 |-- orders: array (nullable = true)
 ||-- element: struct (containsNull = true) 
 |||-- amount: double (nullable = true)
 |||-- id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- language: string (nullable = true)

Each user has multiple orders. Now if I explode orders like this:

df.select($"user", explode($"orders").as("order")). Each order element will
become a row with duplicated user and language values. I was wondering whether Spark
actually converts each order element into a separate row in memory, or whether it is
just logical. If a single user has 1000 orders, wouldn't that lead to a lot more memory
consumption, since user and language are duplicated 1000 times (once for each order)
in memory?
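
For reference, one way to see how Spark plans the explode is to print the physical
plan (a sketch; the exact plan text depends on the Spark version):

import org.apache.spark.sql.functions.{col, explode}

// The physical plan contains a Generate node for the explode; at execution time it
// emits one output row per array element, carrying the user and language values along.
df.select(col("user"), col("language"), explode(col("orders")).as("order")).explain(true)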






Re: How to implement "getPreferredLocations" in Data source v2?

2020-01-18 Thread Russell Spitzer
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/InputPartition.java

See InputPartition, which has a preferredLocations() method you can override.
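
A minimal sketch in Scala (the partition fields and host list are assumptions for
illustration; only preferredLocations() comes from the InputPartition interface):

import org.apache.spark.sql.connector.read.InputPartition

// Hypothetical partition for a file-backed source: `hosts` lists the nodes that hold
// this split, so the scheduler can try to run the reading task on one of them.
case class MyInputPartition(path: String, start: Long, length: Long, hosts: Array[String])
  extends InputPartition {

  // Locality hint used by Spark's scheduler; the default implementation returns no hosts.
  override def preferredLocations(): Array[String] = hosts
}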

On Sat, Jan 18, 2020, 1:44 PM kineret M  wrote:

> Hi,
> I would like to support data locality in Spark data source v2. How can I give Spark
> the ability to read and process data on the same node?
>
> I didn't find any interface that supports 'getPreferredLocations' (or
> equivalent).
>
> Thanks!
>


How to implement "getPreferredLocations" in Data source v2?

2020-01-18 Thread kineret M
Hi,
I would like to support data locality in Spark data source v2. How can I give Spark
the ability to read and process data on the same node?

I didn't find any interface that supports 'getPreferredLocations' (or
equivalent).

Thanks!


How to prevent and track data loss/dropped due to watermark during structure streaming aggregation

2020-01-18 Thread stevech.hu
We have a scenario where we group raw records by correlation id every 3 minutes and
append the grouped result to an HDFS store. Below is an example of our query:

val df = records.readStream.format("SomeDataSource")
  .load()
  .selectExpr("current_timestamp() as CurrentTime", "*")
  .withWatermark("CurrentTime", "2 minute")
  .groupBy(window($"CurrentTime", "3 minute"), $"CorrelationId")
  .agg(collect_list(col("data")) as "Records")
  .repartition(100, $"CorrelationId")
  .select($"CorrelationId", $"Records")
  .writeStream.

We want to include delayed data even if there is processing delay in the pipeline, and
we have an SLA of 5 minutes: once any record is read into Spark, we want to see the
grouped output flushed to HDFS within 5 minutes.

So, let's say the shuffle stage (groupBy) or the write stage is delayed by 5 to 10
minutes. Will we lose data due to the 2-minute watermark here? (Sometimes it is OK to
break the SLA, but we cannot afford data loss.) If so, how can we prevent data loss,
or track the amount of data being dropped in this case?

Note that extending the watermark to a longer window won't work in our append scenario,
because aggregated data isn't emitted to the write stage until the watermark passes.
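
For reference, a minimal monitoring sketch (not a fix for the watermark itself; it
assumes `spark` is the active SparkSession): logging each micro-batch's progress
exposes the current event-time watermark and the state-operator metrics, which gives
at least some visibility into how far the watermark lags and how much data is at risk
of being dropped.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // The progress JSON includes the event-time watermark and per-operator state
    // metrics for the micro-batch; ship it to your logs or metrics system.
    println(event.progress.json)
  }
})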

Thanks,
Steve