[ https://issues.apache.org/jira/browse/BEAM-6332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-6332:
-------------------------------
    Fix Version/s:     (was: 2.11.0)
                       2.10.0

> Avoid unnecessary serialization steps when executing combine transform
> ----------------------------------------------------------------------
>
>                 Key: BEAM-6332
>                 URL: https://issues.apache.org/jira/browse/BEAM-6332
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Vaclav Plajt
>            Assignee: Vaclav Plajt
>            Priority: Major
>             Fix For: 2.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> The Combine transform is translated to Spark's RDD API in the
> [GroupCombineFunctions|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java]
> `combinePerKey` and `combineGlobally` methods. Both methods use byte arrays
> as the intermediate aggregation state so that it can be transferred over the
> network. This leads to serialization and deserialization of the intermediate
> aggregation value every time a new element is added to the aggregation,
> which is unnecessary and should be avoided.
> _(copied from BEAM-6214 because it explains the approach):_
> We can do much better by letting accumulators work on user-defined Java
> types and only serializing accumulators when we need to send them over the
> network.
> In order to do this, we need the following (sketched in the examples below):
> * Accumulator wrapper -> contains a transient `T` value plus a byte payload
> that is filled in during serialization
> * Java serialization: the accumulator wrapper needs to implement
> Serializable, override the writeObject and readObject methods, and use a
> Beam coder
> * Kryo serialization: we need a custom Kryo serializer for the accumulator
> wrapper
> This should be enough to hook into all of Spark's serialization interfaces.
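A minimal sketch of the wrapper described by the first two points, assuming a hypothetical class name `CoderSerializable` (not an existing Beam API). For brevity it encodes straight into the serialization stream instead of caching a separate byte payload:

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;

// Hypothetical wrapper: holds the live accumulator value and only runs the
// Beam coder when Spark actually serializes the object for the network.
public class CoderSerializable<T> implements Serializable {

  // The live, mutable accumulator; never touched by default Java serialization.
  private transient T value;
  // The Beam coder used to encode/decode the value; restored in readObject.
  private transient Coder<T> coder;

  public CoderSerializable(T value, Coder<T> coder) {
    this.value = value;
    this.coder = coder;
  }

  public T getValue() {
    return value;
  }

  public Coder<T> getCoder() {
    return coder;
  }

  // Java serialization hook: the value is encoded with the Beam coder only
  // here, i.e. when Spark ships the accumulator to another node.
  private void writeObject(ObjectOutputStream out) throws IOException {
    out.writeObject(coder); // Beam coders are themselves Serializable
    coder.encode(value, out);
  }

  @SuppressWarnings("unchecked")
  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    coder = (Coder<T>) in.readObject();
    value = coder.decode(in);
  }
}
```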
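For the third point, a custom Kryo serializer for the hypothetical wrapper above could look roughly like this; it applies the same lazy-encoding idea when Spark is configured with the Kryo serializer instead of Java serialization:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;

// Hypothetical Kryo serializer for the CoderSerializable wrapper sketched
// above. Kryo's Output/Input extend OutputStream/InputStream, so the Beam
// coder can write to and read from them directly.
public class CoderSerializableKryoSerializer<T> extends Serializer<CoderSerializable<T>> {

  @Override
  public void write(Kryo kryo, Output output, CoderSerializable<T> wrapper) {
    try {
      kryo.writeClassAndObject(output, wrapper.getCoder());
      wrapper.getCoder().encode(wrapper.getValue(), output);
    } catch (IOException e) {
      throw new IllegalStateException("Could not encode accumulator value", e);
    }
  }

  @Override
  @SuppressWarnings("unchecked")
  public CoderSerializable<T> read(Kryo kryo, Input input, Class<CoderSerializable<T>> type) {
    try {
      Coder<T> coder = (Coder<T>) kryo.readClassAndObject(input);
      return new CoderSerializable<>(coder.decode(input), coder);
    } catch (IOException e) {
      throw new IllegalStateException("Could not decode accumulator value", e);
    }
  }
}
```

For this to take effect, the serializer would still have to be registered with Spark's Kryo instance, e.g. through a custom `org.apache.spark.serializer.KryoRegistrator` set via the `spark.kryo.registrator` configuration property.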