[ https://issues.apache.org/jira/browse/BEAM-5392?focusedWorklogId=195670&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195670 ]

ASF GitHub Bot logged work on BEAM-5392:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Feb/19 14:10
            Start Date: 07/Feb/19 14:10
    Worklog Time Spent: 10m 
      Work Description: mareksimunek commented on pull request #7601: 
[BEAM-5392] GroupByKey optimized for non-merging windows
URL: https://github.com/apache/beam/pull/7601#discussion_r254696042
 
 

 ##########
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
 ##########
 @@ -137,30 +139,39 @@ public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
   }
 
   /**
-   * Cache PCollection if {@link #isCacheDisabled()} flag is false and PCollection is used more then
-   * once in Pipeline.
+   * Cache the PCollection if the {@link #isCacheDisabled()} flag is false, the transform is not a
+   * GroupByKey, and the PCollection is used more than once in the Pipeline.
    *
-   * @param pvalue
+   * <p>The output of a GroupByKey is not cached, because Spark automatically persists some
+   * intermediate data in shuffle operations, even without users calling persist.
+   *
+   * @param pvalue output of the transform
+   * @param transform the transform that produced {@code pvalue}
    * @return if PCollection will be cached
    */
-  public boolean shouldCache(PValue pvalue) {
-    if (isCacheDisabled()) {
+  public boolean shouldCache(PValue pvalue, PTransform<?, ? extends PValue> transform) {
+    if (isCacheDisabled() || transform instanceof GroupByKey) {
       return false;
     }
     return pvalue instanceof PCollection && cacheCandidates.getOrDefault(pvalue, 0L) > 1;
   }
 
   public void putDataset(PTransform<?, ? extends PValue> transform, Dataset dataset) {
-    putDataset(getOutput(transform), dataset);
+    putDataset(transform, getOutput(transform), dataset);
   }
 
   public void putDataset(PValue pvalue, Dataset dataset) {
 
 Review comment:
  The method is used when a transform has multiple outputs, so I left it and 
added better Javadoc.
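 
 For context, a minimal caller sketch of the new overload. The registerOutput
 helper below is hypothetical; only shouldCache and putDataset come from the
 diff above, and the actual persist call is elided because it depends on the
 runner's Dataset implementation:
 
 import org.apache.beam.runners.spark.translation.Dataset;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 
 class TranslationSketch {
   // Hypothetical helper, not part of the PR: register a transform's output,
   // persisting it only when shouldCache(...) approves. For a GroupByKey the
   // check returns false, so its output relies on Spark's built-in shuffle
   // persistence instead of an explicit cache.
   static void registerOutput(
       EvaluationContext context,
       PTransform<?, ? extends PValue> transform,
       PValue output,
       Dataset dataset) {
     if (context.shouldCache(output, transform)) {
       // persist `dataset` here via the runner's caching mechanism
     }
     context.putDataset(output, dataset);
   }
 }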
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 195670)
    Time Spent: 2h 50m  (was: 2h 40m)

> GroupByKey on Spark: All values for a single key need to fit in-memory at once
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-5392
>                 URL: https://issues.apache.org/jira/browse/BEAM-5392
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.6.0
>            Reporter: David Moravek
>            Assignee: David Moravek
>            Priority: Major
>              Labels: performance, triaged
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Currently, when using GroupByKey, all values for a single key need to fit 
> in memory at once.
>  
> The following issues need to be addressed:
> a) We cannot use Spark's _groupByKey_, because it requires all values for a 
> single key to fit in memory (it is implemented as a "list combiner").
> b) _ReduceFnRunner_ iterates over the values multiple times in order to also 
> group by window.
>  
> Solution:
>  
> In the Dataflow Worker code, there are optimized versions of ReduceFnRunner 
> that can take advantage of having the elements for a single key sorted by 
> timestamp.
>  
> We can use Spark's {{repartitionAndSortWithinPartitions}} in order to meet 
> this constraint.
>  
> For non-merging windows, we can put the window itself into the key, 
> resulting in smaller groupings (see the sketch below).
>  
> This approach has already been tested at ~100 TB input scale on Spark 2.3.x 
> (custom Spark runner).
>  
> I'll submit a patch once the Dataflow Worker code donation is complete.
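
A minimal sketch of the secondary-sort idea above, using Spark's
repartitionAndSortWithinPartitions. The composite (key, timestamp) key, the
partitioner, and the comparator below are illustrative, not Beam's actual
code; for non-merging windows the window would be folded into the composite
key the same way, so each (key, window) group is shuffled and sorted
independently:

import java.io.Serializable;
import java.util.Comparator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

public final class SecondarySortSketch {

  // Routes a composite (key, timestamp) pair by the key alone, so all
  // values of one key land in the same partition.
  static class KeyOnlyPartitioner extends Partitioner {
    private final HashPartitioner delegate;

    KeyOnlyPartitioner(int numPartitions) {
      this.delegate = new HashPartitioner(numPartitions);
    }

    @Override
    public int numPartitions() {
      return delegate.numPartitions();
    }

    @Override
    @SuppressWarnings("unchecked")
    public int getPartition(Object compositeKey) {
      return delegate.getPartition(((Tuple2<String, Long>) compositeKey)._1());
    }
  }

  // Orders composite keys by key first, then timestamp, so each key's values
  // arrive grouped and timestamp-sorted, which is the constraint the
  // optimized ReduceFnRunner variants need.
  static class KeyThenTimestamp
      implements Comparator<Tuple2<String, Long>>, Serializable {
    @Override
    public int compare(Tuple2<String, Long> a, Tuple2<String, Long> b) {
      int byKey = a._1().compareTo(b._1());
      return byKey != 0 ? byKey : Long.compare(a._2(), b._2());
    }
  }

  // Single shuffle: repartition by key and sort within each partition.
  static <V> JavaPairRDD<Tuple2<String, Long>, V> groupAndSort(
      JavaPairRDD<Tuple2<String, Long>, V> input, int numPartitions) {
    return input.repartitionAndSortWithinPartitions(
        new KeyOnlyPartitioner(numPartitions), new KeyThenTimestamp());
  }
}

Unlike groupByKey, this never materializes all values for one key in memory
(point a); downstream code streams over each partition's sorted iterator, and
the timestamp ordering removes the need for repeated passes per window
(point b).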



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
