[
https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Amit Sela updated BEAM-848:
---------------------------
Comment: was deleted
(was: I have a patch for this, but I still need to find the time to test it:
to see this break before the patch and run successfully after it.
Breaking this should be easy enough if the input from the {{UnboundedSource}}
is not Kryo-serializable.)
> A better shuffle after reading from within mapWithState.
> --------------------------------------------------------
>
> Key: BEAM-848
> URL: https://issues.apache.org/jira/browse/BEAM-848
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: Amit Sela
> Assignee: Amit Sela
>
> The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and
> this stateful operation will be followed by a shuffle:
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159
> Since the stateful read maps each "splitSource" to a "partition of a list of
> read values", this shuffle brings no benefit: the list of read values has not
> been flatMapped yet, so the shuffle cannot spread the individual values across
> partitions. To avoid the shuffle, we need to set the partitioner of the input
> RDD ({{SourceRDD.Unbounded}}) to a default {{HashPartitioner}};
> {{mapWithState}} uses the same partitioner and skips the shuffle when the
> partitioners match.
> It would be wiser to shuffle the read values _after_ the flatMap.
> I will break this into two tasks:
> # Set a default partitioner on the input RDD.
> # Shuffle the input, using Coders for serialization.
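
To illustrate the first task, here is a minimal sketch of the "skip the shuffle when the partitioners match" rule described above. It is a plain-Java model, not Spark or Beam code: {{PartitionerSketch}}, {{HashPartitionerModel}} and {{needsShuffle}} are hypothetical names made up for this example, and the only assumptions carried over from Spark are that a hash partitioner assigns a key by its non-negative hash modulo the partition count, and that two hash partitioners are equal when their partition counts are equal.

```java
import java.util.Objects;
import java.util.Optional;

public class PartitionerSketch {

    /** Hypothetical stand-in for a hash partitioner (not the Spark class). */
    public static final class HashPartitionerModel {
        final int numPartitions;

        public HashPartitionerModel(int numPartitions) {
            if (numPartitions <= 0) {
                throw new IllegalArgumentException("numPartitions must be positive");
            }
            this.numPartitions = numPartitions;
        }

        /** Non-negative modulo of the key's hash code; a null key goes to partition 0. */
        public int getPartition(Object key) {
            int mod = Objects.hashCode(key) % numPartitions;
            return mod < 0 ? mod + numPartitions : mod;
        }

        /** Two partitioners are equal when they split keys into the same number of partitions. */
        @Override
        public boolean equals(Object other) {
            return other instanceof HashPartitionerModel
                && ((HashPartitionerModel) other).numPartitions == numPartitions;
        }

        @Override
        public int hashCode() {
            return numPartitions;
        }
    }

    /**
     * Models the decision made before the stateful step: the input is
     * re-partitioned (shuffled) unless it already carries an equal partitioner.
     */
    public static boolean needsShuffle(Optional<HashPartitionerModel> inputPartitioner,
                                       HashPartitionerModel statePartitioner) {
        return !inputPartitioner.equals(Optional.of(statePartitioner));
    }

    public static void main(String[] args) {
        HashPartitionerModel statePartitioner = new HashPartitionerModel(4);

        // Input RDD without a partitioner: the shuffle happens.
        System.out.println(needsShuffle(Optional.empty(), statePartitioner)); // true

        // Input RDD with a matching default partitioner: the shuffle is skipped.
        System.out.println(
            needsShuffle(Optional.of(new HashPartitionerModel(4)), statePartitioner)); // false

        // Input RDD with a different partition count: the shuffle happens.
        System.out.println(
            needsShuffle(Optional.of(new HashPartitionerModel(8)), statePartitioner)); // true
    }
}
```

In this model, only the second case avoids the shuffle, which is why the first task is to make the input RDD report the same default partitioner that the stateful step would use.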
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)