[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=757123&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-757123
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Apr/22 17:41
            Start Date: 14/Apr/22 17:41
    Worklog Time Spent: 10m 
      Work Description: damccorm opened a new pull request, #17374:
URL: https://github.com/apache/beam/pull/17374

   **Summary of Changes**
   
   This PR continues the work of building watermark support in the Go SDK. It 
builds on the work done in https://github.com/apache/beam/pull/17267 and 
introduces the ability for watermark estimators to have state that is persisted 
through splits. It includes the following changes:
   
   - Validation (graph/fn.go)
   - Code generation (genx/genx.go)
   - Execution (exec/sdf.go and exec/sdf_invokers.go)
   - Encoding changes (pardo.go)
   
   This is built according to the design specified here - 
https://docs.google.com/document/d/1DqCYJ-J1YGNelCRIcN5v6BQsZxJB2l5uWBmN4ti--Ew/edit?usp=sharing
   
   **Next Steps**
   
   This work does not attempt any of the following (though it is built in such 
a way that those should be cleanly supported in subsequent PRs):
   
   - Manual estimators (passed into ProcessElement)
   - Timestamp observing estimators (call ObserveTimestamp on each emitted 
timestamp)
   - Doc changes
   - ProcessContinuation support (because ProcessContinuations don't exist in 
the Go SDK yet)
   
   **Manual Testing**
   
   On top of all the unit tests introduced, I hacked the existing textio SDF 
DoFn and added the following type and functions:
   
   ```
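   // DannysEstimator is a test-only estimator whose state is a single timestamp.
   type DannysEstimator struct {
        state time.Time
   }
   
   // CurrentWatermark reports the stored state as the watermark.
   func (e *DannysEstimator) CurrentWatermark() time.Time {
        return e.state
   }
   
   // GetInitialWatermarkEstimatorState seeds the state with the wall clock time.
   func (fn *readSdfFn) GetInitialWatermarkEstimatorState(_ beam.EventTime, _ offsetrange.Restriction, _ string, _ int64) time.Time {
        // Return some watermark state
        return time.Now()
   }
   
   // CreateWatermarkEstimator builds an estimator from the given state.
   func (fn *readSdfFn) CreateWatermarkEstimator(initialState time.Time) *DannysEstimator {
        return &DannysEstimator{state: initialState}
   }
   
   // GetWatermarkEstimatorState returns the estimator's state advanced by
   // 7 days, so that each split visibly moves the watermark forward.
   func (fn *readSdfFn) GetWatermarkEstimatorState(e *DannysEstimator) time.Time {
        state := e.state
        return state.Add(7 * 24 * time.Hour)
   }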
   ```
   
   I then ran wordcount (with the textio SDF version) on Dataflow and observed 
that the output watermark was first set to the current wall clock time and was 
then updated again on each split. Since I added 7 days to the state every time 
we split, it eventually reached June:
   
   <img width="459" alt="image" 
src="https://user-images.githubusercontent.com/42773683/163442776-15a1e681-9c98-4028-b970-94d9e6b9b6f2.png">
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   




Issue Time Tracking
-------------------

    Worklog Id:     (was: 757123)
    Time Spent: 3h 50m  (was: 3h 40m)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Allow self-checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
