[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=765157&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765157
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/22 21:28
            Start Date: 02/May/22 21:28
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17522:
URL: https://github.com/apache/beam/pull/17522#discussion_r863202615


##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +108,64 @@ func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// WatermarkState is a custom type.
+//
+// It is optional to write your own state type when making a custom estimator.
+type WatermarkState struct {
+       Watermark time.Time
+}
+
+// CustomWatermarkEstimator is a custom watermark estimator.
+// You may use any type here, including some of Beam's built in watermark estimator types,
+// e.g. sdf.WallTimeWatermarkEstimator, sdf.TimestampObservingWatermarkEstimator, and sdf.ManualWatermarkEstimator
+type CustomWatermarkEstimator struct {
+       state WatermarkState
+}
+
+// CurrentWatermark returns the current watermark and is invoked on DoFn splits and self-checkpoints.
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+       return e.state.Watermark
+}
+
+// ObserveTimestamp is called on the output timestamps of all
+// emitted elements to update the watermark. It is optional.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+       e.state.Watermark = ts
+}
+
+// InitialWatermarkEstimatorState defines an initial state used to initialize the watermark
+// estimator. It is optional. If this is not defined, WatermarkEstimatorState may not be
+// defined and CreateWatermarkEstimator must not take in parameters.
+func (fn *weDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest offsetrange.Restriction, element string) WatermarkState {
+       // Return some watermark state
+       return WatermarkState{Watermark: time.Now()}
+}
+
+// CreateWatermarkEstimator creates the watermark estimator used by this Splittable DoFn.
+// Must take in a state parameter if InitialWatermarkEstimatorState is defined, otherwise takes no parameters.
+func (fn *weDoFn) CreateWatermarkEstimator(initialState WatermarkState) *CustomWatermarkEstimator {
+       return &CustomWatermarkEstimator{state: initialState}
+}
+
+// WatermarkEstimatorState returns the state usedto resume future watermark estimation

Review Comment:
   ```suggestion
   // WatermarkEstimatorState returns the state used to resume future watermark estimation
   ```
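
For context on the comment being corrected, here is a minimal standalone sketch of the lifecycle the diff describes: an estimator is created from state, observes output timestamps, and its state is saved so a new estimator can resume from it. It uses only the standard library and is driven by hand, not by a Beam runner. The helpers `newEstimator`, `saveState`, and `runLifecycle` are hypothetical stand-ins for the DoFn methods in the PR; the body of the real `WatermarkEstimatorState` method is not shown in this excerpt, so returning the estimator's state directly is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// WatermarkState is the checkpointable state, as in the snippet under review.
type WatermarkState struct {
	Watermark time.Time
}

// CustomWatermarkEstimator mirrors the estimator type from the diff.
type CustomWatermarkEstimator struct {
	state WatermarkState
}

// CurrentWatermark reports the most recently recorded watermark.
func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
	return e.state.Watermark
}

// ObserveTimestamp records the timestamp of an emitted element.
func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
	e.state.Watermark = ts
}

// newEstimator is a hypothetical stand-in for CreateWatermarkEstimator.
func newEstimator(s WatermarkState) *CustomWatermarkEstimator {
	return &CustomWatermarkEstimator{state: s}
}

// saveState is a hypothetical stand-in for WatermarkEstimatorState.
// Assumption: the saved state is simply the estimator's current state.
func saveState(e *CustomWatermarkEstimator) WatermarkState {
	return e.state
}

// runLifecycle simulates one bundle followed by a resume from saved state.
func runLifecycle() (first, resumed *CustomWatermarkEstimator) {
	start := time.Date(2022, time.May, 2, 21, 28, 0, 0, time.UTC)
	first = newEstimator(WatermarkState{Watermark: start})

	// Two elements are emitted; each output timestamp is observed.
	first.ObserveTimestamp(start.Add(time.Minute))
	first.ObserveTimestamp(start.Add(2 * time.Minute))

	// On a checkpoint the state is saved and later used to build
	// a fresh estimator that continues from the same watermark.
	resumed = newEstimator(saveState(first))
	return first, resumed
}

func main() {
	first, resumed := runLifecycle()
	fmt.Println("before checkpoint:", first.CurrentWatermark())
	fmt.Println("after resume:     ", resumed.CurrentWatermark())
}
```

Because the state is copied by value, the resumed estimator starts at the same watermark but advances independently of the original.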





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765157)
    Time Spent: 14h  (was: 13h 50m)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 14h
>  Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
