[ 
https://issues.apache.org/jira/browse/BEAM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-2359:
----------------------------
    Description: 
{{SparkTimerInternals#inputWatermarkTime}} does not get updated in cluster mode.

As a result, windows never close, state accumulates in memory indefinitely, and 
processing time keeps growing until the application eventually crashes (also, 
triggers based on the watermark never fire).

The root cause is 
a call from within the {{updateStateByKey}} operation in 
[SparkGroupAlsoByWindowViaWindowSet|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L241-L242]
 which tries to access a static reference to a {{GlobalWatermarkHolder}} 
broadcast variable. In cluster mode, however, the executor's JVM holds its own 
copy of that static reference, and it is null (this works in local mode because 
the executor and driver share a single JVM).
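The per-JVM nature of static state can be illustrated without Spark at all. In this hypothetical sketch (plain Java; {{StaticIsolationDemo}} and {{Holder}} are made-up names, not Beam or Spark classes), loading the same class through two isolated class loaders stands in for the driver and executor JVMs: each gets an independent copy of the static field, so a value set through one copy is still null through the other.

```java
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;

public class StaticIsolationDemo {
    // Loads Holder through two isolated class loaders ("driver" and
    // "executor"), sets the static field via the first copy, and returns
    // the value each copy observes.
    static Object[] run() throws Exception {
        URL[] cp = {StaticIsolationDemo.class.getProtectionDomain()
                        .getCodeSource().getLocation()};
        // parent = null, so each loader defines its own copy of Holder
        try (URLClassLoader driver = new URLClassLoader(cp, null);
             URLClassLoader executor = new URLClassLoader(cp, null)) {
            Field onDriver = driver.loadClass("Holder")
                                   .getDeclaredField("watermark");
            Field onExecutor = executor.loadClass("Holder")
                                       .getDeclaredField("watermark");
            onDriver.setAccessible(true);
            onExecutor.setAccessible(true);
            onDriver.set(null, 42L);  // the "driver" advances the watermark
            // The "executor" copy of the static field was never touched.
            return new Object[]{onDriver.get(null), onExecutor.get(null)};
        }
    }

    public static void main(String[] args) throws Exception {
        Object[] seen = run();
        System.out.println("driver sees " + seen[0]
                + ", executor sees " + seen[1]);
    }
}

// Stand-in for a holder class with a static watermark field.
class Holder {
    public static Long watermark;
}
```

This is only an analogy (separate class loaders, not separate JVMs), but the failure mode is the same: a static reference populated on the driver is simply absent where the {{updateStateByKey}} closure runs.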

Alternative solutions (and their viability):
* -Broadcast variable passed to the {{updateStateByKey}} operator- - Not viable: 
even if we use the broadcast correctly, broadcast variables cannot be used in 
this case (from within {{updateStateByKey}}) because {{updateStateByKey}} is 
a {{DStream}} operator, not an {{RDD}} operator, so the broadcast would not be 
updated every micro-batch but would retain its initial value.
* -Broadcast variable to update the data in an additional transform- - Add a 
transform on the {{DStream}}'s RDDs ahead of the {{DStream}} operator 
{{updateStateByKey}} and read a broadcast there (which does get updated, since 
this is an {{RDD}} operator), attaching the value to each keyed datum so it is 
available inside {{updateStateByKey}}. Not viable: this only updates keys that 
received new data in the micro-batch, whereas we also need to update the 
watermark value for keys that received no new data in the micro-batch.
* -Broadcast variable to update a static reference- - Add the same kind of 
transform on the {{DStream}}'s RDDs ahead of the {{DStream}} operator 
{{updateStateByKey}}, read the updated broadcast there (since this is an 
{{RDD}} operator), and store the value in a static reference within the 
executor. Not viable: we cannot guarantee that every executor receives 
partitions to process in each micro-batch.
* Server polled lazily every micro-batch from within the 
{{updateStateByKey}} operator - Spin up a server on a configured port on the 
driver that serves the current watermarks on request. Lazily poll this value 
once per micro-batch from within the {{updateStateByKey}} operator and cache it 
in a static reference within the executor. Viable, but it does not use 
Spark-native operations, adds code to maintain, and imposes operational cost on 
the user (opening ports in firewalls, etc.).
* Drop/register watermarks as a block in the BlockManager and fetch the remote 
copy from within the {{updateStateByKey}} operator - Keep the watermarks in a 
block in the driver's BlockManager, dropping and re-registering the block 
every micro-batch. Lazily fetch this value once per micro-batch from within the 
{{updateStateByKey}} operator and cache it in a static reference within the 
executor. Viable; less "ugly" than the server approach and with lower 
operational cost.
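Both viable options share the same executor-side pattern: a static cache that performs at most one remote fetch per micro-batch, however many keys {{updateStateByKey}} touches. A minimal sketch of that pattern, assuming a hypothetical {{WatermarkCache}} class where the {{Supplier}} stands in for the remote lookup (the HTTP poll or the remote BlockManager fetch):

```java
import java.util.function.Supplier;

// Hypothetical executor-side cache: refreshes the watermark at most once
// per micro-batch, no matter how many keys are processed in that batch.
final class WatermarkCache {
    private static volatile long cachedWatermark = Long.MIN_VALUE;
    private static volatile long cachedBatchId = -1L;

    // batchId identifies the current micro-batch; fetcher stands in for
    // the remote lookup (server poll or remote block fetch) on the driver.
    static long get(long batchId, Supplier<Long> fetcher) {
        if (batchId != cachedBatchId) {
            synchronized (WatermarkCache.class) {
                // double-checked: exactly one fetch per batch per JVM
                if (batchId != cachedBatchId) {
                    cachedWatermark = fetcher.get();
                    cachedBatchId = batchId;
                }
            }
        }
        return cachedWatermark;
    }

    public static void main(String[] args) {
        System.out.println(get(1L, () -> 100L)); // fetches: 100
        System.out.println(get(1L, () -> 999L)); // cached for batch 1: 100
        System.out.println(get(2L, () -> 200L)); // new batch, refetches: 200
    }
}
```

Within a micro-batch, repeated calls return the cached value; a new batch id triggers a single refresh, which is what makes the lazy-poll approaches cheap enough to run inside the stateful operator.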

  was:
{{SparkTimerInternals#inputWatermarkTime}} does not get updated in cluster mode.

This causes windows to not get closed and state to increase forever in memory 
and processing time to increase leading to eventual application crash (also, 
triggers based on the watermark do not fire).

The root cause is 
a call from within the {{updateStateByKey}} operation in 
[SparkGroupAlsoByWindowViaWindowSet|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L241-L242]
 which tries to access a static reference to a {{GlobalWatermarkHolder}} 
broadcast variable, however, in cluster mode this static reference would be a 
different one in the executor's JVM and is null (this works in local mode since 
the executor and driver are on the same JVM).

The fix is not trivial since even if we use the broadcast correctly, broadcast 
variables can't be used in this case (from within {{updateStateByKey}}) since  
{{updateStateByKey}} is a {{DStream}} operator and not an {{RDD}} operator so 
it will not be updated every micro-batch but rather will retain the same 
initial value.


> SparkTimerInternals inputWatermarkTime does not get updated in cluster mode
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-2359
>                 URL: https://issues.apache.org/jira/browse/BEAM-2359
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Amit Sela
>             Fix For: 2.1.0
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
