[
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=762825&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762825
]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Apr/22 12:10
Start Date: 27/Apr/22 12:10
Worklog Time Spent: 10m
Work Description: damccorm opened a new pull request, #17475:
URL: https://github.com/apache/beam/pull/17475
**Summary of Changes**
This pr is a continuation of building watermark support in the Go Sdk. It
builds on the work done in https://github.com/apache/beam/pull/17374 and
introduces the ability for watermark estimators to be invoked from
ProcessElement. It includes the following changes:
- Validation (fn/fn.go, fn/fn_test.go, graph/fn.go, graph/fn_test.go)
- Execution (exec/fn.go and exec/pardo.go)
- Code generation (exec/fn_arity.tmpl, exec/fn_arity.go,
reflectx/calls.tmpl, reflectx/calls.go)
- Concrete implementation of a manual estimator (sdf/watermark_estimator.go)
This is built according to the design specified here -
https://docs.google.com/document/d/1DqCYJ-J1YGNelCRIcN5v6BQsZxJB2l5uWBmN4ti--Ew/edit?usp=sharing
**Next Steps**
The following work remains to be implemented after this is completed
- Timestamp observing estimators (call ObserveTimestamp on each emitted
timestamp)
- Doc changes
**Manual Testing**
On top of all the unit tests introduced, I hacked the existing textio sdf
doFn and added the following function:
```
func (fn *readSdfFn) CreateWatermarkEstimator(t time.Time)
*sdf.ManualWatermarkEstimator {
return &sdf.ManualWatermarkEstimator{State: t}
}
func (fn *readSdfFn) InitialWatermarkEstimatorState(_ beam.EventTime, _
offsetrange.Restriction, _ string, _ int64) time.Time {
return time.Now()
}
func (fn *readSdfFn) WatermarkEstimatorState(e
*sdf.ManualWatermarkEstimator) time.Time {
return e.State
}
```
and added the following to ProcessElement:
```
func (fn *readSdfFn) ProcessElement(ctx context.Context, we
*sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, filename string, _ int64,
emit func(string)) error {
we.UpdateWatermark(time.Now().AddDate(0, 1, 0)) // Add 1 month to
current time
```
and ran wordCount (with the textio sdf version) on dataflow so that I could
observe the output watermark was getting correctly updated to the current wall
clock time + 1 month:
<img width="418" alt="image"
src="https://user-images.githubusercontent.com/42773683/165515083-bb9f5a55-3a42-4a5c-9eb1-762dda6f383a.png">
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [ ] [**Choose
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA
issue, if applicable. This will automatically link the pull request to the
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [x] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
GitHub Actions Tests Status (on master branch)
------------------------------------------------------------------------------------------------
[](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
[](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
[](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more
information about GitHub Actions CI.
Issue Time Tracking
-------------------
Worklog Id: (was: 762825)
Time Spent: 6h 20m (was: 6h 10m)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 6h 20m
> Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.7#820007)