[ https://issues.apache.org/jira/browse/BEAM-2789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stas Levin updated BEAM-2789:
-----------------------------
    Description: 
The watermark is updated by the driver like so:

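{code:java}
// Drop the previous watermarks block...
blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
// ...then store the new values. Until this call completes, executors may find no block.
blockManager.putSingle(
    WATERMARKS_BLOCK_ID, newValues, StorageLevel.MEMORY_ONLY(), true);
{code}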

However, these two operations are neither synchronous nor atomic. If an 
executor requests the watermark values after the old values have been removed 
but before the new ones have been put, it may get {{null}} as a response, in 
which case it defaults to negative infinity as the watermark. This can lead to 
erroneous results.
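
A simplified single-process model of this window may make the race easier to 
see. It is a sketch under stated assumptions, not runner or Spark code: 
{{WatermarkStoreModel}} and {{RaceWindowDemo}} are hypothetical, a 
{{ConcurrentHashMap}} stands in for the {{BlockManager}}, and {{Long.MIN_VALUE}} 
stands in for negative infinity. Each individual call is thread-safe, yet the 
remove-then-put pair is not atomic, so a read that lands between them sees 
{{null}}.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the BlockManager-backed watermark store.
// This is NOT Spark's BlockManager API; it only models the remove-then-put sequence.
class WatermarkStoreModel {
  static final String WATERMARKS_BLOCK_ID = "watermarks";
  // Stand-in for "negative infinity"; the runner's actual minimum may differ.
  static final long NEGATIVE_INFINITY = Long.MIN_VALUE;

  // Each get/put/remove is thread-safe on its own, but a sequence of them is not atomic.
  private final Map<String, Long> blocks = new ConcurrentHashMap<>();

  // Driver side, step 1.
  void removeBlock(String blockId) {
    blocks.remove(blockId);
  }

  // Driver side, step 2.
  void putSingle(String blockId, long value) {
    blocks.put(blockId, value);
  }

  // Executor side: a missing block (null) silently becomes negative infinity.
  long readWatermark() {
    Long value = blocks.get(WATERMARKS_BLOCK_ID);
    return value == null ? NEGATIVE_INFINITY : value;
  }
}

class RaceWindowDemo {
  // Replays one unlucky interleaving deterministically (no threads needed):
  // the executor's read lands between the driver's remove and put.
  static long[] run() {
    WatermarkStoreModel store = new WatermarkStoreModel();
    String id = WatermarkStoreModel.WATERMARKS_BLOCK_ID;
    store.putSingle(id, 1_000L);
    long before = store.readWatermark();
    store.removeBlock(id);               // driver: old values removed
    long during = store.readWatermark(); // executor reads inside the window
    store.putSingle(id, 2_000L);         // driver: new values put
    long after = store.readWatermark();
    return new long[] {before, during, after};
  }

  public static void main(String[] args) {
    long[] r = run();
    System.out.println(r[0] + " " + r[1] + " " + r[2]);
  }
}
{code}

Replaying that interleaving, the middle read returns {{Long.MIN_VALUE}} even 
though the watermark only ever moved forward, from 1000 to 2000.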

To overcome this in tests, a workaround that assumes a single-JVM setting is 
used. In such a setting the watermark values are stored in a static member that 
is accessible to both the driver and the executors, bypassing the 
{{BlockManager#putSingle(...)}} and {{BlockManager#removeBlock(...)}} APIs.
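
The static-member workaround can be sketched roughly as follows. This is an 
illustration, not the actual test code: {{StaticWatermarkHolder}}, the 
{{Integer}} source-id keys and the {{Long}} millisecond values are all 
hypothetical. It relies on the driver and executors sharing one JVM (as with a 
local Spark master); on a cluster each executor JVM would see its own, 
never-updated copy of the static field.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-JVM holder (names are illustrative, not the runner's).
// Only valid when the driver and all executors run in the same JVM.
final class StaticWatermarkHolder {
  // volatile so that a map published by the driver thread is visible to executor threads.
  private static volatile Map<Integer, Long> watermarks = Collections.emptyMap();

  private StaticWatermarkHolder() {}

  // Driver side: copy the new values, then publish them with a single reference write.
  // There is no separate remove step, so a reader sees either the old map or the new one.
  static void update(Map<Integer, Long> newValues) {
    watermarks = Collections.unmodifiableMap(new HashMap<>(newValues));
  }

  // Executor side: returns the current read-only snapshot; never null.
  static Map<Integer, Long> get() {
    return watermarks;
  }

  public static void main(String[] args) {
    Map<Integer, Long> values = new HashMap<>();
    values.put(0, 1_000L); // hypothetical source id 0 -> watermark in millis
    update(values);
    System.out.println(get());
  }
}
{code}

The design choice that matters is that {{update}} swaps in a fully built map 
with one {{volatile}} write instead of removing and re-adding, so an executor 
always reads either the old snapshot or the new one, never a missing value.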

  was:
The watermark is updated by the driver like so:

{code:java}
blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
blockManager.putSingle(
    WATERMARKS_BLOCK_ID, newValues, StorageLevel.MEMORY_ONLY(), true);
{code}

However, these two operations are neither synchronous nor atomic. If an 
executor requests the watermark values after the old values have been removed 
but before the new ones have been put, it may get {{null}} as a response, in 
which case it defaults to negative infinity as the watermark. This can lead to 
erroneous results.


> Watermark can become unavailable for executors while it's updated with new values
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-2789
>                 URL: https://issues.apache.org/jira/browse/BEAM-2789
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Stas Levin
>            Assignee: Amit Sela
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
