lostluck commented on code in PR #17522:
URL: https://github.com/apache/beam/pull/17522#discussion_r863090010


##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, 
bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.

Review Comment:
   Medium Nit: Authoring Go Code that breaks with Go Style for tutorial 
purposes.  Ideally we still maintain good Go Doc style for these. 
   
   eg.
   ```
   // WatermarkState is a custom type.`
   //
   // It is Optional to write your own state type when making a custom 
estimator.
   ```
   
   Feel free to push back if you feel it's better to buck the convention, but 
examples can and do get copypasted relentlessly.



##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, 
bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.
+type WatermarkState struct {
+       Watermark time.Time
+}
+
+// Define a watermark estimator
+type CustomWatermarkEstimator struct {
+       state WatermarkState
+}
+
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+       return e.state.Watermark
+}
+
+// (Optional) Watermark estimators may implement ObserveTimestamp(time.time)
+// which will be called on the output timestamps of all emitted elements.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+       e.state.Watermark = ts
+}
+
+// (Optional) Define an initial state to initialize your estimator with.
+// If this is not defined, GetWatermarkEstimatorState may not be defined and
+// CreateWatermarkEstimator must not take in parameters.
+func (fn *splittableDoFn) InitialWatermarkEstimatorState(et beam.EventTime, 
rest offsetrange.Restriction, element string) WatermarkState {
+       // Return some watermark state
+       return WatermarkState{Watermark: time.Now()}
+}
+
+// Create the watermark estimator used by this sdf. Must take in a state 
parameter if

Review Comment:
   ```suggestion
   // Create the watermark estimator used by this SplittableDoFn. Must take in 
a state parameter if
   ```



##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -105,6 +106,52 @@ func (fn *splittableDoFn) ProcessElement(element string, 
bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// (Optional) - define a custom watermark state type.
+type WatermarkState struct {
+       Watermark time.Time
+}
+
+// Define a watermark estimator
+type CustomWatermarkEstimator struct {
+       state WatermarkState
+}
+
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+       return e.state.Watermark
+}
+
+// (Optional) Watermark estimators may implement ObserveTimestamp(time.time)
+// which will be called on the output timestamps of all emitted elements.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+       e.state.Watermark = ts
+}
+
+// (Optional) Define an initial state to initialize your estimator with.
+// If this is not defined, GetWatermarkEstimatorState may not be defined and

Review Comment:
   The "Get" prefixes are still around in these doc comments. NEed s bit of a 
cleanup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to