[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=765106&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765106
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/22 19:03
            Start Date: 02/May/22 19:03
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17522:
URL: https://github.com/apache/beam/pull/17522#discussion_r863090010


##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, 
bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.

Review Comment:
   Medium Nit: Authoring Go Code that breaks with Go Style for tutorial 
purposes.  Ideally we still maintain good Go Doc style for these. 
   
   eg.
   ```
   // WatermarkState is a custom type.`
   //
   // It is Optional to write your own state type when making a custom 
estimator.
   ```
   
   Feel free to push back if you feel it's better to buck the convention, but 
examples can and do get copypasted relentlessly.



##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, 
bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.
+type WatermarkState struct {
+       Watermark time.Time
+}
+
+// Define a watermark estimator
+type CustomWatermarkEstimator struct {
+       state WatermarkState
+}
+
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+       return e.state.Watermark
+}
+
+// (Optional) Watermark estimators may implement ObserveTimestamp(time.time)
+// which will be called on the output timestamps of all emitted elements.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+       e.state.Watermark = ts
+}
+
+// (Optional) Define an initial state to initialize your estimator with.
+// If this is not defined, GetWatermarkEstimatorState may not be defined and
+// CreateWatermarkEstimator must not take in parameters.
+func (fn *splittableDoFn) InitialWatermarkEstimatorState(et beam.EventTime, 
rest offsetrange.Restriction, element string) WatermarkState {
+       // Return some watermark state
+       return WatermarkState{Watermark: time.Now()}
+}
+
+// Create the watermark estimator used by this sdf. Must take in a state 
parameter if

Review Comment:
   ```suggestion
   // Create the watermark estimator used by this SplittableDoFn. Must take in 
a state parameter if
   ```



##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, 
bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.
+type WatermarkState struct {
+       Watermark time.Time
+}
+
+// Define a watermark estimator
+type CustomWatermarkEstimator struct {
+       state WatermarkState
+}
+
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+       return e.state.Watermark
+}
+
+// (Optional) Watermark estimators may implement ObserveTimestamp(time.time)
+// which will be called on the output timestamps of all emitted elements.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+       e.state.Watermark = ts
+}
+
+// (Optional) Define an initial state to initialize your estimator with.
+// If this is not defined, GetWatermarkEstimatorState may not be defined and

Review Comment:
   The "Get" prefixes are still around in these doc comments. NEed s bit of a 
cleanup.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765106)
    Time Spent: 13.5h  (was: 13h 20m)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 13.5h
>  Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to