lostluck commented on code in PR #17522:
URL: https://github.com/apache/beam/pull/17522#discussion_r863202615
##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +108,64 @@ func (fn *splittableDoFn) ProcessElement(element string,
bf beam.BundleFinalizat
// [END bundlefinalization_simplecallback]
+// [START watermarkestimation_customestimator]
+
+// WatermarkState is a custom type.
+//
+// It is optional to write your own state type when making a custom estimator.
+type WatermarkState struct {
+ Watermark time.Time
+}
+
+// CustomWatermarkEstimator is a custom watermark estimator.
+// You may use any type here, including some of Beam's built in watermark
estimator types,
+// e.g. sdf.WallTimeWatermarkEstimator,
sdf.TimestampObservingWatermarkEstimator, and sdf.ManualWatermarkEstimator
+type CustomWatermarkEstimator struct {
+ state WatermarkState
+}
+
+// CurrentWatermark returns the current watermark and is invoked on DoFn
splits and self-checkpoints.
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+ return e.state.Watermark
+}
+
+// ObserveTimestamp is called on the output timestamps of all
+// emitted elements to update the watermark. It is optional.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+ e.state.Watermark = ts
+}
+
+// InitialWatermarkEstimatorState defines an initial state used to initialize
the watermark
+// estimator. It is optional. If this is not defined, WatermarkEstimatorState
may not be
+// defined and CreateWatermarkEstimator must not take in parameters.
+func (fn *weDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest
offsetrange.Restriction, element string) WatermarkState {
+ // Return some watermark state
+ return WatermarkState{Watermark: time.Now()}
+}
+
+// CreateWatermarkEstimator creates the watermark estimator used by this
Splittable DoFn.
+// Must take in a state parameter if InitialWatermarkEstimatorState is
defined, otherwise takes no parameters.
+func (fn *weDoFn) CreateWatermarkEstimator(initialState WatermarkState)
*CustomWatermarkEstimator {
+ return &CustomWatermarkEstimator{state: initialState}
+}
+
+// WatermarkEstimatorState returns the state usedto resume future watermark
estimation
Review Comment:
```suggestion
// WatermarkEstimatorState returns the state used to resume future watermark
estimation
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]