This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new db23583d022 [#28126] plumb coder errors with better context. (#28164)
db23583d022 is described below
commit db23583d0227abe3625aafae98655c45c9edcf84
Author: Robert Burke <[email protected]>
AuthorDate: Fri Aug 25 16:08:15 2023 -0700
[#28126] plumb coder errors with better context. (#28164)
* [#28126] plumb coder errors with better context.
* Add hard clear checks for SDK error: no windows being encoded.
* parse log
* fix log line parsing
* fmt
---------
Co-authored-by: lostluck <[email protected]>
---
sdks/go/pkg/beam/runners/prism/internal/coders.go | 32 +++++++++++++---------
.../pkg/beam/runners/prism/internal/coders_test.go | 6 ++--
.../prism/internal/engine/elementmanager.go | 14 ++++++++--
sdks/go/pkg/beam/runners/prism/internal/execute.go | 10 +++++--
.../beam/runners/prism/internal/handlerunner.go | 15 ++++++++--
sdks/go/pkg/beam/runners/prism/internal/stage.go | 19 ++++++++++---
.../beam/runners/prism/internal/worker/worker.go | 14 +++++++++-
.../runners/prism/internal/worker/worker_test.go | 13 +++++++--
8 files changed, 93 insertions(+), 30 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go
b/sdks/go/pkg/beam/runners/prism/internal/coders.go
index a141440400e..64005177b94 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -53,9 +53,12 @@ func isLeafCoder(c *pipepb.Coder) bool {
//
// PCollection coders are not inherently WindowValueCoder wrapped, and they
are added by the runner
// for crossing the FnAPI boundary at data sources and data sinks.
-func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders
map[string]*pipepb.Coder) string {
+func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders
map[string]*pipepb.Coder) (string, error) {
col := comps.GetPcollections()[pID]
- cID := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
+ cID, err := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
+ if err != nil {
+ return "", fmt.Errorf("makeWindowedValueCoder: couldn't process
coder for pcollection %q %v: %w", pID, prototext.Format(col), err)
+ }
wcID :=
comps.GetWindowingStrategies()[col.GetWindowingStrategyId()].GetWindowCoderId()
// The runner needs to be defensive, and tell the SDK to Length Prefix
@@ -73,7 +76,7 @@ func makeWindowedValueCoder(pID string, comps
*pipepb.Components, coders map[str
}
// Populate the coders to send with the new windowed value coder.
coders[wvcID] = wInC
- return wvcID
+ return wvcID, nil
}
// makeWindowCoders makes the coder pair but behavior is ultimately determined
by the strategy's windowFn.
@@ -94,22 +97,22 @@ func makeWindowCoders(wc *pipepb.Coder)
(exec.WindowDecoder, exec.WindowEncoder)
// lpUnknownCoders takes a coder, and populates coders with any new coders
// coders that the runner needs to be safe, and speedy.
// It returns either the passed in coder id, or the new safe coder id.
-func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string
{
+func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder)
(string, error) {
// First check if we've already added the LP version of this coder to
coders already.
lpcID := cID + "_lp"
// Check if we've done this one before.
if _, ok := bundle[lpcID]; ok {
- return lpcID
+ return lpcID, nil
}
// All coders in the coders map have been processed.
if _, ok := bundle[cID]; ok {
- return cID
+ return cID, nil
}
// Look up the canonical location.
c, ok := base[cID]
if !ok {
// We messed up somewhere.
- panic(fmt.Sprint("unknown coder id:", cID))
+ return "", fmt.Errorf("lpUnknownCoders: coder %q not present in
base map", cID)
}
// Add the original coder to the coders map.
bundle[cID] = c
@@ -124,7 +127,7 @@ func lpUnknownCoders(cID string, bundle, base
map[string]*pipepb.Coder) string {
ComponentCoderIds: []string{cID},
}
bundle[lpcID] = lpc
- return lpcID
+ return lpcID, nil
}
// We know we have a composite, so if we count this as a leaf, move
everything to
// the coders map.
@@ -133,12 +136,15 @@ func lpUnknownCoders(cID string, bundle, base
map[string]*pipepb.Coder) string {
for _, cc := range c.GetComponentCoderIds() {
bundle[cc] = base[cc]
}
- return cID
+ return cID, nil
}
var needNewComposite bool
var comps []string
- for _, cc := range c.GetComponentCoderIds() {
- rcc := lpUnknownCoders(cc, bundle, base)
+ for i, cc := range c.GetComponentCoderIds() {
+ rcc, err := lpUnknownCoders(cc, bundle, base)
+ if err != nil {
+ return "", fmt.Errorf("lpUnknownCoders: couldn't handle
component %d %q of %q %v:\n%w", i, cc, cID, prototext.Format(c), err)
+ }
if cc != rcc {
needNewComposite = true
}
@@ -150,9 +156,9 @@ func lpUnknownCoders(cID string, bundle, base
map[string]*pipepb.Coder) string {
ComponentCoderIds: comps,
}
bundle[lpcID] = lpc
- return lpcID
+ return lpcID, nil
}
- return cID
+ return cID, nil
}
// reconcileCoders ensures that the bundle coders are primed with initial
coders from
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
index c6e32c895fe..3f9557ff836 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
@@ -62,7 +62,7 @@ func Test_isLeafCoder(t *testing.T) {
func Test_makeWindowedValueCoder(t *testing.T) {
coders := map[string]*pipepb.Coder{}
- gotID := makeWindowedValueCoder("testPID", &pipepb.Components{
+ gotID, err := makeWindowedValueCoder("testPID", &pipepb.Components{
Pcollections: map[string]*pipepb.PCollection{
"testPID": {CoderId: "testCoderID"},
},
@@ -74,7 +74,9 @@ func Test_makeWindowedValueCoder(t *testing.T) {
},
},
}, coders)
-
+ if err != nil {
+ t.Errorf("makeWindowedValueCoder(...) = error %v, want nil",
err)
+ }
if gotID == "" {
t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty",
gotID)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 5e1585ffcd1..fb9c9802502 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -287,8 +287,12 @@ func reElementResiduals(residuals [][]byte, inputInfo
PColInfo, rb RunBundle) []
if err == io.EOF {
break
}
- slog.Error("reElementResiduals: error decoding residual
header", err, "bundle", rb)
- panic("error decoding residual header")
+ slog.Error("reElementResiduals: error decoding residual
header", "error", err, "bundle", rb)
+ panic("error decoding residual header:" + err.Error())
+ }
+ if len(ws) == 0 {
+ slog.Error("reElementResiduals: sdk provided a windowed
value header 0 windows", "bundle", rb)
+ panic("error decoding residual header: sdk provided a
windowed value header 0 windows")
}
for _, w := range ws {
@@ -332,9 +336,13 @@ func (em *ElementManager) PersistBundle(rb RunBundle,
col2Coders map[string]PCol
if err == io.EOF {
break
}
- slog.Error("PersistBundle: error
decoding watermarks", err, "bundle", rb, slog.String("output", output))
+ slog.Error("PersistBundle: error
decoding watermarks", "error", err, "bundle", rb, slog.String("output", output))
panic("error decoding watermarks")
}
+ if len(ws) == 0 {
+ slog.Error("PersistBundle: sdk provided
a windowed value header 0 windows", "bundle", rb)
+ panic("error decoding residual header:
sdk provided a windowed value header 0 windows")
+ }
// TODO: Optimize unnecessary copies. This is
doubleteeing.
elmBytes := info.EDec(tee)
for _, w := range ws {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index f8b6b6f33ab..e9c898699c7 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -297,13 +297,19 @@ func executePipeline(ctx context.Context, wk *worker.W, j
*jobservices.Job) erro
}
func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder,
comps *pipepb.Components) func(io.Reader) []byte {
- cID := lpUnknownCoders(coldCId, coders, comps.GetCoders())
+ cID, err := lpUnknownCoders(coldCId, coders, comps.GetCoders())
+ if err != nil {
+ panic(err)
+ }
return pullDecoder(coders[cID], coders)
}
func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection,
coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()]
- wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders,
comps.GetCoders())
+ wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders,
comps.GetCoders())
+ if err != nil {
+ panic(err)
+ }
return makeWindowCoders(coders[wcID])
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
index 05b3d3bbaa0..3f699e47e67 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -162,9 +162,18 @@ func (h *runner) ExecuteTransform(stageID, tid string, t
*pipepb.PTransform, com
coders := map[string]*pipepb.Coder{}
// TODO assert this is a KV. It's probably fine, but we should
fail anyway.
- wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders,
comps.GetCoders())
- kcID := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders,
comps.GetCoders())
- ecID := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders,
comps.GetCoders())
+ wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders,
comps.GetCoders())
+ if err != nil {
+ panic(fmt.Errorf("ExecuteTransform[GBK] stage %v,
transform %q %v: couldn't process window coder:\n%w", stageID, tid,
prototext.Format(t), err))
+ }
+ kcID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[0],
coders, comps.GetCoders())
+ if err != nil {
+ panic(fmt.Errorf("ExecuteTransform[GBK] stage %v,
transform %q %v: couldn't process key coder:\n%w", stageID, tid,
prototext.Format(t), err))
+ }
+ ecID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[1],
coders, comps.GetCoders())
+ if err != nil {
+ panic(fmt.Errorf("ExecuteTransform[GBK] stage %v,
transform %q %v: couldn't process value coder:\n%w", stageID, tid,
prototext.Format(t), err))
+ }
reconcileCoders(coders, comps.GetCoders())
wc := coders[wcID]
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index b2cbe23588d..3f4451d7db3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
+ "google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)
@@ -290,9 +291,12 @@ func buildDescriptor(stg *stage, comps *pipepb.Components,
wk *worker.W) error {
sink2Col := map[string]string{}
col2Coders := map[string]engine.PColInfo{}
for _, o := range stg.outputs {
- wOutCid := makeWindowedValueCoder(o.global, comps, coders)
- sinkID := o.transform + "_" + o.local
col := comps.GetPcollections()[o.global]
+ wOutCid, err := makeWindowedValueCoder(o.global, comps, coders)
+ if err != nil {
+ return fmt.Errorf("buildDescriptor: failed to handle
coder on stage %v for output %+v, pcol %q %v:\n%w", stg.ID, o, o.global,
prototext.Format(col), err)
+ }
+ sinkID := o.transform + "_" + o.local
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
sink2Col[sinkID] = o.global
@@ -311,7 +315,10 @@ func buildDescriptor(stg *stage, comps *pipepb.Components,
wk *worker.W) error {
for _, si := range stg.sideInputs {
col := comps.GetPcollections()[si.global]
oCID := col.GetCoderId()
- nCID := lpUnknownCoders(oCID, coders, comps.GetCoders())
+ nCID, err := lpUnknownCoders(oCID, coders, comps.GetCoders())
+ if err != nil {
+ return fmt.Errorf("buildDescriptor: failed to handle
coder on stage %v for side input %+v, pcol %q %v:\n%w", stg.ID, si, si.global,
prototext.Format(col), err)
+ }
sides = append(sides, si.global)
if oCID != nCID {
@@ -339,9 +346,13 @@ func buildDescriptor(stg *stage, comps *pipepb.Components,
wk *worker.W) error {
// This id is directly used for the source, but this also copies
// coders used by side inputs to the coders map for the bundle, so
// needs to be run for every ID.
- wInCid := makeWindowedValueCoder(stg.primaryInput, comps, coders)
col := comps.GetPcollections()[stg.primaryInput]
+ wInCid, err := makeWindowedValueCoder(stg.primaryInput, comps, coders)
+ if err != nil {
+ return fmt.Errorf("buildDescriptor: failed to handle coder on
stage %v for primary input, pcol %q %v:\n%w", stg.ID, stg.primaryInput,
prototext.Format(col), err)
+ }
+
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
inputInfo := engine.PColInfo{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index a1d0ff79baf..dab831c20af 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -23,6 +23,8 @@ import (
"fmt"
"io"
"net"
+ "strconv"
+ "strings"
"sync"
"sync/atomic"
@@ -191,8 +193,18 @@ func (wk *W) Logging(stream
fnpb.BeamFnLogging_LoggingServer) error {
if l.Severity >= minsev {
// TODO: Connect to the associated Job for this
worker instead of
// logging locally for SDK side logging.
+ file := l.GetLogLocation()
+ i := strings.LastIndex(file, ":")
+ line, _ := strconv.Atoi(file[i+1:])
+ if i > 0 {
+ file = file[:i]
+ }
+
slog.LogAttrs(context.TODO(),
toSlogSev(l.GetSeverity()), l.GetMessage(),
- slog.String(slog.SourceKey,
l.GetLogLocation()),
+ slog.Any(slog.SourceKey, &slog.Source{
+ File: file,
+ Line: line,
+ }),
slog.Time(slog.TimeKey,
l.GetTimestamp().AsTime()),
slog.Any("worker", wk),
)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
index 68dc3fd917e..060c073fa12 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -119,8 +119,17 @@ func TestWorker_Logging(t *testing.T) {
logStream.Send(&fnpb.LogEntry_List{
LogEntries: []*fnpb.LogEntry{{
- Severity: fnpb.LogEntry_Severity_INFO,
- Message: "squeamish ossiphrage",
+ Severity: fnpb.LogEntry_Severity_INFO,
+ Message: "squeamish ossiphrage",
+ LogLocation: "intentionally.go:124",
+ }},
+ })
+
+ logStream.Send(&fnpb.LogEntry_List{
+ LogEntries: []*fnpb.LogEntry{{
+ Severity: fnpb.LogEntry_Severity_INFO,
+ Message: "squeamish ossiphrage the second",
+ LogLocation: "intentionally bad log location",
}},
})