This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e4c288627c [BEAM-11104] Add ProcessContinuation type to Go SDK (#17265)
9e4c288627c is described below

commit 9e4c288627cf4af4e2397f565d3e2f847e2f4900
Author: Jack McCluskey <[email protected]>
AuthorDate: Thu Apr 7 13:32:56 2022 -0400

    [BEAM-11104] Add ProcessContinuation type to Go SDK (#17265)
---
 sdks/go/pkg/beam/core/funcx/fn.go              | 32 ++++++++++---
 sdks/go/pkg/beam/core/funcx/fn_test.go         |  7 +++
 sdks/go/pkg/beam/core/sdf/continuation.go      | 62 ++++++++++++++++++++++++++
 sdks/go/pkg/beam/core/sdf/continuation_test.go | 39 ++++++++++++++++
 4 files changed, 134 insertions(+), 6 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index b2f31cc1d18..213d423ba02 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -124,11 +124,12 @@ type ReturnKind int
 
 // The supported types of ReturnKind.
 const (
-       RetIllegal   ReturnKind = 0x0
-       RetEventTime ReturnKind = 0x1
-       RetValue     ReturnKind = 0x2
-       RetError     ReturnKind = 0x4
-       RetRTracker  ReturnKind = 0x8
+       RetIllegal             ReturnKind = 0x0
+       RetEventTime           ReturnKind = 0x1
+       RetValue               ReturnKind = 0x2
+       RetError               ReturnKind = 0x4
+       RetRTracker            ReturnKind = 0x8
+       RetProcessContinuation ReturnKind = 0x10
 )
 
 func (k ReturnKind) String() string {
@@ -141,6 +142,8 @@ func (k ReturnKind) String() string {
                return "EventTime"
        case RetValue:
                return "Value"
+       case RetProcessContinuation:
+               return "ProcessContinuation"
        default:
                return fmt.Sprintf("%v", int(k))
        }
@@ -302,6 +305,16 @@ func (u *Fn) OutEventTime() (pos int, exists bool) {
        return -1, false
 }
 
+// ProcessContinuation returns (index, true) iff the function returns a process continuation.
+func (u *Fn) ProcessContinuation() (pos int, exists bool) {
+       for i, p := range u.Ret {
+               if p.Kind == RetProcessContinuation {
+                       return i, true
+               }
+       }
+       return -1, false
+}
+
 // Params returns the parameter indices that matches the given mask.
 func (u *Fn) Params(mask FnParamKind) []int {
        var ret []int
@@ -392,6 +405,8 @@ func New(fn reflectx.Func) (*Fn, error) {
                        kind = RetError
                case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()):
                        kind = RetRTracker
+               case t.Implements(reflect.TypeOf((*sdf.ProcessContinuation)(nil)).Elem()):
+                       kind = RetProcessContinuation
                case t == typex.EventTimeType:
                        kind = RetEventTime
                case typex.IsContainer(t), typex.IsConcrete(t), typex.IsUniversal(t):
@@ -601,6 +616,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 var (
        errEventTimeRetPrecedence = errors.New("beam.EventTime must be first return parameter")
        errErrorPrecedence        = errors.New("error must be the final return parameter")
+       // TODO(BEAM-11104): Enable process continuations as a valid return value.
+       errContinuationSupport = errors.New("process continuations are not supported in this SDK release; see https://issues.apache.org/jira/browse/BEAM-11104 for the feature's current status")
 )
 
 type retState int
@@ -610,6 +627,7 @@ const (
        rsEventTime
        rsOutput
        rsError
+       rsProcessContinuation
 )
 
 func nextRetState(cur retState, transition ReturnKind) (retState, error) {
@@ -619,7 +637,7 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
                case RetEventTime:
                        return rsEventTime, nil
                }
-       case rsEventTime, rsOutput:
+       case rsEventTime, rsOutput, rsProcessContinuation:
                // Identical to the default cases.
        case rsError:
                // This is a terminal state. No valid transitions. error must be the final return value.
@@ -631,6 +649,8 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
                return -1, errEventTimeRetPrecedence
        case RetValue, RetRTracker:
                return rsOutput, nil
+       case RetProcessContinuation:
+               return -1, errContinuationSupport
        case RetError:
                return rsError, nil
        default:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 6616796af8e..96509c6a063 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -23,6 +23,7 @@ import (
        "testing"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
@@ -112,6 +113,12 @@ func TestNew(t *testing.T) {
                        Param: []FnParamKind{FnContext, FnEventTime, FnType, FnEmit},
                        Ret:   []ReturnKind{RetError},
                },
+               {
+                       // TODO(BEAM-11104): Replace with a functioning test case once E2E support is finished.
+                       Name: "sdf",
+                       Fn:   func(sdf.RTracker, func(int)) (sdf.ProcessContinuation, error) { return nil, nil },
+                       Err:  errContinuationSupport,
+               },
                {
                        Name: "errContextParam: after input",
                        Fn:   func(string, context.Context) {},
diff --git a/sdks/go/pkg/beam/core/sdf/continuation.go b/sdks/go/pkg/beam/core/sdf/continuation.go
new file mode 100644
index 00000000000..c1107b5b816
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/continuation.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+import "time"
+
+// ProcessContinuation is an interface used to signal that a splittable DoFn should be
+// split and resumed at a later time. The ProcessContinuation can be returned from
+// a DoFn when it returns, either complete or needing to be resumed.
+type ProcessContinuation interface {
+       // ShouldResume returns a boolean indicating whether the process should be
+       // resumed at a later time.
+       ShouldResume() bool
+
+       // ResumeDelay returns a suggested time.Duration to wait before resuming the
+       // process. The runner is not guaranteed to follow this suggestion.
+       ResumeDelay() time.Duration
+}
+
+// defaultProcessContinuation is the SDK-default implementation of the ProcessContinuation
+// interface, encapsulating the basic behavior necessary to resume a process later.
+type defaultProcessContinuation struct {
+       resumes     bool
+       resumeDelay time.Duration
+}
+
+// ShouldResume returns whether or not the DefaultProcessContinuation should lead to the
+// process being resumed.
+func (p *defaultProcessContinuation) ShouldResume() bool {
+       return p.resumes
+}
+
+// ResumeDelay returns the suggested duration that should pass before the process is resumed.
+// If the process should not be resumed, the value returned here does not matter.
+func (p *defaultProcessContinuation) ResumeDelay() time.Duration {
+       return p.resumeDelay
+}
+
+// StopProcessing returns a ProcessContinuation that will not resume the process
+// later.
+func StopProcessing() ProcessContinuation {
+       return &defaultProcessContinuation{resumes: false, resumeDelay: 0 * time.Second}
+}
+
+// ResumeProcessingIn returns a ProcessContinuation that will resume the process
+// later with a suggested delay passed as a time.Duration.
+func ResumeProcessingIn(delay time.Duration) ProcessContinuation {
+       return &defaultProcessContinuation{resumes: true, resumeDelay: delay}
+}
diff --git a/sdks/go/pkg/beam/core/sdf/continuation_test.go b/sdks/go/pkg/beam/core/sdf/continuation_test.go
new file mode 100644
index 00000000000..7f7970d4785
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/continuation_test.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+import (
+       "testing"
+       "time"
+)
+
+func TestStopProcessing(t *testing.T) {
+       pc := StopProcessing()
+       if pc.ShouldResume() {
+               t.Error("ShouldResume() got true, want false")
+       }
+}
+
+func TestResumeProcessingIn(t *testing.T) {
+       dur := 10 * time.Second
+       pc := ResumeProcessingIn(dur)
+       if !pc.ShouldResume() {
+               t.Error("ShouldResume() got false, want true")
+       }
+       if got, want := pc.ResumeDelay(), dur; got != want {
+               t.Errorf("got ResumeDelay %v, want %v", got, want)
+       }
+}

Reply via email to