[
https://issues.apache.org/jira/browse/BEAM-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16145266#comment-16145266
]
Kobi Salant commented on BEAM-2669:
-----------------------------------
PR https://github.com/apache/beam/pull/3749 contains the latest changes for
the Kryo exception.
The first step was to remove the hard-coded Kryo serializer settings, but that
opened another issue: all our RDDs/DStreams are of type WindowedValue, which is
not Java-serializable. A discussion on the dev list raised valid concerns about
bypassing the coders and breaking our agreement with the user.
So we moved in a different direction: when caching/persisting we map to bytes
with the coders, so we do not need to serialize Beam and user classes. Because
of this change it is no longer possible to check the cached RDD, which is not
the PCollection's RDD anymore, so we removed the storage level test.
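A minimal sketch of the coder-based approach, in plain Java without Spark on the classpath (the `Coder` interface and helper names here are illustrative stand-ins, not Beam's actual API): elements are encoded to byte arrays before persisting and decoded after reading back, so the cache layer only ever sees bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class CoderCacheSketch {

    // Illustrative stand-in for Beam's Coder<T>.
    interface Coder<T> {
        void encode(T value, OutputStream out) throws IOException;
        T decode(InputStream in) throws IOException;
    }

    // A trivial length-prefixed String coder, in the spirit of StringUtf8Coder.
    static final Coder<String> STRING_CODER = new Coder<String>() {
        public void encode(String value, OutputStream out) throws IOException {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            DataOutputStream dataOut = new DataOutputStream(out);
            dataOut.writeInt(bytes.length);
            dataOut.write(bytes);
        }
        public String decode(InputStream in) throws IOException {
            DataInputStream dataIn = new DataInputStream(in);
            byte[] bytes = new byte[dataIn.readInt()];
            dataIn.readFully(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    // Map an element to bytes before persisting; Spark (or any cache layer)
    // then only serializes byte arrays, never Beam or user classes.
    public static byte[] toByteArray(String value) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            STRING_CODER.encode(value, out);
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Map cached bytes back to the element after reading from the cache.
    public static String fromByteArray(byte[] bytes) {
        try {
            return STRING_CODER.decode(new ByteArrayInputStream(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String original = "windowed-value";
        String roundTripped = fromByteArray(toByteArray(original));
        System.out.println(original.equals(roundTripped)); // prints "true"
    }
}
```

In the runner this mapping would happen inside the cache/persist translation, so neither WindowedValue nor user types need to be Java- or Kryo-serializable.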
> Kryo serialization exception when DStreams containing non-Kryo-serializable
> data are cached
> -------------------------------------------------------------------------------------------
>
> Key: BEAM-2669
> URL: https://issues.apache.org/jira/browse/BEAM-2669
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 0.4.0, 0.5.0, 0.6.0, 2.0.0
> Reporter: Aviem Zur
> Assignee: Kobi Salant
>
> Today, when we detect re-use of a dataset in a pipeline in the Spark runner,
> we eagerly cache it to avoid calculating the same data multiple times.
> ([EvaluationContext.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L148])
> When the dataset is bounded, which in Spark is represented by an {{RDD}}, we
> call {{RDD#persist}} and use the storage level provided by the user via
> {{SparkPipelineOptions}}.
> ([BoundedDataset.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java#L103-L103])
> When the dataset is unbounded, which in Spark is represented by a {{DStream}},
> we call {{DStream.cache()}}, which defaults to persisting the {{DStream}} using
> storage level {{MEMORY_ONLY_SER}}.
> ([UnboundedDataset.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java#L61])
>
> ([DStream.scala|https://github.com/apache/spark/blob/v1.6.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L169])
> Storage level {{MEMORY_ONLY_SER}} means Spark will serialize the data using
> its configured serializer. Since we hard-code this to be Kryo, the data will
> be serialized using Kryo.
> ([SparkContextFactory.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L99-L99])
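For reference, the hard-coded setting pins the same property a user would normally control themselves via {{spark-defaults.conf}} or {{SparkConf}} (shown here as an illustrative config fragment):

```
# Normally a user-facing Spark setting; the runner currently forces it:
spark.serializer=org.apache.spark.serializer.KryoSerializer
```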
> Due to this, if your {{DStream}} contains non-Kryo-serializable data, you will
> encounter Kryo serialization exceptions and your task will fail.
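The failure class described above can be reproduced in miniature with standard Java serialization (Kryo fails analogously on types it cannot handle; the type and helper below are hypothetical examples, not Beam code):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializationFailureSketch {

    // A hypothetical user type holding a live resource; it implements neither
    // Serializable nor anything a generic serializer can safely walk.
    public static class NonSerializablePayload {
        final Thread handle = new Thread();
    }

    // Returns true if standard Java serialization rejects the value, which is
    // the same class of failure a serializing cache hits on such elements.
    public static boolean failsToSerialize(Object value) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return false;
        } catch (IOException e) { // includes NotSerializableException
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsToSerialize(new NonSerializablePayload())); // prints "true"
        System.out.println(failsToSerialize("plain string"));               // prints "false"
    }
}
```

Encoding elements to bytes with their coders before they reach the cache sidesteps this entirely, since only `byte[]` ever crosses the serializer.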
> Possible actions we should consider:
> # Remove the hard-coded Spark serializer configuration; this should be taken
> from the user's Spark configuration, as there is no real reason for us to
> interfere with it.
> # Use the user's configured storage level configuration from
> {{SparkPipelineOptions}} when caching unbounded datasets ({{DStream}}s), same
> as we do for bounded datasets.
> # Make caching of re-used datasets configurable in {{SparkPipelineOptions}}
> (enable/disable), although overloading our configuration with more options is
> not something to be taken lightly.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)