[
https://issues.apache.org/jira/browse/BEAM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073258#comment-16073258
]
Aviem Zur commented on BEAM-2359:
---------------------------------
Yeah, the PR was merged, marking as resolved.
> SparkTimerInternals inputWatermarkTime does not get updated in cluster mode
> ---------------------------------------------------------------------------
>
> Key: BEAM-2359
> URL: https://issues.apache.org/jira/browse/BEAM-2359
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: Aviem Zur
> Assignee: Aviem Zur
> Fix For: 2.1.0
>
>
> {{SparkTimerInternals#inputWatermarkTime}} does not get updated in cluster
> mode.
> As a result, windows never close, state grows unboundedly in memory, and
> processing time keeps increasing, leading to an eventual application crash
> (also, triggers based on the watermark do not fire).
> The root cause is
> a call from within the {{updateStateByKey}} operation in
> [SparkGroupAlsoByWindowViaWindowSet|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L241-L242]
> which tries to access a static reference to a {{GlobalWatermarkHolder}}
> broadcast variable. However, in cluster mode this static reference is a
> different one in the executor's JVM and is null there (this works in local
> mode since the executor and driver share the same JVM).
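> A minimal sketch of the failure pattern (the class and method names here are
> illustrative, not the actual Beam code):
> {code:java}
> import org.apache.spark.broadcast.Broadcast;
>
> // Simplified stand-in for a holder keeping a static broadcast reference.
> public class WatermarkHolderSketch {
>   // Assigned on the driver only. Statics are per-JVM, so on a cluster
>   // executor this field is never assigned and stays null.
>   private static volatile Broadcast<Long> watermarkBroadcast;
>
>   // Called on the driver when the watermark advances.
>   public static void set(Broadcast<Long> broadcast) {
>     watermarkBroadcast = broadcast;
>   }
>
>   // Called from within updateStateByKey, i.e. on an executor. In local
>   // mode the driver and executor share a JVM, so this works; in cluster
>   // mode watermarkBroadcast is null here and this throws an NPE.
>   public static long currentWatermark() {
>     return watermarkBroadcast.value().longValue();
>   }
> }
> {code}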
> Alternative solutions (and the viability of each):
> * -Broadcast variable passed to the {{updateStateByKey}} operator- - Not
> viable: even when used correctly, a broadcast variable cannot serve this case
> (from within {{updateStateByKey}}), because {{updateStateByKey}} is a
> {{DStream}} operator and not an {{RDD}} operator, so the broadcast will not
> be updated every micro-batch but will retain its initial value (see the first
> sketch after this list).
> * -Broadcast variable to update the data in an additional transform- - Create
> an additional transform on the {{DStream}}'s RDDs prior to the {{DStream}}
> operator {{updateStateByKey}} and use a broadcast which will be updated
> (since this is an {{RDD}} operator), adding this value to the keyed datum
> itself so it is available in the {{DStream}} operator {{updateStateByKey}}.
> Not viable: this only updates keys for which new data arrived in the
> micro-batch, whereas we also need to update the watermark value for keys with
> no new data in the micro-batch.
> * -Broadcast variable to update a static reference- - Create an additional
> transform on the {{DStream}}'s RDDs prior to the {{DStream}} operator
> {{updateStateByKey}} and use a broadcast which will be updated (since this is
> an {{RDD}} operator), setting its value in a static reference within the
> executor. Not viable: we cannot ensure that every executor will receive
> partitions to process in each micro-batch, so some executors' static
> references could go stale.
> * Server polled lazily every micro-batch from within the
> {{updateStateByKey}} operator - Spin up a server on a configured port on the
> driver which serves the current watermarks upon request. Lazily poll this
> value every micro-batch from within the {{updateStateByKey}} operator and
> update a static reference within the executor. Viable, but it does not use
> native Spark operations and incurs code maintenance as well as operational
> cost for the user (open ports in firewalls, etc.).
> * Drop/register watermarks as a block in the BlockManager and request the
> remote version from within the {{updateStateByKey}} operator - Update the
> watermarks block in the driver's BlockManager by dropping and re-registering
> the block every micro-batch. Lazily poll this value every micro-batch from
> within the {{updateStateByKey}} operator and update a static reference within
> the executor. Viable, less "ugly" than the server version, and requires less
> operational cost (see the second sketch after this list).
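> To illustrate the first rejected alternative, a sketch of the difference
> between an {{RDD}}-level and a {{DStream}}-level operator with respect to
> broadcasts. This assumes the Spark 1.6 Java streaming API (which uses
> Guava's {{Optional}}); the {{rebroadcastIfChanged}} helper is hypothetical:
> {code:java}
> import com.google.common.base.Optional;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.broadcast.Broadcast;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
>
> public class BroadcastStalenessSketch {
>   static void sketch(JavaSparkContext jsc,
>                      JavaDStream<String> input,
>                      JavaPairDStream<String, Long> pairs) {
>     // RDD-level operator: transform's closure runs on the driver once
>     // per micro-batch, so a fresh broadcast handle can be obtained here.
>     JavaDStream<String> tagged = input.transform(rdd -> {
>       Broadcast<Long> watermark = rebroadcastIfChanged(jsc);
>       return rdd.map(v -> v + "@" + watermark.value());
>     });
>
>     // DStream-level operator: this closure captures the broadcast once,
>     // at graph construction, and keeps the initial value for the whole
>     // lifetime of the job.
>     Broadcast<Long> staleWatermark = jsc.broadcast(0L);
>     JavaPairDStream<String, Long> state = pairs.updateStateByKey(
>         (values, current) -> Optional.of(staleWatermark.value()));
>   }
>
>   // Hypothetical helper: re-broadcasts the driver-side watermark when it
>   // has advanced, returning the latest Broadcast handle.
>   static Broadcast<Long> rebroadcastIfChanged(JavaSparkContext jsc) {
>     return jsc.broadcast(System.currentTimeMillis()); // placeholder
>   }
> }
> {code}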
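> And a sketch of how the last alternative could look, assuming the Spark
> 1.6.x internal {{BlockManager}} API ({{putSingle}}, {{removeBlock}},
> {{getRemote}}); these are internal interfaces whose names differ across
> Spark versions, and the class and block-id names here are illustrative:
> {code:java}
> import org.apache.spark.SparkEnv;
> import org.apache.spark.storage.BlockId;
> import org.apache.spark.storage.BlockManager;
> import org.apache.spark.storage.BlockResult;
> import org.apache.spark.storage.StorageLevel;
> import scala.Option;
>
> public class WatermarkBlockSketch {
>   // The name must match a pattern BlockId.apply understands; the
>   // "broadcast_*" form is used here purely as an illustration.
>   private static final BlockId WATERMARKS_BLOCK_ID =
>       BlockId.apply("broadcast_0WATERMARKS");
>
>   // Driver side, once per micro-batch: drop the stale block and register
>   // the fresh watermark value, telling the block manager master about it.
>   public static void publish(Long watermark) {
>     BlockManager blockManager = SparkEnv.get().blockManager();
>     blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
>     blockManager.putSingle(
>         WATERMARKS_BLOCK_ID, watermark, StorageLevel.MEMORY_ONLY(), true);
>   }
>
>   // Executor side, polled lazily from within updateStateByKey: fetch the
>   // driver's block remotely; the caller caches it in a static reference.
>   public static Long fetch() {
>     BlockManager blockManager = SparkEnv.get().blockManager();
>     Option<BlockResult> result = blockManager.getRemote(WATERMARKS_BLOCK_ID);
>     return result.isDefined() ? (Long) result.get().data().next() : null;
>   }
> }
> {code}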