This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ded686a [BEAM-6374] Emit PCollection metrics from GoSDK (#10942)
ded686a is described below
commit ded686a58ad4747e91a26d3e59f61019b641e655
Author: Robert Burke <[email protected]>
AuthorDate: Wed Mar 4 13:03:22 2020 -0800
[BEAM-6374] Emit PCollection metrics from GoSDK (#10942)
---
sdks/go/pkg/beam/core/runtime/exec/datasource.go | 77 ++++++++---
.../pkg/beam/core/runtime/exec/datasource_test.go | 39 ++++--
sdks/go/pkg/beam/core/runtime/exec/pcollection.go | 153 ++++++++++++++++++++
.../pkg/beam/core/runtime/exec/pcollection_test.go | 154 +++++++++++++++++++++
sdks/go/pkg/beam/core/runtime/exec/plan.go | 75 +++++-----
sdks/go/pkg/beam/core/runtime/exec/translate.go | 31 ++++-
sdks/go/pkg/beam/core/runtime/harness/harness.go | 24 +++-
.../go/pkg/beam/core/runtime/harness/monitoring.go | 35 +++--
sdks/go/pkg/beam/runners/direct/direct.go | 3 +-
9 files changed, 491 insertions(+), 100 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index ccc4a09..fb20aea 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -36,14 +36,14 @@ type DataSource struct {
Name string
Coder *coder.Coder
Out Node
+ PCol PCollection // Handles size metrics. Value instead of pointer so
it's initialized by default in tests.
source DataManager
state StateReader
- // TODO(lostluck) 2020/02/06: refactor to support more general
PCollection metrics on nodes.
- outputPID string // The index is the output count for the PCollection.
- index int64
- splitIdx int64
- start time.Time
+
+ index int64
+ splitIdx int64
+ start time.Time
mu sync.Mutex
}
@@ -70,6 +70,29 @@ func (n *DataSource) StartBundle(ctx context.Context, id
string, data DataContex
return n.Out.StartBundle(ctx, id, data)
}
+// byteCountReader is a passthrough reader that counts all the bytes read
through it.
+// It trusts the nested reader to return accurate byte information.
+type byteCountReader struct {
+ count *int
+ reader io.ReadCloser
+}
+
+func (r *byteCountReader) Read(p []byte) (int, error) {
+ n, err := r.reader.Read(p)
+ *r.count += n
+ return n, err
+}
+
+func (r *byteCountReader) Close() error {
+ return r.reader.Close()
+}
+
+func (r *byteCountReader) reset() int {
+ c := *r.count
+ *r.count = 0
+ return c
+}
+
// Process opens the data source, reads and decodes data, kicking off element
processing.
func (n *DataSource) Process(ctx context.Context) error {
r, err := n.source.OpenRead(ctx, n.SID)
@@ -77,6 +100,9 @@ func (n *DataSource) Process(ctx context.Context) error {
return err
}
defer r.Close()
+ n.PCol.resetSize() // initialize the size distribution for this bundle.
+ var byteCount int
+ bcr := byteCountReader{reader: r, count: &byteCount}
c := coder.SkipW(n.Coder)
wc := MakeWindowDecoder(n.Coder.Window)
@@ -99,6 +125,7 @@ func (n *DataSource) Process(ctx context.Context) error {
if n.incrementIndexAndCheckSplit() {
return nil
}
+ // TODO(lostluck) 2020/02/22: Should we include window headers
or just count the element sizes?
ws, t, err := DecodeWindowedValueHeader(wc, r)
if err != nil {
if err == io.EOF {
@@ -108,7 +135,7 @@ func (n *DataSource) Process(ctx context.Context) error {
}
// Decode key or parallel element.
- pe, err := cp.Decode(r)
+ pe, err := cp.Decode(&bcr)
if err != nil {
return errors.Wrap(err, "source decode failed")
}
@@ -117,7 +144,7 @@ func (n *DataSource) Process(ctx context.Context) error {
var valReStreams []ReStream
for _, cv := range cvs {
- values, err := n.makeReStream(ctx, pe, cv, r)
+ values, err := n.makeReStream(ctx, pe, cv, &bcr)
if err != nil {
return err
}
@@ -127,11 +154,15 @@ func (n *DataSource) Process(ctx context.Context) error {
if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err
!= nil {
return err
}
+ // Collect the actual size of the element, and reset the
bytecounter reader.
+ n.PCol.addSize(int64(bcr.reset()))
+ bcr.reader = r
}
}
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv
ElementDecoder, r io.ReadCloser) (ReStream, error) {
- size, err := coder.DecodeInt32(r)
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv
ElementDecoder, bcr *byteCountReader) (ReStream, error) {
+ // TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the
element sizes?
+ size, err := coder.DecodeInt32(bcr.reader)
if err != nil {
return nil, errors.Wrap(err, "stream size decoding failed")
}
@@ -140,16 +171,16 @@ func (n *DataSource) makeReStream(ctx context.Context,
key *FullValue, cv Elemen
case size >= 0:
// Single chunk streams are fully read in and buffered in
memory.
buf := make([]FullValue, 0, size)
- buf, err = readStreamToBuffer(cv, r, int64(size), buf)
+ buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
if err != nil {
return nil, err
}
return &FixedReStream{Buf: buf}, nil
- case size == -1: // Shouldn't this be 0?
+ case size == -1:
// Multi-chunked stream.
var buf []FullValue
for {
- chunk, err := coder.DecodeVarInt(r)
+ chunk, err := coder.DecodeVarInt(bcr.reader)
if err != nil {
return nil, errors.Wrap(err, "stream chunk size
decoding failed")
}
@@ -159,17 +190,17 @@ func (n *DataSource) makeReStream(ctx context.Context,
key *FullValue, cv Elemen
return &FixedReStream{Buf: buf}, nil
case chunk > 0: // Non-zero chunk, read that many
elements from the stream, and buffer them.
chunkBuf := make([]FullValue, 0, chunk)
- chunkBuf, err = readStreamToBuffer(cv, r,
chunk, chunkBuf)
+ chunkBuf, err = readStreamToBuffer(cv, bcr,
chunk, chunkBuf)
if err != nil {
return nil, err
}
buf = append(buf, chunkBuf...)
case chunk == -1: // State backed iterable!
- chunk, err := coder.DecodeVarInt(r)
+ chunk, err := coder.DecodeVarInt(bcr.reader)
if err != nil {
return nil, err
}
- token, err := ioutilx.ReadN(r, (int)(chunk))
+ token, err := ioutilx.ReadN(bcr.reader,
(int)(chunk))
if err != nil {
return nil, err
}
@@ -181,6 +212,9 @@ func (n *DataSource) makeReStream(ctx context.Context, key
*FullValue, cv Elemen
if err != nil {
return nil, err
}
+ // We can't re-use the
original bcr, since we may get new iterables,
+ // or multiple of them
at the same time, but we can re-use the count itself.
+ r =
&byteCountReader{reader: r, count: bcr.count}
return
&elementStream{r: r, ec: cv}, nil
},
},
@@ -241,12 +275,11 @@ func (n *DataSource) incrementIndexAndCheckSplit() bool {
}
// ProgressReportSnapshot captures the progress reading an input source.
-//
-// TODO(lostluck) 2020/02/06: Add a visitor pattern for collecting progress
-// metrics from downstream Nodes.
type ProgressReportSnapshot struct {
- ID, Name, PID string
- Count int64
+ ID, Name string
+ Count int64
+
+ pcol PCollectionSnapshot
}
// Progress returns a snapshot of the source's progress.
@@ -255,6 +288,7 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
return ProgressReportSnapshot{}
}
n.mu.Lock()
+ pcol := n.PCol.snapshot()
// The count is the number of "completely processed elements"
// which matches the index of the currently processing element.
c := n.index
@@ -263,7 +297,8 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
if c < 0 {
c = 0
}
- return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID,
Name: n.Name, Count: c}
+ pcol.ElementCount = c
+ return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name,
Count: c, pcol: pcol}
}
// Split takes a sorted set of potential split indices, selects and actuates
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 1ce493c..3037c84 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -201,6 +201,16 @@ func TestDataSource_Iterators(t *testing.T) {
if got, want := iVals, expectedKeys; !equalList(got,
want) {
t.Errorf("DataSource => %#v, want %#v",
extractValues(got...), extractValues(want...))
}
+
+ // We're using integers that encode to 1 byte, so do
some quick math to validate.
+ sizeOfSmallInt := 1
+ snap := quickTestSnapshot(source, int64(len(test.keys)))
+ snap.pcol.SizeSum = int64(len(test.keys) * (1 +
len(test.vals)) * sizeOfSmallInt)
+ snap.pcol.SizeMin = int64((1 + len(test.vals)) *
sizeOfSmallInt)
+ snap.pcol.SizeMax = int64((1 + len(test.vals)) *
sizeOfSmallInt)
+ if got, want := source.Progress(), snap; got != want {
+ t.Errorf("progress didn't match: got %v, want
%v", got, want)
+ }
})
}
}
@@ -358,15 +368,6 @@ func TestDataSource_Split(t *testing.T) {
})
validateSource(t, out, source,
makeValues(test.expected...))
-
- // Adjust expectations to maximum number of
elements.
- adjustedExpectation := test.splitIdx
- if adjustedExpectation > int64(len(elements)) {
- adjustedExpectation =
int64(len(elements))
- }
- if got, want := source.Progress().Count,
adjustedExpectation; got != want {
- t.Fatalf("progress didn't match split:
got %v, want %v", got, want)
- }
})
}
})
@@ -464,13 +465,29 @@ func constructAndExecutePlanWithContext(t *testing.T, us
[]Unit, dc DataContext)
}
}
+func quickTestSnapshot(source *DataSource, count int64) ProgressReportSnapshot
{
+ return ProgressReportSnapshot{
+ Name: source.Name,
+ ID: source.SID.PtransformID,
+ Count: count,
+ pcol: PCollectionSnapshot{
+ ElementCount: count,
+ SizeCount: count,
+ SizeSum: count,
+ // We're only encoding small ints here, so size will
only be 1.
+ SizeMin: 1,
+ SizeMax: 1,
+ },
+ }
+}
+
func validateSource(t *testing.T, out *CaptureNode, source *DataSource,
expected []FullValue) {
t.Helper()
if got, want := len(out.Elements), len(expected); got != want {
t.Fatalf("lengths don't match: got %v, want %v", got, want)
}
- if got, want := source.Progress().Count, int64(len(expected)); got !=
want {
- t.Fatalf("progress count didn't match: got %v, want %v", got,
want)
+ if got, want := source.Progress(), quickTestSnapshot(source,
int64(len(expected))); got != want {
+ t.Fatalf("progress snapshot didn't match: got %v, want %v",
got, want)
}
if !equalList(out.Elements, expected) {
t.Errorf("DataSource => %#v, want %#v",
extractValues(out.Elements...), extractValues(expected...))
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
new file mode 100644
index 0000000..d8fb024
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+// PCollection is a passthrough node to collect PCollection metrics, and
+// must be placed as the Out node of any producer of a PCollection.
+//
+// In particular, must not be placed after a Multiplex, and must be placed
+// after a Flatten.
+type PCollection struct {
+ UID UnitID
+ PColID string
+ Out Node // Out is the consumer of this PCollection.
+ Coder *coder.Coder
+ Seed int64
+
+ r *rand.Rand
+ nextSampleIdx int64 // The index of the next value to sample.
+ elementCoder ElementEncoder
+
+ elementCount int64 // must use atomic
operations.
+ sizeMu sync.Mutex
+ sizeCount, sizeSum, sizeMin, sizeMax int64
+}
+
+// ID returns the debug id for this unit.
+func (p *PCollection) ID() UnitID {
+ return p.UID
+}
+
+// Up initializes the random sampling source and element encoder.
+func (p *PCollection) Up(ctx context.Context) error {
+ // dedicated rand source
+ p.r = rand.New(rand.NewSource(p.Seed))
+ p.elementCoder = MakeElementEncoder(p.Coder)
+ return nil
+}
+
+// StartBundle resets collected metrics for this PCollection, and propagates
bundle start.
+func (p *PCollection) StartBundle(ctx context.Context, id string, data
DataContext) error {
+ atomic.StoreInt64(&p.elementCount, 0)
+ p.nextSampleIdx = p.r.Int63n(3)
+ p.resetSize()
+ return MultiStartBundle(ctx, id, data, p.Out)
+}
+
+type byteCounter struct {
+ count int
+}
+
+func (w *byteCounter) Write(p []byte) (n int, err error) {
+ w.count += len(p)
+ return len(p), nil
+}
+
+// ProcessElement increments the element count and sometimes takes size
samples of the elements.
+func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue,
values ...ReStream) error {
+ cur := atomic.AddInt64(&p.elementCount, 1)
+ if cur == p.nextSampleIdx {
+ // We pick the next sampling index based on how large this
pcollection already is.
+ // We don't want to necessarily wait until the pcollection has
doubled, so we reduce the range.
+ // We don't want to always encode the first consecutive
elements, so we add 2 to give some variance.
+ // Finally we add 1 no matter what, so that there's always the
potential to trigger again.
+ // Otherwise, there's the potential for the random int to be 0,
which means we don't change the
+ // nextSampleIdx at all.
+ p.nextSampleIdx = cur + p.r.Int63n(cur/10+2) + 1
+ var w byteCounter
+ p.elementCoder.Encode(elm, &w)
+ p.addSize(int64(w.count))
+ }
+ return p.Out.ProcessElement(ctx, elm, values...)
+}
+
+func (p *PCollection) addSize(size int64) {
+ p.sizeMu.Lock()
+ defer p.sizeMu.Unlock()
+ p.sizeCount++
+ p.sizeSum += size
+ if size > p.sizeMax {
+ p.sizeMax = size
+ }
+ if size < p.sizeMin {
+ p.sizeMin = size
+ }
+}
+
+func (p *PCollection) resetSize() {
+ p.sizeMu.Lock()
+ defer p.sizeMu.Unlock()
+ p.sizeCount = 0
+ p.sizeSum = 0
+ p.sizeMax = math.MinInt64
+ p.sizeMin = math.MaxInt64
+}
+
+// FinishBundle propagates bundle termination.
+func (p *PCollection) FinishBundle(ctx context.Context) error {
+ return MultiFinishBundle(ctx, p.Out)
+}
+
+// Down is a no-op.
+func (p *PCollection) Down(ctx context.Context) error {
+ return nil
+}
+
+func (p *PCollection) String() string {
+ return fmt.Sprintf("PCollection[%v] Out:%v", p.PColID, IDs(p.Out))
+}
+
+// PCollectionSnapshot contains the PCollection ID along with the element count
+type PCollectionSnapshot struct {
+ ID string
+ ElementCount int64
+ // If SizeCount is zero, then no size metrics should be exported.
+ SizeCount, SizeSum, SizeMin, SizeMax int64
+}
+
+func (p *PCollection) snapshot() PCollectionSnapshot {
+ p.sizeMu.Lock()
+ defer p.sizeMu.Unlock()
+ return PCollectionSnapshot{
+ ID: p.PColID,
+ ElementCount: atomic.LoadInt64(&p.elementCount),
+ SizeCount: p.sizeCount,
+ SizeSum: p.sizeSum,
+ SizeMin: p.sizeMin,
+ SizeMax: p.sizeMax,
+ }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
b/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
new file mode 100644
index 0000000..f7f24a7
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+ "context"
+ "math"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+)
+
+// TestPCollection verifies that the PCollection node works correctly.
+// Seed is by default set to 0, so we have a "deterministic" set of
+// randomness for the samples.
+func TestPCollection(t *testing.T) {
+ a := &CaptureNode{UID: 1}
+ pcol := &PCollection{UID: 2, Out: a, Coder: coder.NewVarInt()}
+ // The "large" 2nd value is to ensure the values are encoded properly,
+ // and that Min & Max are behaving.
+ inputs := []interface{}{int64(1), int64(2000000000), int64(3)}
+ in := &FixedRoot{UID: 3, Elements: makeInput(inputs...), Out: pcol}
+
+ p, err := NewPlan("a", []Unit{a, pcol, in})
+ if err != nil {
+ t.Fatalf("failed to construct plan: %v", err)
+ }
+
+ if err := p.Execute(context.Background(), "1", DataContext{}); err !=
nil {
+ t.Fatalf("execute failed: %v", err)
+ }
+ if err := p.Down(context.Background()); err != nil {
+ t.Fatalf("down failed: %v", err)
+ }
+
+ expected := makeValues(inputs...)
+ if !equalList(a.Elements, expected) {
+ t.Errorf("multiplex returned %v for a, want %v",
extractValues(a.Elements...), extractValues(expected...))
+ }
+ snap := pcol.snapshot()
+ if want, got := int64(len(expected)), snap.ElementCount; got != want {
+ t.Errorf("snapshot miscounted: got %v, want %v", got, want)
+ }
+ checkPCollectionSizeSample(t, snap, 2, 6, 1, 5)
+}
+
+func TestPCollection_sizeReset(t *testing.T) {
+ // Check the initial values after resetting.
+ var pcol PCollection
+ pcol.resetSize()
+ snap := pcol.snapshot()
+ checkPCollectionSizeSample(t, snap, 0, 0, math.MaxInt64, math.MinInt64)
+}
+
+func checkPCollectionSizeSample(t *testing.T, snap PCollectionSnapshot, count,
sum, min, max int64) {
+ t.Helper()
+ if want, got := int64(count), snap.SizeCount; got != want {
+ t.Errorf("sample count incorrect: got %v, want %v", got, want)
+ }
+ if want, got := int64(sum), snap.SizeSum; got != want {
+ t.Errorf("sample sum incorrect: got %v, want %v", got, want)
+ }
+ if want, got := int64(min), snap.SizeMin; got != want {
+ t.Errorf("sample min incorrect: got %v, want %v", got, want)
+ }
+ if want, got := int64(max), snap.SizeMax; got != want {
+ t.Errorf("sample max incorrect: got %v, want %v", got, want)
+ }
+}
+
+// BenchmarkPCollection measures the overhead of invoking a ParDo in a plan.
+//
+// On @lostluck's desktop (2020/02/20):
+// BenchmarkPCollection-12 44699806 24.8 ns/op
0 B/op 0 allocs/op
+func BenchmarkPCollection(b *testing.B) {
+ // Pre allocate the capture buffer and process buffer to avoid
+ // unnecessary overhead.
+ out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
+ process := make([]MainInput, 0, b.N)
+ for i := 0; i < b.N; i++ {
+ process = append(process, MainInput{Key: FullValue{
+ Windows: window.SingleGlobalWindow,
+ Timestamp: mtime.ZeroTimestamp,
+ Elm: int64(1),
+ }})
+ }
+ pcol := &PCollection{UID: 2, Out: out, Coder: coder.NewVarInt()}
+ n := &FixedRoot{UID: 3, Elements: process, Out: pcol}
+ p, err := NewPlan("a", []Unit{n, pcol, out})
+ if err != nil {
+ b.Fatalf("failed to construct plan: %v", err)
+ }
+ b.ResetTimer()
+ if err := p.Execute(context.Background(), "1", DataContext{}); err !=
nil {
+ b.Fatalf("execute failed: %v", err)
+ }
+ if err := p.Down(context.Background()); err != nil {
+ b.Fatalf("down failed: %v", err)
+ }
+ if got, want := pcol.snapshot().ElementCount, int64(b.N); got != want {
+ b.Errorf("did not process all elements: got %v, want %v", got,
want)
+ }
+ if got, want := len(out.Elements), b.N; got != want {
+ b.Errorf("did not process all elements: got %v, want %v", got,
want)
+ }
+}
+
+// BenchmarkPCollection_Baseline measures the baseline of the node
benchmarking scaffold.
+//
+// On @lostluck's desktop (2020/02/20):
+// BenchmarkPCollection_Baseline-12 62186372 18.8 ns/op
0 B/op 0 allocs/op
+func BenchmarkPCollection_Baseline(b *testing.B) {
+ // Pre allocate the capture buffer and process buffer to avoid
+ // unnecessary overhead.
+ out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
+ process := make([]MainInput, 0, b.N)
+ for i := 0; i < b.N; i++ {
+ process = append(process, MainInput{Key: FullValue{
+ Windows: window.SingleGlobalWindow,
+ Timestamp: mtime.ZeroTimestamp,
+ Elm: 1,
+ }})
+ }
+ n := &FixedRoot{UID: 3, Elements: process, Out: out}
+ p, err := NewPlan("a", []Unit{n, out})
+ if err != nil {
+ b.Fatalf("failed to construct plan: %v", err)
+ }
+ b.ResetTimer()
+ if err := p.Execute(context.Background(), "1", DataContext{}); err !=
nil {
+ b.Fatalf("execute failed: %v", err)
+ }
+ if err := p.Down(context.Background()); err != nil {
+ b.Fatalf("down failed: %v", err)
+ }
+ if got, want := len(out.Elements), b.N; got != want {
+ b.Errorf("did not process all elements: got %v, want %v", got,
want)
+ }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go
b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index fde9e7c..5275092 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -21,9 +21,7 @@ import (
"context"
"fmt"
"strings"
- "sync"
- "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
@@ -31,34 +29,22 @@ import (
// from a part of a pipeline. A plan can be used to process multiple bundles
// serially.
type Plan struct {
- id string
- roots []Root
- units []Unit
- parDoIDs []string
+ id string // id of the bundle descriptor for this plan
+ roots []Root
+ units []Unit
+ pcols []*PCollection
status Status
- // While the store is threadsafe, the reference to it
- // is not, so we need to protect the store field to be
- // able to asynchronously provide tentative metrics.
- storeMu sync.Mutex
- store *metrics.Store
-
// TODO: there can be more than 1 DataSource in a bundle.
source *DataSource
}
-// hasPID provides a common interface for extracting PTransformIDs
-// from Units.
-type hasPID interface {
- GetPID() string
-}
-
// NewPlan returns a new bundle execution plan from the given units.
func NewPlan(id string, units []Unit) (*Plan, error) {
var roots []Root
+ var pcols []*PCollection
var source *DataSource
- var pardoIDs []string
for _, u := range units {
if u == nil {
@@ -70,8 +56,8 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
if s, ok := u.(*DataSource); ok {
source = s
}
- if p, ok := u.(hasPID); ok {
- pardoIDs = append(pardoIDs, p.GetPID())
+ if p, ok := u.(*PCollection); ok {
+ pcols = append(pcols, p)
}
}
if len(roots) == 0 {
@@ -79,12 +65,12 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
}
return &Plan{
- id: id,
- status: Initializing,
- roots: roots,
- units: units,
- parDoIDs: pardoIDs,
- source: source,
+ id: id,
+ status: Initializing,
+ roots: roots,
+ units: units,
+ pcols: pcols,
+ source: source,
}, nil
}
@@ -102,10 +88,6 @@ func (p *Plan) SourcePTransformID() string {
// are brought up on the first execution. If a bundle fails, the plan cannot
// be reused for further bundles. Does not panic. Blocking.
func (p *Plan) Execute(ctx context.Context, id string, manager DataContext)
error {
- ctx = metrics.SetBundleID(ctx, p.id)
- p.storeMu.Lock()
- p.store = metrics.GetStore(ctx)
- p.storeMu.Unlock()
if p.status == Initializing {
for _, u := range p.units {
if err := callNoPanic(ctx, u.Up); err != nil {
@@ -178,19 +160,28 @@ func (p *Plan) String() string {
return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
}
-// Progress returns a snapshot of input progress of the plan, and associated
metrics.
-func (p *Plan) Progress() (ProgressReportSnapshot, bool) {
- if p.source != nil {
- return p.source.Progress(), true
- }
- return ProgressReportSnapshot{}, false
+// PlanSnapshot contains system metrics for the current run of the plan.
+type PlanSnapshot struct {
+ Source ProgressReportSnapshot
+ PCols []PCollectionSnapshot
}
-// Store returns the metric store for the last use of this plan.
-func (p *Plan) Store() *metrics.Store {
- p.storeMu.Lock()
- defer p.storeMu.Unlock()
- return p.store
+// Progress returns a snapshot of progress of the plan, and associated
metrics.
+// The returned boolean indicates whether the plan includes a DataSource, which
is
+// important for handling legacy metrics. This boolean will be removed once
+// we no longer return legacy metrics.
+func (p *Plan) Progress() (PlanSnapshot, bool) {
+ pcolSnaps := make([]PCollectionSnapshot, 0, len(p.pcols)+1) // include
space for the datasource pcollection.
+ for _, pcol := range p.pcols {
+ pcolSnaps = append(pcolSnaps, pcol.snapshot())
+ }
+ snap := PlanSnapshot{PCols: pcolSnaps}
+ if p.source != nil {
+ snap.Source = p.source.Progress()
+ snap.PCols = append(pcolSnaps, snap.Source.pcol)
+ return snap, true
+ }
+ return snap, false
}
// SplitPoints captures the split requested by the Runner.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 486dacf..b629560 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -17,6 +17,7 @@ package exec
import (
"fmt"
+ "math/rand"
"strconv"
"strings"
@@ -77,12 +78,16 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor)
(*Plan, error) {
for key, pid := range transform.GetOutputs() {
u.SID = StreamID{PtransformID: id, Port: port}
u.Name = key
- u.outputPID = pid
u.Out, err = b.makePCollection(pid)
if err != nil {
return nil, err
}
+ // Elide the PCollection Node for DataSources.
+ // DataSources can get byte samples directly, and can
handle CoGBKs.
+ u.PCol = *u.Out.(*PCollection)
+ u.Out = u.PCol.Out
+ b.units = b.units[:len(b.units)-1]
}
b.units = append(b.units, u)
@@ -98,8 +103,8 @@ type builder struct {
succ map[string][]linkID // PCollectionID -> []linkID
windowing map[string]*window.WindowingStrategy
- nodes map[string]Node // PCollectionID -> Node (cache)
- links map[linkID]Node // linkID -> Node (cache)
+ nodes map[string]*PCollection // PCollectionID -> Node (cache)
+ links map[linkID]Node // linkID -> Node (cache)
units []Unit // result
idgen *GenID
@@ -141,7 +146,7 @@ func newBuilder(desc *fnpb.ProcessBundleDescriptor)
(*builder, error) {
succ: succ,
windowing: make(map[string]*window.WindowingStrategy),
- nodes: make(map[string]Node),
+ nodes: make(map[string]*PCollection),
links: make(map[linkID]Node),
idgen: &GenID{},
@@ -258,7 +263,7 @@ func (b *builder) makeCoderForPCollection(id string)
(*coder.Coder, *coder.Windo
return c, wc, nil
}
-func (b *builder) makePCollection(id string) (Node, error) {
+func (b *builder) makePCollection(id string) (*PCollection, error) {
if n, exists := b.nodes[id]; exists {
return n, nil
}
@@ -273,8 +278,11 @@ func (b *builder) makePCollection(id string) (Node, error)
{
u = &Discard{UID: b.idgen.New()}
case 1:
- return b.makeLink(id, list[0])
-
+ out, err := b.makeLink(id, list[0])
+ if err != nil {
+ return nil, err
+ }
+ return b.newPCollectionNode(id, out)
default:
// Multiplex.
@@ -291,7 +299,16 @@ func (b *builder) makePCollection(id string) (Node, error)
{
b.units = append(b.units, u)
u = &Flatten{UID: b.idgen.New(), N: count, Out: u}
}
+ b.units = append(b.units, u)
+ return b.newPCollectionNode(id, u)
+}
+func (b *builder) newPCollectionNode(id string, out Node) (*PCollection,
error) {
+ ec, _, err := b.makeCoderForPCollection(id)
+ if err != nil {
+ return nil, err
+ }
+ u := &PCollection{UID: b.idgen.New(), Out: out, PColID: id, Coder: ec,
Seed: rand.Int63()}
b.nodes[id] = u
b.units = append(b.units, u)
return u, nil
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index c2fce51..2ee8beb 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -23,6 +23,7 @@ import (
"sync"
"time"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
@@ -82,11 +83,12 @@ func Main(ctx context.Context, loggingEndpoint,
controlEndpoint string) error {
}()
ctrl := &control{
- plans: make(map[bundleDescriptorID]*exec.Plan),
- active: make(map[instructionID]*exec.Plan),
- failed: make(map[instructionID]error),
- data: &DataChannelManager{},
- state: &StateChannelManager{},
+ plans: make(map[bundleDescriptorID]*exec.Plan),
+ active: make(map[instructionID]*exec.Plan),
+ metStore: make(map[instructionID]*metrics.Store),
+ failed: make(map[instructionID]error),
+ data: &DataChannelManager{},
+ state: &StateChannelManager{},
}
// gRPC requires all readers of a stream be the same goroutine, so this
goroutine
@@ -142,6 +144,8 @@ type control struct {
// plans that are actively being executed.
// a plan can only be in one of these maps at any time.
active map[instructionID]*exec.Plan // protected by mu
+ // metric stores for active plans.
+ metStore map[instructionID]*metrics.Store // protected by mu
// plans that have failed during execution
failed map[instructionID]error // protected by mu
mu sync.Mutex
@@ -192,6 +196,10 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
// since a plan can't be run concurrently.
c.active[instID] = plan
delete(c.plans, bdID)
+ // Get the user metrics store for this bundle.
+ ctx = metrics.SetBundleID(ctx, string(instID))
+ store := metrics.GetStore(ctx)
+ c.metStore[instID] = store
c.mu.Unlock()
if !ok {
@@ -204,7 +212,7 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
data.Close()
state.Close()
- mets, mons := monitoring(plan)
+ mets, mons := monitoring(plan, store)
// Move the plan back to the candidate state
c.mu.Lock()
// Mark the instruction as failed.
@@ -213,6 +221,7 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
}
c.plans[bdID] = plan
delete(c.active, instID)
+ delete(c.metStore, instID)
c.mu.Unlock()
if err != nil {
@@ -236,6 +245,7 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
ref := instructionID(msg.GetInstructionId())
c.mu.Lock()
plan, ok := c.active[ref]
+ store, _ := c.metStore[ref]
err := c.failed[ref]
c.mu.Unlock()
if err != nil {
@@ -245,7 +255,7 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
return fail(ctx, instID, "failed to return progress:
instruction %v not active", ref)
}
- mets, mons := monitoring(plan)
+ mets, mons := monitoring(plan, store)
return &fnpb.InstructionResponse{
InstructionId: string(instID),
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index 14162dd..b2bc35d 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -25,11 +25,7 @@ import (
"github.com/golang/protobuf/ptypes"
)
-func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
- store := p.Store()
- if store == nil {
- return nil, nil
- }
+func monitoring(p *exec.Plan, store *metrics.Store) (*fnpb.Metrics,
[]*ppb.MonitoringInfo) {
// Get the legacy style metrics.
transforms := make(map[string]*fnpb.Metrics_PTransform)
metrics.Extractor{
@@ -114,27 +110,44 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics,
[]*ppb.MonitoringInfo) {
}.ExtractFrom(store)
// Get the execution monitoring information from the bundle plan.
- if snapshot, ok := p.Progress(); ok {
+ snapshot, hasSource := p.Progress()
+ if hasSource {
// Legacy version.
- transforms[snapshot.ID] = &fnpb.Metrics_PTransform{
+ transforms[snapshot.Source.ID] = &fnpb.Metrics_PTransform{
ProcessedElements:
&fnpb.Metrics_PTransform_ProcessedElements{
Measured: &fnpb.Metrics_PTransform_Measured{
OutputElementCounts: map[string]int64{
- snapshot.Name: snapshot.Count,
+ snapshot.Source.Name:
snapshot.Source.Count,
},
},
},
}
- // Monitoring info version.
+ }
+
+ // Monitoring info version.
+ for _, pcol := range snapshot.PCols {
monitoringInfo = append(monitoringInfo,
&ppb.MonitoringInfo{
Urn: "beam:metric:element_count:v1",
Type: "beam:metrics:sum_int_64",
Labels: map[string]string{
- "PCOLLECTION": snapshot.PID,
+ "PCOLLECTION": pcol.ID,
},
- Data: int64Counter(snapshot.Count),
+ Data: int64Counter(pcol.ElementCount),
})
+
+ // Skip pcollections without size
+ if pcol.SizeCount != 0 {
+ monitoringInfo = append(monitoringInfo,
+ &ppb.MonitoringInfo{
+ Urn:
"beam:metric:sampled_byte_size:v1",
+ Type:
"beam:metrics:distribution_int_64",
+ Labels: map[string]string{
+ "PCOLLECTION": pcol.ID,
+ },
+ Data: int64Distribution(pcol.SizeCount,
pcol.SizeSum, pcol.SizeMin, pcol.SizeMax),
+ })
+ }
}
return &fnpb.Metrics{
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go
b/sdks/go/pkg/beam/runners/direct/direct.go
index eface25..4d78dc8 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -46,6 +46,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
log.Info(ctx, "Pipeline:")
log.Info(ctx, p)
+ ctx = metrics.SetBundleID(ctx, "direct") // Ensure a metrics.Store
exists.
if *jobopts.Strict {
log.Info(ctx, "Strict mode enabled, applying additional
validation.")
@@ -74,7 +75,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
}
// TODO(lostluck) 2020/01/24: What's the right way to expose the
// metrics store for the direct runner?
- metrics.DumpToLogFromStore(ctx, plan.Store())
+ metrics.DumpToLog(ctx)
return nil
}