[ https://issues.apache.org/jira/browse/BEAM-6332?focusedWorklogId=184252&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184252 ]
ASF GitHub Bot logged work on BEAM-6332:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Jan/19 16:02
Start Date: 11/Jan/19 16:02
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #7398: [BEAM-6332]
Lazy serialization of aggregation results in Spark runner.
URL: https://github.com/apache/beam/pull/7398
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 184252)
Time Spent: 20m (was: 10m)
> Avoid unnecessary serialization steps when executing combine transform
> ----------------------------------------------------------------------
>
> Key: BEAM-6332
> URL: https://issues.apache.org/jira/browse/BEAM-6332
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Vaclav Plajt
> Assignee: Vaclav Plajt
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The Combine transformation is translated into Spark's RDD API in the
> `combinePerKey` and `combineGlobally` methods of
> [GroupCombineFunctions|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java].
> Both methods use byte arrays as the intermediate aggregation state so that it
> can be transferred over the network. This leads to serialization and
> deserialization of the intermediate aggregation value every time a new element
> is added to the aggregation, which is unnecessary and should be avoided.
> (copied from the related JIRA because the explanation is better):
> Right now the Spark runner's implementation of combineByKey causes at least
> two serializations/deserializations of each element, because the combine
> accumulators are byte based.
> We can do much better by letting accumulators work on user-defined Java
> types and only serializing them when we need to send them over the
> network.
> In order to do this, we need the following:
> 1. Accumulator wrapper -> contains a transient `T` value plus a byte payload
> that is filled in during serialization.
> 2. Java serialization: the accumulator (Beam wrapper) needs to implement
> Serializable and override the writeObject and readObject methods, using a
> Beam coder for the value.
> 3. Kryo serialization: we need a custom Kryo serializer for the accumulator
> wrapper.
> This should be enough to hook into all possible Spark serialization
> interfaces.
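> To make the idea concrete, here is a minimal sketch of the wrapper. The class
> name `LazyAccumulatorWrapper` and its shape are illustrative assumptions, not
> the actual implementation in the PR; it only shows how a transient value plus
> a byte payload can defer the Beam coder to the moment Spark really serializes
> the accumulator:
> {code:java}
> import java.io.IOException;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
>
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.util.CoderUtils;
>
> /**
>  * Illustrative sketch only: keeps the accumulator as a plain Java object and
>  * encodes it with the Beam coder only when Java serialization kicks in,
>  * i.e. when Spark actually has to ship it over the network.
>  */
> public class LazyAccumulatorWrapper<T> implements Serializable {
>
>   // Live accumulator value; transient, so default Java serialization skips it.
>   private transient T value;
>   // Byte payload, filled in lazily just before the wrapper is written out.
>   private byte[] payload;
>   // Beam coders are Serializable, so the coder travels with the wrapper.
>   private final Coder<T> coder;
>
>   public LazyAccumulatorWrapper(T value, Coder<T> coder) {
>     this.value = value;
>     this.coder = coder;
>   }
>
>   /** Returns the accumulator, decoding the payload if it arrived over the wire. */
>   public T get() throws IOException {
>     if (value == null && payload != null) {
>       value = CoderUtils.decodeFromByteArray(coder, payload);
>     }
>     return value;
>   }
>
>   // Java serialization hook: encode the value with the Beam coder only now.
>   private void writeObject(ObjectOutputStream out) throws IOException {
>     payload = CoderUtils.encodeToByteArray(coder, value);
>     out.defaultWriteObject();
>   }
>
>   // Restore payload and coder; the value itself is decoded lazily in get().
>   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
>     in.defaultReadObject();
>   }
> }
> {code}
> For Kryo, the equivalent would be registering a custom
> Serializer for the wrapper (e.g. through Spark's KryoRegistrator) that does
> the same encode-on-write / decode-on-read with the Beam coder, so the lazy
> behaviour holds regardless of which Spark serializer is configured.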
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)