This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b112e8bd8d [#24789]  Remainder of changes from #27550. (#27822)
3b112e8bd8d is described below

commit 3b112e8bd8da50d934955d64f2fd1e1396d3fe01
Author: Robert Burke <[email protected]>
AuthorDate: Mon Aug 7 11:27:38 2023 -0700

    [#24789]  Remainder of changes from #27550. (#27822)
    
    * Make the prism runner the default Go SDK runner.
    
    * Break cycle with ptest.
    
    * [DO NOT SUBMIT] Most changes needed to set prism as default Go SDK runner.
    
    * rm commented out code.
    
    * Avoid unnecessary logs on normal path.
    
    * Fix top.
    
    * Adjust Go versions?
    
    * [prism] Update symbol lookup to not be unit test specific.
    
    * [prism] Support for reshuffles.
    
    * [prism] move reshuffle test out of unimplemented.
    
    * [prism] Add CoGBK test to unimplemented set.
    
    * [prism] graduate additional tests.
    
    * delint
    
    * [prog] guide updates
    
    * [prism] Support CoGBKs and wafer thin fusion.
    
    * Make window close strict.
    
    * quick first pass
    
    * change cleanup
    
    * remove unnecessary churn changes
    
    * rm execute line
    
    * Update sdks/go/pkg/beam/runners/vet/vet.go
    
    Co-authored-by: Ritesh Ghorse <[email protected]>
    
    ---------
    
    Co-authored-by: lostluck <[email protected]>
    Co-authored-by: Ritesh Ghorse <[email protected]>
---
 sdks/go/pkg/beam/core/runtime/harness/datamgr.go         | 12 ++++++++++--
 sdks/go/pkg/beam/core/runtime/harness/harness.go         |  2 +-
 sdks/go/pkg/beam/core/runtime/harness/statemgr.go        |  9 ++++++++-
 sdks/go/pkg/beam/core/runtime/symbols.go                 |  2 +-
 sdks/go/pkg/beam/create.go                               |  5 +++++
 sdks/go/pkg/beam/create_test.go                          |  4 ++--
 sdks/go/pkg/beam/io/databaseio/database_test.go          |  1 +
 sdks/go/pkg/beam/io/datastoreio/datastore_test.go        |  5 +++++
 sdks/go/pkg/beam/pardo_test.go                           |  4 ++--
 .../beam/runners/prism/internal/engine/elementmanager.go |  1 -
 sdks/go/pkg/beam/runners/prism/internal/handlerunner.go  |  2 +-
 sdks/go/pkg/beam/runners/prism/internal/stage.go         |  8 ++++++--
 .../beam/runners/prism/internal/unimplemented_test.go    |  1 -
 sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go |  2 ++
 sdks/go/pkg/beam/runners/prism/internal/worker/worker.go |  7 ++++---
 sdks/go/pkg/beam/runners/universal/runnerlib/job.go      | 16 ++++++++--------
 sdks/go/pkg/beam/runners/vet/vet.go                      |  2 +-
 sdks/go/test/integration/integration.go                  |  3 +--
 .../site/content/en/documentation/programming-guide.md   |  9 ++++++---
 19 files changed, 64 insertions(+), 31 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go 
b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index d8c0f4d1d85..9662ac07c9c 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -27,6 +27,8 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
 )
 
 const (
@@ -128,7 +130,12 @@ func (m *DataChannelManager) Open(ctx context.Context, 
port exec.Port) (*DataCha
                return nil, err
        }
        ch.forceRecreate = func(id string, err error) {
-               log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v 
due to %v", id, port, err)
+               switch status.Code(err) {
+               case codes.Canceled:
+                       // Don't log on context canceled path.
+               default:
+                       log.Warnf(ctx, "forcing DataChannel[%v] reconnection on 
port %v due to %v", id, port, err)
+               }
                m.mu.Lock()
                delete(m.ports, port.URL)
                m.mu.Unlock()
@@ -371,7 +378,8 @@ func (c *DataChannel) read(ctx context.Context) {
                        c.terminateStreamOnError(err)
                        c.mu.Unlock()
 
-                       if err == io.EOF {
+                       st := status.Code(err)
+                       if st == codes.Canceled || err == io.EOF {
                                return
                        }
                        log.Errorf(ctx, "DataChannel.read %v bad: %v", c.id, 
err)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index d97b6b7db07..c5db9a85f36 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -708,6 +708,6 @@ func fail(ctx context.Context, id instructionID, format 
string, args ...any) *fn
 // dial to the specified endpoint. if timeout <=0, call blocks until
 // grpc.Dial succeeds.
 func dial(ctx context.Context, endpoint, purpose string, timeout 
time.Duration) (*grpc.ClientConn, error) {
-       log.Infof(ctx, "Connecting via grpc @ %s for %s ...", endpoint, purpose)
+       log.Output(ctx, log.SevDebug, 1, fmt.Sprintf("Connecting via grpc @ %s 
for %s ...", endpoint, purpose))
        return grpcx.Dial(ctx, endpoint, timeout)
 }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go 
b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index f10f0d92e84..76d4e1f32c2 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -29,6 +29,8 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        "github.com/golang/protobuf/proto"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
 )
 
 type writeTypeEnum int32
@@ -525,7 +527,12 @@ func (m *StateChannelManager) Open(ctx context.Context, 
port exec.Port) (*StateC
                return nil, err
        }
        ch.forceRecreate = func(id string, err error) {
-               log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port 
%v due to %v", id, port, err)
+               switch status.Code(err) {
+               case codes.Canceled:
+                       // Don't log on context canceled path.
+               default:
+                       log.Warnf(ctx, "forcing StateChannel[%v] reconnection 
on port %v due to %v", id, port, err)
+               }
                m.mu.Lock()
                delete(m.ports, port.URL)
                m.mu.Unlock()
diff --git a/sdks/go/pkg/beam/core/runtime/symbols.go 
b/sdks/go/pkg/beam/core/runtime/symbols.go
index e8ff532e763..84afe9b769a 100644
--- a/sdks/go/pkg/beam/core/runtime/symbols.go
+++ b/sdks/go/pkg/beam/core/runtime/symbols.go
@@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (any, 
error) {
 type failResolver bool
 
 func (p failResolver) Sym2Addr(name string) (uintptr, error) {
-       return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in 
unit tests", name)
+       return 0, errors.Errorf("%v not found. Register DoFns and functions 
with the the beam/register package.", name)
 }
diff --git a/sdks/go/pkg/beam/create.go b/sdks/go/pkg/beam/create.go
index d2bd554963e..91e9f335ef8 100644
--- a/sdks/go/pkg/beam/create.go
+++ b/sdks/go/pkg/beam/create.go
@@ -112,6 +112,11 @@ func createList(s Scope, values []any, t reflect.Type) 
(PCollection, error) {
 
 // TODO(herohde) 6/26/2017: make 'create' a SDF once supported. See BEAM-2421.
 
+func init() {
+       register.DoFn2x1[[]byte, func(T), error]((*createFn)(nil))
+       register.Emitter1[T]()
+}
+
 type createFn struct {
        Values [][]byte    `json:"values"`
        Type   EncodedType `json:"type"`
diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go
index 9033979d050..785c3b33db6 100644
--- a/sdks/go/pkg/beam/create_test.go
+++ b/sdks/go/pkg/beam/create_test.go
@@ -75,8 +75,8 @@ func TestCreateList(t *testing.T) {
                {[]float64{float64(0.1), float64(0.2), float64(0.3)}},
                {[]uint{uint(1), uint(2), uint(3)}},
                {[]bool{false, true, true, false, true}},
-               {[]wc{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}},
-               {[]*testProto{&testProto{}, &testProto{stringValue("test")}}}, 
// Test for BEAM-4401
+               {[]wc{{"a", 23}, {"b", 42}, {"c", 5}}},
+               {[]*testProto{{}, {stringValue("test")}}}, // Test for BEAM-4401
        }
 
        for _, test := range tests {
diff --git a/sdks/go/pkg/beam/io/databaseio/database_test.go 
b/sdks/go/pkg/beam/io/databaseio/database_test.go
index b93d5c9da72..f6c1355e851 100644
--- a/sdks/go/pkg/beam/io/databaseio/database_test.go
+++ b/sdks/go/pkg/beam/io/databaseio/database_test.go
@@ -22,6 +22,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
        _ "github.com/proullon/ramsql/driver"
diff --git a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go 
b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go
index 345eaa2a59e..b95439e2d56 100644
--- a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go
+++ b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go
@@ -64,6 +64,11 @@ type Foo struct {
 type Bar struct {
 }
 
+func init() {
+       beam.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*Bar)(nil)).Elem())
+}
+
 func Test_query(t *testing.T) {
        testCases := []struct {
                v           any
diff --git a/sdks/go/pkg/beam/pardo_test.go b/sdks/go/pkg/beam/pardo_test.go
index b88a6d642ea..56ed7e3e9fa 100644
--- a/sdks/go/pkg/beam/pardo_test.go
+++ b/sdks/go/pkg/beam/pardo_test.go
@@ -72,9 +72,9 @@ func testFunction() int64 {
 
 func TestFormatParDoError(t *testing.T) {
        got := formatParDoError(testFunction, 2, 1)
-       want := "beam.testFunction has 2 outputs, but ParDo requires 1 outputs, 
use ParDo2 instead."
+       want := "has 2 outputs, but ParDo requires 1 outputs, use ParDo2 
instead."
        if !strings.Contains(got, want) {
-               t.Errorf("formatParDoError(testFunction,2,1) = %v, want = %v", 
got, want)
+               t.Errorf("formatParDoError(testFunction,2,1) = \n%q want 
=\n%q", got, want)
        }
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index c8721e1a207..5e1585ffcd1 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -707,7 +707,6 @@ func (ss *stageState) bundleReady(em *ElementManager) 
(mtime.Time, bool) {
        ready := true
        for _, side := range ss.sides {
                pID, ok := em.pcolParents[side]
-               // These panics indicate pre-process/stage construction 
problems.
                if !ok {
                        panic(fmt.Sprintf("stage[%v] no parent ID for side 
input %v", ss.ID, side))
                }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go 
b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
index 27303f03b70..05b3d3bbaa0 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -43,7 +43,7 @@ import (
 type RunnerCharacteristic struct {
        SDKFlatten   bool // Sets whether we should force an SDK side flatten.
        SDKGBK       bool // Sets whether the GBK should be handled by the SDK, 
if possible by the SDK.
-       SDKReshuffle bool
+       SDKReshuffle bool // Sets whether we should use the SDK backup 
implementation to handle a Reshuffle.
 }
 
 func Runner(config any) *runner {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index e6fe28714b7..1a9c2548df8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -50,6 +50,10 @@ type link struct {
 // should in principle be able to connect two SDK environments directly
 // instead of going through the runner at all, which would be a small
 // efficiency gain, in runner memory use.
+//
+// That would also warrant an execution mode where fusion is taken into
+// account, but all serialization boundaries remain since the pcollections
+// would continue to get serialized.
 type stage struct {
        ID           string
        transforms   []string
@@ -145,11 +149,11 @@ progress:
                        if previousIndex == index && !splitsDone {
                                sr, err := b.Split(wk, 0.5 /* fraction of 
remainder */, nil /* allowed splits */)
                                if err != nil {
-                                       slog.Debug("SDK Error from split, 
aborting splits", "bundle", rb, "error", err.Error())
+                                       slog.Warn("SDK Error from split, 
aborting splits", "bundle", rb, "error", err.Error())
                                        break progress
                                }
                                if sr.GetChannelSplits() == nil {
-                                       slog.Warn("split failed", "bundle", rb)
+                                       slog.Debug("SDK returned no splits", 
"bundle", rb)
                                        splitsDone = true
                                        continue progress
                                }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
index f738a299cfd..5f8d3875999 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
@@ -41,7 +41,6 @@ func TestUnimplemented(t *testing.T) {
        tests := []struct {
                pipeline func(s beam.Scope)
        }{
-               // These tests don't terminate, so can't be run.
                // {pipeline: primitives.Drain}, // Can't test drain 
automatically yet.
 
                {pipeline: primitives.TestStreamBoolSequence},
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 30515fa6f6e..c931655f000 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -145,6 +145,7 @@ func (b *B) Cleanup(wk *W) {
        wk.mu.Unlock()
 }
 
+// Progress sends a progress request for the given bundle to the passed in 
worker, blocking on the response.
 func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
        resp := wk.sendInstruction(&fnpb.InstructionRequest{
                Request: &fnpb.InstructionRequest_ProcessBundleProgress{
@@ -159,6 +160,7 @@ func (b *B) Progress(wk *W) 
(*fnpb.ProcessBundleProgressResponse, error) {
        return resp.GetProcessBundleProgress(), nil
 }
 
+// Split sends a split request for the given bundle to the passed in worker, 
blocking on the response.
 func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) 
(*fnpb.ProcessBundleSplitResponse, error) {
        resp := wk.sendInstruction(&fnpb.InstructionRequest{
                Request: &fnpb.InstructionRequest_ProcessBundleSplit{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index eefab54a54c..a1d0ff79baf 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -256,7 +256,6 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) 
error {
                        // TODO: Do more than assume these are 
ProcessBundleResponses.
                        wk.mu.Lock()
                        if b, ok := 
wk.activeInstructions[resp.GetInstructionId()]; ok {
-                               // Error is handled in the resonse handler.
                                b.Respond(resp)
                        } else {
                                slog.Debug("ctrl.Recv: %v", resp)
@@ -268,7 +267,10 @@ func (wk *W) Control(ctrl 
fnpb.BeamFnControl_ControlServer) error {
        for {
                select {
                case req := <-wk.InstReqs:
-                       ctrl.Send(req)
+                       err := ctrl.Send(req)
+                       if err != nil {
+                               return err
+                       }
                case <-ctrl.Context().Done():
                        slog.Debug("Control context canceled")
                        return ctrl.Context().Err()
@@ -322,7 +324,6 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
                        wk.mu.Unlock()
                }
        }()
-
        for {
                select {
                case req, ok := <-wk.DataReqs:
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index 8cbb274e184..4e50661b3db 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -103,7 +103,7 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                return errors.Wrap(err, "failed to get job stream")
        }
 
-       mostRecentError := errors.New("<no error received, see runner logs>")
+       mostRecentError := "<no error received>"
        var errReceived, jobFailed bool
 
        for {
@@ -111,8 +111,8 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                if err != nil {
                        if err == io.EOF {
                                if jobFailed {
-                                       // Connection finished with a failed 
status, so produce what we have.
-                                       return errors.Errorf("job %v 
failed:\n%w", jobID, mostRecentError)
+                                       // Connection finished, so time to 
exit, produce what we have.
+                                       return errors.Errorf("job %v 
failed:\n%v", jobID, mostRecentError)
                                }
                                return nil
                        }
@@ -123,7 +123,7 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                case msg.GetStateResponse() != nil:
                        resp := msg.GetStateResponse()
 
-                       log.Infof(ctx, "Job state: %v", 
resp.GetState().String())
+                       log.Infof(ctx, "Job[%v] state: %v", jobID, 
resp.GetState().String())
 
                        switch resp.State {
                        case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
@@ -131,9 +131,9 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                        case jobpb.JobState_FAILED:
                                jobFailed = true
                                if errReceived {
-                                       return errors.Errorf("job %v 
failed:\n%w", jobID, mostRecentError)
+                                       return errors.Errorf("job %v 
failed:\n%v", jobID, mostRecentError)
                                }
-                               // Otherwise, wait for at least one error log 
from the runner, or the connection to close.
+                               // Otherwise we should wait for at least one 
error log from the runner.
                        }
 
                case msg.GetMessageResponse() != nil:
@@ -144,10 +144,10 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
 
                        if resp.GetImportance() >= 
jobpb.JobMessage_JOB_MESSAGE_ERROR {
                                errReceived = true
-                               mostRecentError = 
errors.New(resp.GetMessageText())
+                               mostRecentError = resp.GetMessageText()
 
                                if jobFailed {
-                                       return errors.Errorf("job %v 
failed:\n%w", jobID, mostRecentError)
+                                       return errors.Errorf("job %v 
failed:\n%w", jobID, errors.New(mostRecentError))
                                }
                        }
 
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go 
b/sdks/go/pkg/beam/runners/vet/vet.go
index 131fa0b1ec1..739f5db61c5 100644
--- a/sdks/go/pkg/beam/runners/vet/vet.go
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -54,7 +54,7 @@ func init() {
 type disabledResolver bool
 
 func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
-       return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in 
unit tests", name)
+       return 0, errors.Errorf("%v not found. Register DoFns and functions 
with the beam/register package.", name)
 }
 
 // Execute evaluates the pipeline on whether it can run without reflection.
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index 0f9e5984ead..f66cc1f53bf 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -136,7 +136,6 @@ var portableFilters = []string{
        "TestSetStateClear",
 }
 
-// TODO(lostluck): set up a specific run for these.
 var prismFilters = []string{
        // The prism runner does not support the TestStream primitive
        "TestTestStream.*",
@@ -149,7 +148,7 @@ var prismFilters = []string{
        // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow 
portable runners.
        "TestBigQueryIO.*",
        "TestSpannerIO.*",
-       // The prsim runner does not support pipeline drain for SDF.
+       // The prism runner does not support pipeline drain for SDF.
        "TestDrain",
        // FhirIO currently only supports Dataflow runner
        "TestFhirIO.*",
diff --git a/website/www/site/content/en/documentation/programming-guide.md 
b/website/www/site/content/en/documentation/programming-guide.md
index 82ada91f26a..b0118df3987 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -1124,6 +1124,9 @@ words = ...
 {{< /highlight >}}
 
 {{< highlight go >}}
+
+The Go SDK cannot support anonymous functions outside of the deprecated Go 
Direct runner.
+
 // words is the input PCollection of strings
 var words beam.PCollection = ...
 
@@ -1170,8 +1173,8 @@ words = ...
 {{< /highlight >}}
 
 {{< highlight go >}}
-// words is the input PCollection of strings
-var words beam.PCollection = ...
+
+The Go SDK cannot support anonymous functions outside of the deprecated Go 
Direct runner.
 
 {{< code_sample "sdks/go/examples/snippets/04transforms.go" 
model_pardo_apply_anon >}}
 {{< /highlight >}}
@@ -1191,7 +1194,7 @@ words = ...
 
 <span class="language-go">
 
-> **Note:** Anonymous function DoFns may not work on distributed runners.
+> **Note:** Anonymous function DoFns do not work on distributed runners.
 > It's recommended to use named functions and register them with 
 > `register.FunctionXxY` in
 > an `init()` block.
 

Reply via email to