[
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=765157&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765157
]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 02/May/22 21:28
Start Date: 02/May/22 21:28
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17522:
URL: https://github.com/apache/beam/pull/17522#discussion_r863202615
##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +108,64 @@ func (fn *splittableDoFn) ProcessElement(element string,
bf beam.BundleFinalizat
// [END bundlefinalization_simplecallback]
+// [START watermarkestimation_customestimator]
+
+// WatermarkState is a custom type.
+//
+// It is optional to write your own state type when making a custom estimator.
+type WatermarkState struct {
+ Watermark time.Time
+}
+
+// CustomWatermarkEstimator is a custom watermark estimator.
+// You may use any type here, including some of Beam's built-in watermark
estimator types,
+// e.g. sdf.WallTimeWatermarkEstimator,
sdf.TimestampObservingWatermarkEstimator, and sdf.ManualWatermarkEstimator.
+type CustomWatermarkEstimator struct {
+ state WatermarkState
+}
+
+// CurrentWatermark returns the current watermark and is invoked on DoFn
splits and self-checkpoints.
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+ return e.state.Watermark
+}
+
+// ObserveTimestamp is called on the output timestamps of all
+// emitted elements to update the watermark. It is optional.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+ e.state.Watermark = ts
+}
+
+// InitialWatermarkEstimatorState defines an initial state used to initialize
the watermark
+// estimator. It is optional. If this is not defined, WatermarkEstimatorState
may not be
+// defined and CreateWatermarkEstimator must not take in parameters.
+func (fn *weDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest
offsetrange.Restriction, element string) WatermarkState {
+ // Return some watermark state
+ return WatermarkState{Watermark: time.Now()}
+}
+
+// CreateWatermarkEstimator creates the watermark estimator used by this
Splittable DoFn.
+// Must take in a state parameter if InitialWatermarkEstimatorState is
defined, otherwise takes no parameters.
+func (fn *weDoFn) CreateWatermarkEstimator(initialState WatermarkState)
*CustomWatermarkEstimator {
+ return &CustomWatermarkEstimator{state: initialState}
+}
+
+// WatermarkEstimatorState returns the state usedto resume future watermark
estimation
Review Comment:
```suggestion
// WatermarkEstimatorState returns the state used to resume future watermark
estimation
```
Issue Time Tracking
-------------------
Worklog Id: (was: 765157)
Time Spent: 14h (was: 13h 50m)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 14h
> Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.7#820007)