[
https://issues.apache.org/jira/browse/BEAM-6332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ismaël Mejía updated BEAM-6332:
-------------------------------
Description:
The Combine transformation is translated into Spark's RDD API in the
[GroupCombineFunctions|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java]
`combinePerKey` and `combineGlobally` methods. Both methods use byte arrays as
the intermediate aggregation state so that it can be transferred over the
network. This leads to serialization and de-serialization of the intermediate
aggregation value every time a new element is added to the aggregation. That is
unnecessary and should be avoided.
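For illustration, here is a minimal sketch of the byte[]-based pattern described
above, assuming a generic Beam `CombineFn` and an accumulator `Coder`. It is not
the actual GroupCombineFunctions code; `ByteBasedCombineSketch` and
`combinePerKeyViaBytes` are placeholder names. Note how `mergeValue` has to
decode the accumulator and re-encode it just to add a single element:
{code:java}
import java.util.Arrays;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.spark.api.java.JavaPairRDD;

/** Illustration only; placeholder names, not the actual GroupCombineFunctions code. */
class ByteBasedCombineSketch {

  static <K, V, AccumT> JavaPairRDD<K, byte[]> combinePerKeyViaBytes(
      JavaPairRDD<K, V> input, CombineFn<V, AccumT, ?> fn, Coder<AccumT> accumCoder) {
    return input.combineByKey(
        // createCombiner: first element -> new accumulator, immediately encoded to bytes
        value ->
            CoderUtils.encodeToByteArray(accumCoder, fn.addInput(fn.createAccumulator(), value)),
        // mergeValue: runs once per element -> decode, add the element, encode again
        (bytes, value) ->
            CoderUtils.encodeToByteArray(
                accumCoder, fn.addInput(CoderUtils.decodeFromByteArray(accumCoder, bytes), value)),
        // mergeCombiners: decode both partial accumulators, merge, encode the result
        (left, right) ->
            CoderUtils.encodeToByteArray(
                accumCoder,
                fn.mergeAccumulators(
                    Arrays.asList(
                        CoderUtils.decodeFromByteArray(accumCoder, left),
                        CoderUtils.decodeFromByteArray(accumCoder, right)))));
  }
}
{code}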
(copied from the related JIRA issue because the explanation there is better):
Right now the Spark runner's implementation of combineByKey causes at least two
serializations/de-serializations of each element, because combine accumulators
are byte based.
We can do much better by letting accumulators work on user-defined Java types
and only serializing accumulators when we need to send them over the network.
In order to do this, we need the following (a rough sketch follows the list):
Accumulator wrapper -> holds a transient `T` value plus a byte payload that is
filled in during serialization
Java serialization: the accumulator (Beam wrapper) needs to implement
Serializable, override the writeObject and readObject methods, and use the Beam
coder
Kryo serialization: we need a custom Kryo serializer for the accumulator wrapper
This should be enough to hook into all possible Spark serialization interfaces.
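A minimal sketch of what such a wrapper could look like, assuming the
accumulator is encoded with its Beam coder only when Java or Kryo serialization
is actually triggered. The names (`ValueAndCoderWrapper`,
`KryoWrapperSerializer`) are hypothetical, not the committed implementation:
{code:java}
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

/**
 * Hypothetical accumulator wrapper: the accumulator stays a plain Java object
 * while it is being updated and is only encoded with its Beam coder when Java
 * or Kryo serialization kicks in (i.e. when Spark ships it over the network).
 */
class ValueAndCoderWrapper<T> implements Serializable {

  private transient T value;    // live accumulator, never serialized directly
  private final Coder<T> coder; // Beam coder, used only at (de)serialization time

  ValueAndCoderWrapper(T value, Coder<T> coder) {
    this.value = value;
    this.coder = coder;
  }

  T getValue() {
    return value;
  }

  void setValue(T value) {
    this.value = value;
  }

  // Java serialization: encode/decode the transient value with the Beam coder.

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject(); // writes the coder
    byte[] payload = CoderUtils.encodeToByteArray(coder, value);
    out.writeInt(payload.length);
    out.write(payload);
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject(); // restores the coder
    byte[] payload = new byte[in.readInt()];
    in.readFully(payload);
    value = CoderUtils.decodeFromByteArray(coder, payload);
  }

  // Kryo serialization: a custom serializer that delegates to the Beam coder.

  @SuppressWarnings({"rawtypes", "unchecked"})
  static class KryoWrapperSerializer extends Serializer<ValueAndCoderWrapper> {

    @Override
    public void write(Kryo kryo, Output output, ValueAndCoderWrapper wrapper) {
      try {
        kryo.writeClassAndObject(output, wrapper.coder);
        byte[] payload = CoderUtils.encodeToByteArray(wrapper.coder, wrapper.value);
        output.writeInt(payload.length);
        output.writeBytes(payload);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public ValueAndCoderWrapper read(Kryo kryo, Input input, Class<ValueAndCoderWrapper> type) {
      try {
        Coder<Object> coder = (Coder<Object>) kryo.readClassAndObject(input);
        byte[] payload = input.readBytes(input.readInt());
        return new ValueAndCoderWrapper<>(CoderUtils.decodeFromByteArray(coder, payload), coder);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
}
{code}
With something like this registered, the runner could keep calling
`CombineFn.addInput` directly on `getValue()` and pay the coder cost only when
Spark actually serializes the wrapper, e.g. at shuffle boundaries.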
was: The Combine transformation is translated into Spark's RDD API in the
[GroupCombineFunctions|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java]
`combinePerKey` and `combineGlobally` methods. Both methods use byte arrays as
the intermediate aggregation state so that it can be transferred over the
network. This leads to serialization and de-serialization of the intermediate
aggregation value every time a new element is added to the aggregation. That is
unnecessary and should be avoided.
> Avoid unnecessary serialization steps when executing combine transform
> ----------------------------------------------------------------------
>
> Key: BEAM-6332
> URL: https://issues.apache.org/jira/browse/BEAM-6332
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Vaclav Plajt
> Assignee: Vaclav Plajt
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The Combine transformation is translated into Spark's RDD API in the
> [GroupCombineFunctions|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java]
> `combinePerKey` and `combineGlobally` methods. Both methods use byte arrays
> as the intermediate aggregation state so that it can be transferred over the
> network. This leads to serialization and de-serialization of the intermediate
> aggregation value every time a new element is added to the aggregation. That
> is unnecessary and should be avoided.
> (copied from the related JIRA issue because the explanation there is better):
> Right now the Spark runner's implementation of combineByKey causes at least
> two serializations/de-serializations of each element, because combine
> accumulators are byte based.
> We can do much better by letting accumulators work on user-defined Java types
> and only serializing accumulators when we need to send them over the network.
> In order to do this, we need the following:
> Accumulator wrapper -> holds a transient `T` value plus a byte payload that
> is filled in during serialization
> Java serialization: the accumulator (Beam wrapper) needs to implement
> Serializable, override the writeObject and readObject methods, and use the
> Beam coder
> Kryo serialization: we need a custom Kryo serializer for the accumulator
> wrapper
> This should be enough to hook into all possible Spark serialization
> interfaces.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)