This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3b112e8bd8d [#24789] Remainder of changes from #27550. (#27822)
3b112e8bd8d is described below
commit 3b112e8bd8da50d934955d64f2fd1e1396d3fe01
Author: Robert Burke <[email protected]>
AuthorDate: Mon Aug 7 11:27:38 2023 -0700
[#24789] Remainder of changes from #27550. (#27822)
* Make the prism runner the default Go SDK runner.
* Break cycle with ptest.
* [DO NOT SUBMIT] Most changes needed to set prism as default Go SDK runner.
* rm commented out code.
* Avoid unnecessary logs on normal path.
* Fix top.
* Adjust Go versions?
* [prism] Update symbol lookup to not be unit test specific.
* [prism] Support for reshuffles.
* [prism] move reshuffle test out of unimplemented.
* [prism] Add CoGBK test to unimplemented set.
* [prism] graduate additional tests.
* delint
* [prog] guide updates
* [prism] Support CoGBKs and wafer thin fusion.
* Make window close strict.
* quick first pass
* change cleanup
* remove unnecessary churn changes
* rm execute line
* Update sdks/go/pkg/beam/runners/vet/vet.go
Co-authored-by: Ritesh Ghorse <[email protected]>
---------
Co-authored-by: lostluck <[email protected]>
Co-authored-by: Ritesh Ghorse <[email protected]>
---
sdks/go/pkg/beam/core/runtime/harness/datamgr.go | 12 ++++++++++--
sdks/go/pkg/beam/core/runtime/harness/harness.go | 2 +-
sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 9 ++++++++-
sdks/go/pkg/beam/core/runtime/symbols.go | 2 +-
sdks/go/pkg/beam/create.go | 5 +++++
sdks/go/pkg/beam/create_test.go | 4 ++--
sdks/go/pkg/beam/io/databaseio/database_test.go | 1 +
sdks/go/pkg/beam/io/datastoreio/datastore_test.go | 5 +++++
sdks/go/pkg/beam/pardo_test.go | 4 ++--
.../beam/runners/prism/internal/engine/elementmanager.go | 1 -
sdks/go/pkg/beam/runners/prism/internal/handlerunner.go | 2 +-
sdks/go/pkg/beam/runners/prism/internal/stage.go | 8 ++++++--
.../beam/runners/prism/internal/unimplemented_test.go | 1 -
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go | 2 ++
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go | 7 ++++---
sdks/go/pkg/beam/runners/universal/runnerlib/job.go | 16 ++++++++--------
sdks/go/pkg/beam/runners/vet/vet.go | 2 +-
sdks/go/test/integration/integration.go | 3 +--
.../site/content/en/documentation/programming-guide.md | 9 ++++++---
19 files changed, 64 insertions(+), 31 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index d8c0f4d1d85..9662ac07c9c 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -27,6 +27,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
const (
@@ -128,7 +130,12 @@ func (m *DataChannelManager) Open(ctx context.Context,
port exec.Port) (*DataCha
return nil, err
}
ch.forceRecreate = func(id string, err error) {
- log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v
due to %v", id, port, err)
+ switch status.Code(err) {
+ case codes.Canceled:
+ // Don't log on context canceled path.
+ default:
+ log.Warnf(ctx, "forcing DataChannel[%v] reconnection on
port %v due to %v", id, port, err)
+ }
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
@@ -371,7 +378,8 @@ func (c *DataChannel) read(ctx context.Context) {
c.terminateStreamOnError(err)
c.mu.Unlock()
- if err == io.EOF {
+ st := status.Code(err)
+ if st == codes.Canceled || err == io.EOF {
return
}
log.Errorf(ctx, "DataChannel.read %v bad: %v", c.id,
err)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index d97b6b7db07..c5db9a85f36 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -708,6 +708,6 @@ func fail(ctx context.Context, id instructionID, format
string, args ...any) *fn
// dial to the specified endpoint. if timeout <=0, call blocks until
// grpc.Dial succeeds.
func dial(ctx context.Context, endpoint, purpose string, timeout
time.Duration) (*grpc.ClientConn, error) {
- log.Infof(ctx, "Connecting via grpc @ %s for %s ...", endpoint, purpose)
+ log.Output(ctx, log.SevDebug, 1, fmt.Sprintf("Connecting via grpc @ %s
for %s ...", endpoint, purpose))
return grpcx.Dial(ctx, endpoint, timeout)
}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index f10f0d92e84..76d4e1f32c2 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -29,6 +29,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
type writeTypeEnum int32
@@ -525,7 +527,12 @@ func (m *StateChannelManager) Open(ctx context.Context,
port exec.Port) (*StateC
return nil, err
}
ch.forceRecreate = func(id string, err error) {
- log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port
%v due to %v", id, port, err)
+ switch status.Code(err) {
+ case codes.Canceled:
+ // Don't log on context canceled path.
+ default:
+ log.Warnf(ctx, "forcing StateChannel[%v] reconnection
on port %v due to %v", id, port, err)
+ }
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
diff --git a/sdks/go/pkg/beam/core/runtime/symbols.go
b/sdks/go/pkg/beam/core/runtime/symbols.go
index e8ff532e763..84afe9b769a 100644
--- a/sdks/go/pkg/beam/core/runtime/symbols.go
+++ b/sdks/go/pkg/beam/core/runtime/symbols.go
@@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (any,
error) {
type failResolver bool
func (p failResolver) Sym2Addr(name string) (uintptr, error) {
- return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in
unit tests", name)
+ return 0, errors.Errorf("%v not found. Register DoFns and functions
with the the beam/register package.", name)
}
diff --git a/sdks/go/pkg/beam/create.go b/sdks/go/pkg/beam/create.go
index d2bd554963e..91e9f335ef8 100644
--- a/sdks/go/pkg/beam/create.go
+++ b/sdks/go/pkg/beam/create.go
@@ -112,6 +112,11 @@ func createList(s Scope, values []any, t reflect.Type)
(PCollection, error) {
// TODO(herohde) 6/26/2017: make 'create' a SDF once supported. See BEAM-2421.
+func init() {
+ register.DoFn2x1[[]byte, func(T), error]((*createFn)(nil))
+ register.Emitter1[T]()
+}
+
type createFn struct {
Values [][]byte `json:"values"`
Type EncodedType `json:"type"`
diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go
index 9033979d050..785c3b33db6 100644
--- a/sdks/go/pkg/beam/create_test.go
+++ b/sdks/go/pkg/beam/create_test.go
@@ -75,8 +75,8 @@ func TestCreateList(t *testing.T) {
{[]float64{float64(0.1), float64(0.2), float64(0.3)}},
{[]uint{uint(1), uint(2), uint(3)}},
{[]bool{false, true, true, false, true}},
- {[]wc{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}},
- {[]*testProto{&testProto{}, &testProto{stringValue("test")}}},
// Test for BEAM-4401
+ {[]wc{{"a", 23}, {"b", 42}, {"c", 5}}},
+ {[]*testProto{{}, {stringValue("test")}}}, // Test for BEAM-4401
}
for _, test := range tests {
diff --git a/sdks/go/pkg/beam/io/databaseio/database_test.go
b/sdks/go/pkg/beam/io/databaseio/database_test.go
index b93d5c9da72..f6c1355e851 100644
--- a/sdks/go/pkg/beam/io/databaseio/database_test.go
+++ b/sdks/go/pkg/beam/io/databaseio/database_test.go
@@ -22,6 +22,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
_ "github.com/proullon/ramsql/driver"
diff --git a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go
b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go
index 345eaa2a59e..b95439e2d56 100644
--- a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go
+++ b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go
@@ -64,6 +64,11 @@ type Foo struct {
type Bar struct {
}
+func init() {
+ beam.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem())
+ beam.RegisterType(reflect.TypeOf((*Bar)(nil)).Elem())
+}
+
func Test_query(t *testing.T) {
testCases := []struct {
v any
diff --git a/sdks/go/pkg/beam/pardo_test.go b/sdks/go/pkg/beam/pardo_test.go
index b88a6d642ea..56ed7e3e9fa 100644
--- a/sdks/go/pkg/beam/pardo_test.go
+++ b/sdks/go/pkg/beam/pardo_test.go
@@ -72,9 +72,9 @@ func testFunction() int64 {
func TestFormatParDoError(t *testing.T) {
got := formatParDoError(testFunction, 2, 1)
- want := "beam.testFunction has 2 outputs, but ParDo requires 1 outputs,
use ParDo2 instead."
+ want := "has 2 outputs, but ParDo requires 1 outputs, use ParDo2
instead."
if !strings.Contains(got, want) {
- t.Errorf("formatParDoError(testFunction,2,1) = %v, want = %v",
got, want)
+ t.Errorf("formatParDoError(testFunction,2,1) = \n%q want
=\n%q", got, want)
}
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index c8721e1a207..5e1585ffcd1 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -707,7 +707,6 @@ func (ss *stageState) bundleReady(em *ElementManager)
(mtime.Time, bool) {
ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side]
- // These panics indicate pre-process/stage construction
problems.
if !ok {
panic(fmt.Sprintf("stage[%v] no parent ID for side
input %v", ss.ID, side))
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
index 27303f03b70..05b3d3bbaa0 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -43,7 +43,7 @@ import (
type RunnerCharacteristic struct {
SDKFlatten bool // Sets whether we should force an SDK side flatten.
SDKGBK bool // Sets whether the GBK should be handled by the SDK,
if possible by the SDK.
- SDKReshuffle bool
+ SDKReshuffle bool // Sets whether we should use the SDK backup
implementation to handle a Reshuffle.
}
func Runner(config any) *runner {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index e6fe28714b7..1a9c2548df8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -50,6 +50,10 @@ type link struct {
// should in principle be able to connect two SDK environments directly
// instead of going through the runner at all, which would be a small
// efficiency gain, in runner memory use.
+//
+// That would also warrant an execution mode where fusion is taken into
+// account, but all serialization boundaries remain since the pcollections
+// would continue to get serialized.
type stage struct {
ID string
transforms []string
@@ -145,11 +149,11 @@ progress:
if previousIndex == index && !splitsDone {
sr, err := b.Split(wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
if err != nil {
- slog.Debug("SDK Error from split,
aborting splits", "bundle", rb, "error", err.Error())
+ slog.Warn("SDK Error from split,
aborting splits", "bundle", rb, "error", err.Error())
break progress
}
if sr.GetChannelSplits() == nil {
- slog.Warn("split failed", "bundle", rb)
+ slog.Debug("SDK returned no splits",
"bundle", rb)
splitsDone = true
continue progress
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
index f738a299cfd..5f8d3875999 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
@@ -41,7 +41,6 @@ func TestUnimplemented(t *testing.T) {
tests := []struct {
pipeline func(s beam.Scope)
}{
- // These tests don't terminate, so can't be run.
// {pipeline: primitives.Drain}, // Can't test drain
automatically yet.
{pipeline: primitives.TestStreamBoolSequence},
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 30515fa6f6e..c931655f000 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -145,6 +145,7 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
}
+// Progress sends a progress request for the given bundle to the passed in
worker, blocking on the response.
func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleProgress{
@@ -159,6 +160,7 @@ func (b *B) Progress(wk *W)
(*fnpb.ProcessBundleProgressResponse, error) {
return resp.GetProcessBundleProgress(), nil
}
+// Split sends a split request for the given bundle to the passed in worker,
blocking on the response.
func (b *B) Split(wk *W, fraction float64, allowedSplits []int64)
(*fnpb.ProcessBundleSplitResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleSplit{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index eefab54a54c..a1d0ff79baf 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -256,7 +256,6 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer)
error {
// TODO: Do more than assume these are
ProcessBundleResponses.
wk.mu.Lock()
if b, ok :=
wk.activeInstructions[resp.GetInstructionId()]; ok {
- // Error is handled in the resonse handler.
b.Respond(resp)
} else {
slog.Debug("ctrl.Recv: %v", resp)
@@ -268,7 +267,10 @@ func (wk *W) Control(ctrl
fnpb.BeamFnControl_ControlServer) error {
for {
select {
case req := <-wk.InstReqs:
- ctrl.Send(req)
+ err := ctrl.Send(req)
+ if err != nil {
+ return err
+ }
case <-ctrl.Context().Done():
slog.Debug("Control context canceled")
return ctrl.Context().Err()
@@ -322,7 +324,6 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
wk.mu.Unlock()
}
}()
-
for {
select {
case req, ok := <-wk.DataReqs:
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index 8cbb274e184..4e50661b3db 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -103,7 +103,7 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
return errors.Wrap(err, "failed to get job stream")
}
- mostRecentError := errors.New("<no error received, see runner logs>")
+ mostRecentError := "<no error received>"
var errReceived, jobFailed bool
for {
@@ -111,8 +111,8 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
if err != nil {
if err == io.EOF {
if jobFailed {
- // Connection finished with a failed
status, so produce what we have.
- return errors.Errorf("job %v
failed:\n%w", jobID, mostRecentError)
+ // Connection finished, so time to
exit, produce what we have.
+ return errors.Errorf("job %v
failed:\n%v", jobID, mostRecentError)
}
return nil
}
@@ -123,7 +123,7 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
case msg.GetStateResponse() != nil:
resp := msg.GetStateResponse()
- log.Infof(ctx, "Job state: %v",
resp.GetState().String())
+ log.Infof(ctx, "Job[%v] state: %v", jobID,
resp.GetState().String())
switch resp.State {
case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
@@ -131,9 +131,9 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
case jobpb.JobState_FAILED:
jobFailed = true
if errReceived {
- return errors.Errorf("job %v
failed:\n%w", jobID, mostRecentError)
+ return errors.Errorf("job %v
failed:\n%v", jobID, mostRecentError)
}
- // Otherwise, wait for at least one error log
from the runner, or the connection to close.
+ // Otherwise we should wait for at least one
error log from the runner.
}
case msg.GetMessageResponse() != nil:
@@ -144,10 +144,10 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
if resp.GetImportance() >=
jobpb.JobMessage_JOB_MESSAGE_ERROR {
errReceived = true
- mostRecentError =
errors.New(resp.GetMessageText())
+ mostRecentError = resp.GetMessageText()
if jobFailed {
- return errors.Errorf("job %v
failed:\n%w", jobID, mostRecentError)
+ return errors.Errorf("job %v
failed:\n%w", jobID, errors.New(mostRecentError))
}
}
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go
b/sdks/go/pkg/beam/runners/vet/vet.go
index 131fa0b1ec1..739f5db61c5 100644
--- a/sdks/go/pkg/beam/runners/vet/vet.go
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -54,7 +54,7 @@ func init() {
type disabledResolver bool
func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
- return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in
unit tests", name)
+ return 0, errors.Errorf("%v not found. Register DoFns and functions
with the beam/register package.", name)
}
// Execute evaluates the pipeline on whether it can run without reflection.
diff --git a/sdks/go/test/integration/integration.go
b/sdks/go/test/integration/integration.go
index 0f9e5984ead..f66cc1f53bf 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -136,7 +136,6 @@ var portableFilters = []string{
"TestSetStateClear",
}
-// TODO(lostluck): set up a specific run for these.
var prismFilters = []string{
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
@@ -149,7 +148,7 @@ var prismFilters = []string{
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow
portable runners.
"TestBigQueryIO.*",
"TestSpannerIO.*",
- // The prsim runner does not support pipeline drain for SDF.
+ // The prism runner does not support pipeline drain for SDF.
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
diff --git a/website/www/site/content/en/documentation/programming-guide.md
b/website/www/site/content/en/documentation/programming-guide.md
index 82ada91f26a..b0118df3987 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -1124,6 +1124,9 @@ words = ...
{{< /highlight >}}
{{< highlight go >}}
+
+The Go SDK cannot support anonymous functions outside of the deprecated Go
Direct runner.
+
// words is the input PCollection of strings
var words beam.PCollection = ...
@@ -1170,8 +1173,8 @@ words = ...
{{< /highlight >}}
{{< highlight go >}}
-// words is the input PCollection of strings
-var words beam.PCollection = ...
+
+The Go SDK cannot support anonymous functions outside of the deprecated Go
Direct runner.
{{< code_sample "sdks/go/examples/snippets/04transforms.go"
model_pardo_apply_anon >}}
{{< /highlight >}}
@@ -1191,7 +1194,7 @@ words = ...
<span class="language-go">
-> **Note:** Anonymous function DoFns may not work on distributed runners.
+> **Note:** Anonymous function DoFns do not work on distributed runners.
> It's recommended to use named functions and register them with
> `register.FunctionXxY` in
> an `init()` block.