[
https://issues.apache.org/jira/browse/BEAM-7428?focusedWorklogId=262395&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-262395
]
ASF GitHub Bot logged work on BEAM-7428:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jun/19 15:45
Start Date: 18/Jun/19 15:45
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #8741: [BEAM-7428] Output
the timestamp on elements in ReadAllViaFileBasedSource
URL: https://github.com/apache/beam/pull/8741#issuecomment-503194658
In the Beam model, the watermark propagates forward because the input
watermark of each transform is >= its output watermark. The input watermark is
a bound saying that any data arriving with a timestamp before it will be
considered late, while the output watermark says that any data output with a
timestamp before it will be considered late. So let's say T1 provides data to
T2; then
```
... >= input_watermark(T1) >= output_watermark(T1) >= input_watermark(T2) >=
output_watermark(T2) >= ...
```
Note that the input/output watermarks are computed by the runner and are
never exposed to the SDK. Whether something is late or on time is figured out
as part of the GroupByKey. The runner also automatically advances the output
watermark when all the data for a given input watermark has been consumed.
(Watermark holds, a special kind of timer that isn't meant to be exposed to
SDKs and that will be modeled properly with the resolution of
https://issues.apache.org/jira/browse/BEAM-2535, do allow the SDK to hold back
the watermark.)
This all seems great, but what about the roots of the pipeline? This is where
the `UnboundedSource` has the ability to report what it thinks the watermark
should be, and the runner computes the watermark by taking the min across all
UnboundedSource instances for a particular root.
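As a rough illustration of the "min across all instances" rule, here is a
minimal sketch. It does not use the Beam SDK; the class `RootWatermarkSketch`
and the method `rootWatermark` are hypothetical names chosen for this example,
and a real runner tracks considerably more state than this.
```java
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, NOT part of the Beam SDK: models how a runner could
// combine the watermarks reported by the source instances of one root.
final class RootWatermarkSketch {

    // The slowest instance wins: the root watermark is the minimum of the
    // watermarks reported by the individual instances.
    static Instant rootWatermark(Collection<Instant> reportedWatermarks) {
        if (reportedWatermarks.isEmpty()) {
            throw new IllegalArgumentException("need at least one reported watermark");
        }
        return Collections.min(reportedWatermarks);
    }

    public static void main(String[] args) {
        // Three instances of the same root, each reporting its own estimate.
        List<Instant> reported = List.of(
            Instant.parse("2019-06-18T15:45:00Z"),
            Instant.parse("2019-06-18T15:40:00Z"),
            Instant.parse("2019-06-18T15:50:00Z"));
        System.out.println(rootWatermark(reported));
    }
}
```
The point of taking the minimum is that a single slow instance holds back the
watermark for the whole root, so none of its pending data is declared late
prematurely.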
Another property of the Beam model is that if you output elements with a
timestamp that is before the output watermark, the data will be classified as
late (and depending on the trigger, may be dropped).
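The lateness rule can be sketched as a plain timestamp comparison. This is a
deliberate simplification: in Beam the decision is made by the runner as part
of the GroupByKey and also involves windowing and allowed lateness, none of
which is modeled here. `LatenessSketch` and `isLate` are hypothetical names,
not Beam APIs.
```java
import java.time.Instant;

// Hypothetical helper, NOT part of the Beam SDK: a simplified view of the
// lateness rule that ignores windows, triggers and allowed lateness.
final class LatenessSketch {

    // An element is treated as late when its timestamp is strictly before
    // the output watermark at the time it is emitted.
    static boolean isLate(Instant elementTimestamp, Instant outputWatermark) {
        return elementTimestamp.isBefore(outputWatermark);
    }

    public static void main(String[] args) {
        Instant outputWatermark = Instant.parse("2019-06-18T15:45:00Z");

        // Emitted with a timestamp behind the watermark: classified as late.
        Instant older = Instant.parse("2019-06-18T15:40:00Z");
        System.out.println(isLate(older, outputWatermark));

        // Emitted with a timestamp at or after the watermark: on time.
        Instant newer = Instant.parse("2019-06-18T15:45:00Z");
        System.out.println(isLate(newer, outputWatermark));
    }
}
```
Under this simplified rule, any transform that emits records carrying
timestamps older than the already-advanced output watermark produces late data
by construction.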
Combining these two properties of the Beam model, we get to the issue of
what timestamp we should output with.
My concern is that the proposed solution of using getCurrentTimestamp and,
if it is unknown, falling back to the element timestamp can lead to data being
marked late by default when ReadAllViaFileBasedSource is used in a streaming
pipeline.
The reason I suggest modeling this as an SDF is that I believe SDFs should
be able to hold the output watermark, just like an UnboundedSource, by invoking
ProcessContext.updateWatermark(), which would then allow
ReadAllViaFileBasedSource to output data without any of it being marked late.
Eugene believes that ProcessContext.updateWatermark() should only be used to
advance the output watermark faster than the input and should not be able to
hold the output watermark, in which case the data could be marked as late.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 262395)
Time Spent: 5.5h (was: 5h 20m)
> ReadAllViaFileBasedSource does not output the timestamps of the read elements
> -----------------------------------------------------------------------------
>
> Key: BEAM-7428
> URL: https://issues.apache.org/jira/browse/BEAM-7428
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Minor
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> This differs from the implementation of JavaReadViaImpulse that tackles a
> similar problem but does output the timestamps correctly.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)