lostluck commented on code in PR #17522:
URL: https://github.com/apache/beam/pull/17522#discussion_r863202615


##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +108,64 @@ func (fn *splittableDoFn) ProcessElement(element string, 
bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// WatermarkState is a custom type.
+//
+// It is optional to write your own state type when making a custom estimator.
+type WatermarkState struct {
+       Watermark time.Time
+}
+
+// CustomWatermarkEstimator is a custom watermark estimator.
+// You may use any type here, including some of Beam's built-in watermark estimator types,
+// e.g. sdf.WallTimeWatermarkEstimator, sdf.TimestampObservingWatermarkEstimator, and sdf.ManualWatermarkEstimator.
+type CustomWatermarkEstimator struct {
+       state WatermarkState
+}
+
+// CurrentWatermark returns the current watermark and is invoked on DoFn splits and self-checkpoints.
+// Watermark estimators must implement CurrentWatermark() time.Time.
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+       return e.state.Watermark
+}
+
+// ObserveTimestamp is called on the output timestamps of all
+// emitted elements to update the watermark. It is optional.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+       e.state.Watermark = ts
+}
+
+// InitialWatermarkEstimatorState defines an initial state used to initialize the watermark
+// estimator. It is optional. If this is not defined, WatermarkEstimatorState may not be
+// defined and CreateWatermarkEstimator must not take in parameters.
+func (fn *weDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest 
offsetrange.Restriction, element string) WatermarkState {
+       // Return some watermark state
+       return WatermarkState{Watermark: time.Now()}
+}
+
+// CreateWatermarkEstimator creates the watermark estimator used by this Splittable DoFn.
+// Must take in a state parameter if InitialWatermarkEstimatorState is defined, otherwise takes no parameters.
+func (fn *weDoFn) CreateWatermarkEstimator(initialState WatermarkState) 
*CustomWatermarkEstimator {
+       return &CustomWatermarkEstimator{state: initialState}
+}
+
+// WatermarkEstimatorState returns the state usedto resume future watermark 
estimation

Review Comment:
   ```suggestion
   // WatermarkEstimatorState returns the state used to resume future watermark estimation
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to