This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.39.0 by this push:
     new 8aa77e61e5f Revert "[BEAM-11104] Enable ProcessContinuation return 
values, add unit test cases (#17533)" (#17562)
8aa77e61e5f is described below

commit 8aa77e61e5ffc1efdc07772c591d4ab0a336533d
Author: Jack McCluskey <[email protected]>
AuthorDate: Thu May 5 15:11:44 2022 -0400

    Revert "[BEAM-11104] Enable ProcessContinuation return values, add unit 
test cases (#17533)" (#17562)
    
    This reverts commit 6a50718364282b4e0a2fb266a7d6767e50961607.
---
 CHANGES.md                                    |  1 -
 sdks/go/pkg/beam/core/funcx/fn.go             | 18 +++-----
 sdks/go/pkg/beam/core/funcx/fn_test.go        | 15 ++-----
 sdks/go/pkg/beam/core/runtime/exec/fn_test.go | 63 ++-------------------------
 4 files changed, 13 insertions(+), 84 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8071b24be4a..ffd6cd32024 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,7 +67,6 @@
 
 * 'Manage Clusters' JupyterLab extension added for users to configure usage of 
Dataproc clusters managed by Interactive Beam (Python) 
([BEAM-14130](https://issues.apache.org/jira/browse/BEAM-14130)).
 * Pipeline drain support added for Go SDK 
([BEAM-11106](https://issues.apache.org/jira/browse/BEAM-11106)). **Note: this 
feature is not yet fully validated and should be treated as experimental in 
this release.**
-* Go SDK users may now write self-checkpointing Splittable DoFns to read from 
streaming sources. **Note: this feature is not yet fully validated and should 
be treated as experimental in this release.** 
([BEAM-11104](https://issues.apache.org/jira/browse/BEAM-11104))
 
 ## Breaking Changes
 
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go 
b/sdks/go/pkg/beam/core/funcx/fn.go
index 7d39aa1ce88..2580b9bbd9f 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -654,9 +654,10 @@ func nextParamState(cur paramState, transition 
FnParamKind) (paramState, error)
 }
 
 var (
-       errEventTimeRetPrecedence        = errors.New("beam.EventTime must be 
first return parameter")
-       errErrorPrecedence               = errors.New("error must be the final 
return parameter")
-       errProcessContinuationPrecedence = errors.New("ProcessContinuation must 
be the final non-error return parameter")
+       errEventTimeRetPrecedence = errors.New("beam.EventTime must be first 
return parameter")
+       errErrorPrecedence        = errors.New("error must be the final return 
parameter")
+       // TODO(BEAM-11104): Enable process continuations as a valid return 
value.
+       errContinuationSupport = errors.New("process continuations are not 
supported in this SDK release; see 
https://issues.apache.org/jira/browse/BEAM-11104 for the feature's current 
status")
 )
 
 type retState int
@@ -676,15 +677,8 @@ func nextRetState(cur retState, transition ReturnKind) 
(retState, error) {
                case RetEventTime:
                        return rsEventTime, nil
                }
-       case rsEventTime, rsOutput:
+       case rsEventTime, rsOutput, rsProcessContinuation:
                // Identical to the default cases.
-       case rsProcessContinuation:
-               switch transition {
-               case RetError:
-                       return rsError, nil
-               default:
-                       return -1, errProcessContinuationPrecedence
-               }
        case rsError:
                // This is a terminal state. No valid transitions. error must 
be the final return value.
                return -1, errErrorPrecedence
@@ -696,7 +690,7 @@ func nextRetState(cur retState, transition ReturnKind) 
(retState, error) {
        case RetValue, RetRTracker:
                return rsOutput, nil
        case RetProcessContinuation:
-               return rsProcessContinuation, nil
+               return -1, errContinuationSupport
        case RetError:
                return rsError, nil
        default:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go 
b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 202de64b19d..60b97f2080c 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -121,10 +121,10 @@ func TestNew(t *testing.T) {
                        Ret:   []ReturnKind{RetError},
                },
                {
-                       Name:  "sdf",
-                       Fn:    func(sdf.RTracker, func(int)) 
(sdf.ProcessContinuation, error) { return nil, nil },
-                       Param: []FnParamKind{FnRTracker, FnEmit},
-                       Ret:   []ReturnKind{RetProcessContinuation, RetError},
+                       // TODO(BEAM-11104): Replace with a functioning test 
case once E2E support is finished.
+                       Name: "sdf",
+                       Fn:   func(sdf.RTracker, func(int)) 
(sdf.ProcessContinuation, error) { return nil, nil },
+                       Err:  errContinuationSupport,
                },
                {
                        Name: "errContextParam: after input",
@@ -247,13 +247,6 @@ func TestNew(t *testing.T) {
                        },
                        Err: errEventTimeRetPrecedence,
                },
-               {
-                       Name: "errProcessContinuationPrecedence",
-                       Fn: func() (string, sdf.ProcessContinuation, int, 
error) {
-                               return "", nil, 0, nil
-                       },
-                       Err: errProcessContinuationPrecedence,
-               },
                {
                        Name: "errIllegalParametersInEmit - malformed emit 
struct",
                        Fn:   func(context.Context, typex.EventTime, 
reflect.Type, func(nonConcreteType)) error { return nil },
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
index d4e4a091229..ee0b9ab5b98 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
@@ -33,13 +33,6 @@ import (
 
 var errGeneric = errors.New("generic error")
 
-func continuationsEqual(first, second sdf.ProcessContinuation) bool {
-       if first.ShouldResume() {
-               return first.ShouldResume() == second.ShouldResume() && 
first.ResumeDelay() == second.ResumeDelay()
-       }
-       return first.ShouldResume() == second.ShouldResume()
-}
-
 // TestInvoke verifies the the various forms of input to Invoke are handled 
correctly.
 func TestInvoke(t *testing.T) {
        tests := []struct {
@@ -133,12 +126,6 @@ func TestInvoke(t *testing.T) {
                        Opt:      &MainInput{Key: FullValue{Elm: 1}},
                        Expected: mtime.EndOfGlobalWindowTime.Milliseconds(),
                },
-               {
-                       // (Return check) ProcessContinuation
-                       Fn:                   func(a string) 
sdf.ProcessContinuation { return sdf.ResumeProcessingIn(1 * time.Second) },
-                       Opt:                  &MainInput{Key: FullValue{Elm: 
"some string"}},
-                       ExpectedContinuation: sdf.ResumeProcessingIn(1 * 
time.Second),
-               },
                {
                        // (Return check) K, V
                        Fn:        func(a int) (int64, int) { return int64(a), 
2 * a },
@@ -146,13 +133,6 @@ func TestInvoke(t *testing.T) {
                        Expected:  int64(1),
                        Expected2: 2,
                },
-               {
-                       // (Return check) V, ProcessContinuation
-                       Fn:                   func(a int) (int, 
sdf.ProcessContinuation) { return 2 * a, sdf.StopProcessing() },
-                       Opt:                  &MainInput{Key: FullValue{Elm: 
1}},
-                       Expected:             2,
-                       ExpectedContinuation: sdf.StopProcessing(),
-               },
                {
                        // (Return check)  K, V, Error
                        Fn:        func(a int) (int64, int, error) { return 
int64(a), 2 * a, nil },
@@ -160,14 +140,6 @@ func TestInvoke(t *testing.T) {
                        Expected:  int64(1),
                        Expected2: 2,
                },
-               {
-                       // (Return check) K, V, ProcessContinuation
-                       Fn:                   func(a int) (int64, int, 
sdf.ProcessContinuation) { return int64(a), 2 * a, sdf.StopProcessing() },
-                       Opt:                  &MainInput{Key: FullValue{Elm: 
1}},
-                       Expected:             int64(1),
-                       Expected2:            2,
-                       ExpectedContinuation: sdf.StopProcessing(),
-               },
                {
                        // (Return check) EventTime, K, V
                        Fn:           func(a int) (typex.EventTime, int64, int) 
{ return 42, int64(a), 3 * a },
@@ -176,17 +148,6 @@ func TestInvoke(t *testing.T) {
                        Expected2:    3,
                        ExpectedTime: 42,
                },
-               {
-                       // (Return check) EventTime, K, V, ProcessContinuation
-                       Fn: func(a int) (typex.EventTime, int64, int, 
sdf.ProcessContinuation) {
-                               return 42, int64(a), 3 * a, sdf.StopProcessing()
-                       },
-                       Opt:                  &MainInput{Key: FullValue{Elm: 
1}},
-                       Expected:             int64(1),
-                       Expected2:            3,
-                       ExpectedTime:         42,
-                       ExpectedContinuation: sdf.StopProcessing(),
-               },
                {
                        // (Return check) EventTime, K, V, Error
                        Fn:           func(a int) (typex.EventTime, int64, int, 
error) { return 47, int64(a), 3 * a, nil },
@@ -195,17 +156,6 @@ func TestInvoke(t *testing.T) {
                        Expected2:    3,
                        ExpectedTime: 47,
                },
-               {
-                       // (Return check) EventTime, K, V, ProcessContinuation, 
Error
-                       Fn: func(a int) (typex.EventTime, int64, int, 
sdf.ProcessContinuation, error) {
-                               return 47, int64(a), 3 * a, 
sdf.StopProcessing(), nil
-                       },
-                       Opt:                  &MainInput{Key: FullValue{Elm: 
1}},
-                       Expected:             int64(1),
-                       Expected2:            3,
-                       ExpectedTime:         47,
-                       ExpectedContinuation: sdf.StopProcessing(),
-               },
                {
                        // (Return check) EventTime, V, Error
                        Fn:           func(a int) (typex.EventTime, int, error) 
{ return 10, 3 * a, nil },
@@ -250,14 +200,7 @@ func TestInvoke(t *testing.T) {
                        Opt:           &MainInput{Key: FullValue{Elm: 1}},
                        ExpectedError: errGeneric,
                },
-               {
-                       // ret5() error check
-                       Fn: func(a int) (typex.EventTime, string, int, 
sdf.ProcessContinuation, error) {
-                               return 0, "", 0, nil, errGeneric
-                       },
-                       Opt:           &MainInput{Key: FullValue{Elm: 1}},
-                       ExpectedError: errGeneric,
-               },
+               // TODO(BEAM-11104): Add unit test cases for 
ProcessContinuations once they are enabled for use.
        }
 
        for i, test := range tests {
@@ -300,8 +243,8 @@ func TestInvoke(t *testing.T) {
                                if val != nil && val.Timestamp != 
test.ExpectedTime {
                                        t.Errorf("EventTime: Invoke(%v,%v) = 
%v, want %v", fn.Fn.Name(), test.Args, val.Timestamp, test.ExpectedTime)
                                }
-                               if val != nil && test.ExpectedContinuation != 
nil && !continuationsEqual(val.Continuation, test.ExpectedContinuation) {
-                                       t.Errorf("Continuation: Invoke(%v,%v) = 
%v, want %v", fn.Fn.Name(), test.Args, val.Continuation, 
test.ExpectedContinuation)
+                               if val != nil && val.Continuation != 
test.ExpectedContinuation {
+                                       t.Errorf("EventTime: Invoke(%v,%v) = 
%v, want %v", fn.Fn.Name(), test.Args, val.Continuation, 
test.ExpectedContinuation)
                                }
                        }
                })

Reply via email to