This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 095190d5bcb [BEAM-11105] Add docs + CHANGES.md entry for Go Watermark 
Estimation (#17522)
095190d5bcb is described below

commit 095190d5bcb9985065d6a13b3e934cbc96f45637
Author: Danny McCormick <[email protected]>
AuthorDate: Mon May 2 19:00:58 2022 -0400

    [BEAM-11105] Add docs + CHANGES.md entry for Go Watermark Estimation 
(#17522)
---
 CHANGES.md                                         |  3 +-
 sdks/go/examples/snippets/04transforms.go          | 61 ++++++++++++++++++++++
 .../content/en/documentation/programming-guide.md  |  2 +-
 3 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 9c2c465e67e..b30632a5755 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -54,8 +54,7 @@
 
 ## Highlights
 
-* New highly anticipated feature X added to Python SDK 
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* New highly anticipated feature Y added to Java SDK 
([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
+* Watermark estimation is now supported in the Go SDK 
([BEAM-11105](https://issues.apache.org/jira/browse/BEAM-11105)).
 
 ## I/Os
 
diff --git a/sdks/go/examples/snippets/04transforms.go 
b/sdks/go/examples/snippets/04transforms.go
index 42a13bd5426..80ff7ed66aa 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
 )
 
@@ -91,6 +92,8 @@ func CreateAndSplit(s beam.Scope, input []stringPair) 
beam.PCollection {
 
 type splittableDoFn struct{}
 
+type weDoFn struct{}
+
 // [START bundlefinalization_simplecallback]
 
 func (fn *splittableDoFn) ProcessElement(element string, bf 
beam.BundleFinalization) {
@@ -105,6 +108,64 @@ func (fn *splittableDoFn) ProcessElement(element string, 
bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// WatermarkState is a custom type.
+//
+// It is optional to write your own state type when making a custom estimator.
+type WatermarkState struct {
+       Watermark time.Time
+}
+
+// CustomWatermarkEstimator is a custom watermark estimator.
+// You may use any type here, including some of Beam's built-in watermark 
estimator types,
+// e.g. sdf.WallTimeWatermarkEstimator, 
sdf.TimestampObservingWatermarkEstimator, and sdf.ManualWatermarkEstimator.
+type CustomWatermarkEstimator struct {
+       state WatermarkState
+}
+
+// CurrentWatermark returns the current watermark and is invoked on DoFn 
splits and self-checkpoints.
+// Watermark estimators must implement CurrentWatermark() time.Time.
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+       return e.state.Watermark
+}
+
+// ObserveTimestamp is called on the output timestamps of all
+// emitted elements to update the watermark. It is optional.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+       e.state.Watermark = ts
+}
+
+// InitialWatermarkEstimatorState defines an initial state used to initialize 
the watermark
+// estimator. It is optional. If this is not defined, WatermarkEstimatorState 
must not be
+// defined and CreateWatermarkEstimator must not take in parameters.
+func (fn *weDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest 
offsetrange.Restriction, element string) WatermarkState {
+       // Return some watermark state
+       return WatermarkState{Watermark: time.Now()}
+}
+
+// CreateWatermarkEstimator creates the watermark estimator used by this 
Splittable DoFn.
+// Must take in a state parameter if InitialWatermarkEstimatorState is 
defined, otherwise takes no parameters.
+func (fn *weDoFn) CreateWatermarkEstimator(initialState WatermarkState) 
*CustomWatermarkEstimator {
+       return &CustomWatermarkEstimator{state: initialState}
+}
+
+// WatermarkEstimatorState returns the state used to resume future watermark 
estimation
+// after a checkpoint/split. It is required if InitialWatermarkEstimatorState 
is defined,
+// otherwise it must not be defined.
+func (fn *weDoFn) WatermarkEstimatorState(e *CustomWatermarkEstimator) 
WatermarkState {
+       return e.state
+}
+
+// ProcessElement is the method to execute for each element.
+// It can optionally take in a watermark estimator.
+func (fn *weDoFn) ProcessElement(e *CustomWatermarkEstimator, element string) {
+       // ...
+       e.state.Watermark = time.Now()
+}
+
+// [END watermarkestimation_customestimator]
+
 // [START cogroupbykey_output_helpers]
 
 func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) 
string {
diff --git a/website/www/site/content/en/documentation/programming-guide.md 
b/website/www/site/content/en/documentation/programming-guide.md
index fccf4b98b37..5515d3a3f30 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -6507,7 +6507,7 @@ watermark estimator implementation. You can also provide 
your own watermark esti
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11105.
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" 
watermarkestimation_customestimator >}}
 {{< /highlight >}}
 
 ### 12.6. Truncating during drain {#truncating-during-drain}

Reply via email to