[
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=765106&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765106
]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 02/May/22 19:03
Start Date: 02/May/22 19:03
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17522:
URL: https://github.com/apache/beam/pull/17522#discussion_r863090010
##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalizat
// [END bundlefinalization_simplecallback]
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.
Review Comment:
Medium nit: this is Go code that breaks with Go style for tutorial
purposes. Ideally we still maintain good Go doc style for these, e.g.:
```
// WatermarkState is a custom type.
//
// It is optional to write your own state type when making a custom estimator.
```
Feel free to push back if you feel it's better to buck the convention, but
examples can and do get copypasted relentlessly.
##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalizat
// [END bundlefinalization_simplecallback]
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.
+type WatermarkState struct {
+ Watermark time.Time
+}
+
+// Define a watermark estimator
+type CustomWatermarkEstimator struct {
+ state WatermarkState
+}
+
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+ return e.state.Watermark
+}
+
+// (Optional) Watermark estimators may implement ObserveTimestamp(time.time)
+// which will be called on the output timestamps of all emitted elements.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+ e.state.Watermark = ts
+}
+
+// (Optional) Define an initial state to initialize your estimator with.
+// If this is not defined, GetWatermarkEstimatorState may not be defined and
+// CreateWatermarkEstimator must not take in parameters.
+func (fn *splittableDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest offsetrange.Restriction, element string) WatermarkState {
+ // Return some watermark state
+ return WatermarkState{Watermark: time.Now()}
+}
+
+// Create the watermark estimator used by this sdf. Must take in a state parameter if
Review Comment:
```suggestion
// Create the watermark estimator used by this SplittableDoFn. Must take in a state parameter if
```
##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalizat
// [END bundlefinalization_simplecallback]
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.
+type WatermarkState struct {
+ Watermark time.Time
+}
+
+// Define a watermark estimator
+type CustomWatermarkEstimator struct {
+ state WatermarkState
+}
+
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+ return e.state.Watermark
+}
+
+// (Optional) Watermark estimators may implement ObserveTimestamp(time.time)
+// which will be called on the output timestamps of all emitted elements.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+ e.state.Watermark = ts
+}
+
+// (Optional) Define an initial state to initialize your estimator with.
+// If this is not defined, GetWatermarkEstimatorState may not be defined and
Review Comment:
The "Get" prefixes are still around in these doc comments. They need a bit of
a cleanup.
Issue Time Tracking
-------------------
Worklog Id: (was: 765106)
Time Spent: 13.5h (was: 13h 20m)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 13.5h
> Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)