[ https://issues.apache.org/jira/browse/BEAM-6332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía updated BEAM-6332:
-------------------------------
    Description: 
The Combine transformation is translated into Spark's RDD API in the
[GroupCombineFunctions|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java]
`combinePerKey` and `combineGlobally` methods. Both methods use byte arrays as
the intermediate aggregation state so that it can be transferred over the
network. This leads to serialization and de-serialization of the intermediate
aggregation value every time a new element is added to the aggregation. That
is unnecessary and should be avoided.
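
For illustration, here is a minimal sketch of what the byte-array based
accumulation amounts to. The helper class and method names are hypothetical
and simplified, not the actual GroupCombineFunctions code; only CoderUtils and
CombineFn are real Beam SDK types. The point is that every added element pays
a full coder round trip.

{code:java}
// Illustrative sketch only -- hypothetical helper, not the actual
// GroupCombineFunctions code. With byte[] accumulators, adding a single input
// element forces a full decode/encode round trip through the accumulator coder.
import java.io.IOException;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.util.CoderUtils;

class BytesBackedAccumulation {
  static <InputT, AccumT> byte[] addInput(
      byte[] accumBytes,
      InputT input,
      CombineFn<InputT, AccumT, ?> combineFn,
      Coder<AccumT> accumCoder)
      throws IOException {
    // Decode the intermediate accumulator from its byte representation...
    AccumT accum = CoderUtils.decodeFromByteArray(accumCoder, accumBytes);
    // ...fold in a single element...
    accum = combineFn.addInput(accum, input);
    // ...and immediately re-encode it, even if it never leaves this worker.
    return CoderUtils.encodeToByteArray(accumCoder, accum);
  }
}
{code}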

_(copied from BEAM-6214 because it explains the approach):_

We can do much better by letting accumulators work on user-defined Java types
and only serializing accumulators when we need to send them over the network.

In order to do this, we need the following (a minimal sketch follows below):
 * Accumulator wrapper -> contains a transient `T` value + a byte payload that
is filled in during serialization
 * Java serialization: the accumulator (Beam wrapper) needs to implement
Serializable, override the writeObject and readObject methods, and use the
Beam coder there
 * Kryo serialization: we need a custom Kryo serializer for the accumulator
wrapper

This should be enough to hook into all possible Spark serialization interfaces.
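
A minimal sketch of what such a wrapper could look like, assuming a
hypothetical class name (LazyAccumulatorWrapper) and using the Beam SDK's
CoderUtils; this is not the eventual implementation:

{code:java}
// Minimal sketch of the proposed wrapper -- class and field names are
// hypothetical. The live value stays a plain Java object; the Beam coder is
// used only when Spark actually has to serialize the wrapper (shuffle, spill).
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;

class LazyAccumulatorWrapper<T> implements Serializable {
  private transient T value;     // live accumulator, skipped by default Java serialization
  private final Coder<T> coder;  // Beam coder, used only in writeObject/readObject

  LazyAccumulatorWrapper(T value, Coder<T> coder) {
    this.value = value;
    this.coder = coder;
  }

  T get() {
    return value;
  }

  // Java serialization hook: encode the value with the Beam coder only when
  // the wrapper actually crosses the wire.
  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    byte[] payload = CoderUtils.encodeToByteArray(coder, value);
    out.writeInt(payload.length);
    out.write(payload);
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    byte[] payload = new byte[in.readInt()];
    in.readFully(payload);
    value = CoderUtils.decodeFromByteArray(coder, payload);
  }
}
{code}

For Kryo, a custom Serializer registered for this wrapper class would perform
the equivalent encode/decode in its write and read methods, so both of Spark's
serializer settings would be covered.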

  was:
The Combine transformation is translated into Spark's RDD API in the
[GroupCombineFunctions|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java]
`combinePerKey` and `combineGlobally` methods. Both methods use byte arrays as
the intermediate aggregation state so that it can be transferred over the
network. This leads to serialization and de-serialization of the intermediate
aggregation value every time a new element is added to the aggregation. That
is unnecessary and should be avoided.

(copied from the related JIRA because the explanation is better):

Right now the Spark runner's implementation of combineByKey causes at least two
serializations / de-serializations of each element, because combine
accumulators are byte based.

We can do much better by letting accumulators work on user-defined Java types
and only serializing accumulators when we need to send them over the network.

In order to do this, we need the following:

    Accumulator wrapper -> contains a transient `T` value + a byte payload
that is filled in during serialization
    Java serialization: the accumulator (Beam wrapper) needs to implement
Serializable, override the writeObject and readObject methods, and use the
Beam coder
    Kryo serialization: we need a custom Kryo serializer for the accumulator
wrapper

This should be enough to hook into all possible Spark serialization interfaces.



> Avoid unnecessary serialization steps when executing combine transform
> ----------------------------------------------------------------------
>
>                 Key: BEAM-6332
>                 URL: https://issues.apache.org/jira/browse/BEAM-6332
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Vaclav Plajt
>            Assignee: Vaclav Plajt
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
