This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ded686a  [BEAM-6374] Emit PCollection metrics from GoSDK (#10942)
ded686a is described below

commit ded686a58ad4747e91a26d3e59f61019b641e655
Author: Robert Burke <[email protected]>
AuthorDate: Wed Mar 4 13:03:22 2020 -0800

    [BEAM-6374] Emit PCollection metrics from GoSDK (#10942)
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  77 ++++++++---
 .../pkg/beam/core/runtime/exec/datasource_test.go  |  39 ++++--
 sdks/go/pkg/beam/core/runtime/exec/pcollection.go  | 153 ++++++++++++++++++++
 .../pkg/beam/core/runtime/exec/pcollection_test.go | 154 +++++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/exec/plan.go         |  75 +++++-----
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  31 ++++-
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  24 +++-
 .../go/pkg/beam/core/runtime/harness/monitoring.go |  35 +++--
 sdks/go/pkg/beam/runners/direct/direct.go          |   3 +-
 9 files changed, 491 insertions(+), 100 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index ccc4a09..fb20aea 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -36,14 +36,14 @@ type DataSource struct {
        Name  string
        Coder *coder.Coder
        Out   Node
+       PCol  PCollection // Handles size metrics. Value instead of pointer so 
it's initialized by default in tests.
 
        source DataManager
        state  StateReader
-       // TODO(lostluck) 2020/02/06: refactor to support more general 
PCollection metrics on nodes.
-       outputPID string // The index is the output count for the PCollection.
-       index     int64
-       splitIdx  int64
-       start     time.Time
+
+       index    int64
+       splitIdx int64
+       start    time.Time
 
        mu sync.Mutex
 }
@@ -70,6 +70,29 @@ func (n *DataSource) StartBundle(ctx context.Context, id 
string, data DataContex
        return n.Out.StartBundle(ctx, id, data)
 }
 
+// byteCountReader is a passthrough reader that counts all the bytes read 
through it.
+// It trusts the nested reader to return accurate byte information.
+type byteCountReader struct {
+       count  *int
+       reader io.ReadCloser
+}
+
+func (r *byteCountReader) Read(p []byte) (int, error) {
+       n, err := r.reader.Read(p)
+       *r.count += n
+       return n, err
+}
+
+func (r *byteCountReader) Close() error {
+       return r.reader.Close()
+}
+
+func (r *byteCountReader) reset() int {
+       c := *r.count
+       *r.count = 0
+       return c
+}
+
 // Process opens the data source, reads and decodes data, kicking off element 
processing.
 func (n *DataSource) Process(ctx context.Context) error {
        r, err := n.source.OpenRead(ctx, n.SID)
@@ -77,6 +100,9 @@ func (n *DataSource) Process(ctx context.Context) error {
                return err
        }
        defer r.Close()
+       n.PCol.resetSize() // initialize the size distribution for this bundle.
+       var byteCount int
+       bcr := byteCountReader{reader: r, count: &byteCount}
 
        c := coder.SkipW(n.Coder)
        wc := MakeWindowDecoder(n.Coder.Window)
@@ -99,6 +125,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                if n.incrementIndexAndCheckSplit() {
                        return nil
                }
+               // TODO(lostluck) 2020/02/22: Should we include window headers 
or just count the element sizes?
                ws, t, err := DecodeWindowedValueHeader(wc, r)
                if err != nil {
                        if err == io.EOF {
@@ -108,7 +135,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                }
 
                // Decode key or parallel element.
-               pe, err := cp.Decode(r)
+               pe, err := cp.Decode(&bcr)
                if err != nil {
                        return errors.Wrap(err, "source decode failed")
                }
@@ -117,7 +144,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 
                var valReStreams []ReStream
                for _, cv := range cvs {
-                       values, err := n.makeReStream(ctx, pe, cv, r)
+                       values, err := n.makeReStream(ctx, pe, cv, &bcr)
                        if err != nil {
                                return err
                        }
@@ -127,11 +154,15 @@ func (n *DataSource) Process(ctx context.Context) error {
                if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err 
!= nil {
                        return err
                }
+               // Collect the actual size of the element, and reset the 
bytecounter reader.
+               n.PCol.addSize(int64(bcr.reset()))
+               bcr.reader = r
        }
 }
 
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv 
ElementDecoder, r io.ReadCloser) (ReStream, error) {
-       size, err := coder.DecodeInt32(r)
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv 
ElementDecoder, bcr *byteCountReader) (ReStream, error) {
+       // TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the 
element sizes?
+       size, err := coder.DecodeInt32(bcr.reader)
        if err != nil {
                return nil, errors.Wrap(err, "stream size decoding failed")
        }
@@ -140,16 +171,16 @@ func (n *DataSource) makeReStream(ctx context.Context, 
key *FullValue, cv Elemen
        case size >= 0:
                // Single chunk streams are fully read in and buffered in 
memory.
                buf := make([]FullValue, 0, size)
-               buf, err = readStreamToBuffer(cv, r, int64(size), buf)
+               buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
                if err != nil {
                        return nil, err
                }
                return &FixedReStream{Buf: buf}, nil
-       case size == -1: // Shouldn't this be 0?
+       case size == -1:
                // Multi-chunked stream.
                var buf []FullValue
                for {
-                       chunk, err := coder.DecodeVarInt(r)
+                       chunk, err := coder.DecodeVarInt(bcr.reader)
                        if err != nil {
                                return nil, errors.Wrap(err, "stream chunk size 
decoding failed")
                        }
@@ -159,17 +190,17 @@ func (n *DataSource) makeReStream(ctx context.Context, 
key *FullValue, cv Elemen
                                return &FixedReStream{Buf: buf}, nil
                        case chunk > 0: // Non-zero chunk, read that many 
elements from the stream, and buffer them.
                                chunkBuf := make([]FullValue, 0, chunk)
-                               chunkBuf, err = readStreamToBuffer(cv, r, 
chunk, chunkBuf)
+                               chunkBuf, err = readStreamToBuffer(cv, bcr, 
chunk, chunkBuf)
                                if err != nil {
                                        return nil, err
                                }
                                buf = append(buf, chunkBuf...)
                        case chunk == -1: // State backed iterable!
-                               chunk, err := coder.DecodeVarInt(r)
+                               chunk, err := coder.DecodeVarInt(bcr.reader)
                                if err != nil {
                                        return nil, err
                                }
-                               token, err := ioutilx.ReadN(r, (int)(chunk))
+                               token, err := ioutilx.ReadN(bcr.reader, 
(int)(chunk))
                                if err != nil {
                                        return nil, err
                                }
@@ -181,6 +212,9 @@ func (n *DataSource) makeReStream(ctx context.Context, key 
*FullValue, cv Elemen
                                                        if err != nil {
                                                                return nil, err
                                                        }
+                                                       // We can't re-use the 
original bcr, since we may get new iterables,
+                                                       // or multiple of them 
at the same time, but we can re-use the count itself.
+                                                       r = 
&byteCountReader{reader: r, count: bcr.count}
                                                        return 
&elementStream{r: r, ec: cv}, nil
                                                },
                                        },
@@ -241,12 +275,11 @@ func (n *DataSource) incrementIndexAndCheckSplit() bool {
 }
 
 // ProgressReportSnapshot captures the progress reading an input source.
-//
-// TODO(lostluck) 2020/02/06: Add a visitor pattern for collecting progress
-// metrics from downstream Nodes.
 type ProgressReportSnapshot struct {
-       ID, Name, PID string
-       Count         int64
+       ID, Name string
+       Count    int64
+
+       pcol PCollectionSnapshot
 }
 
 // Progress returns a snapshot of the source's progress.
@@ -255,6 +288,7 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
                return ProgressReportSnapshot{}
        }
        n.mu.Lock()
+       pcol := n.PCol.snapshot()
        // The count is the number of "completely processed elements"
        // which matches the index of the currently processing element.
        c := n.index
@@ -263,7 +297,8 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        if c < 0 {
                c = 0
        }
-       return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, 
Name: n.Name, Count: c}
+       pcol.ElementCount = c
+       return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, 
Count: c, pcol: pcol}
 }
 
 // Split takes a sorted set of potential split indices, selects and actuates
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 1ce493c..3037c84 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -201,6 +201,16 @@ func TestDataSource_Iterators(t *testing.T) {
                        if got, want := iVals, expectedKeys; !equalList(got, 
want) {
                                t.Errorf("DataSource => %#v, want %#v", 
extractValues(got...), extractValues(want...))
                        }
+
+                       // We're using integers that encode to 1 byte, so do 
some quick math to validate.
+                       sizeOfSmallInt := 1
+                       snap := quickTestSnapshot(source, int64(len(test.keys)))
+                       snap.pcol.SizeSum = int64(len(test.keys) * (1 + 
len(test.vals)) * sizeOfSmallInt)
+                       snap.pcol.SizeMin = int64((1 + len(test.vals)) * 
sizeOfSmallInt)
+                       snap.pcol.SizeMax = int64((1 + len(test.vals)) * 
sizeOfSmallInt)
+                       if got, want := source.Progress(), snap; got != want {
+                               t.Errorf("progress didn't match: got %v, want 
%v", got, want)
+                       }
                })
        }
 }
@@ -358,15 +368,6 @@ func TestDataSource_Split(t *testing.T) {
                                })
 
                                validateSource(t, out, source, 
makeValues(test.expected...))
-
-                               // Adjust expectations to maximum number of 
elements.
-                               adjustedExpectation := test.splitIdx
-                               if adjustedExpectation > int64(len(elements)) {
-                                       adjustedExpectation = 
int64(len(elements))
-                               }
-                               if got, want := source.Progress().Count, 
adjustedExpectation; got != want {
-                                       t.Fatalf("progress didn't match split: 
got %v, want %v", got, want)
-                               }
                        })
                }
        })
@@ -464,13 +465,29 @@ func constructAndExecutePlanWithContext(t *testing.T, us 
[]Unit, dc DataContext)
        }
 }
 
+func quickTestSnapshot(source *DataSource, count int64) ProgressReportSnapshot 
{
+       return ProgressReportSnapshot{
+               Name:  source.Name,
+               ID:    source.SID.PtransformID,
+               Count: count,
+               pcol: PCollectionSnapshot{
+                       ElementCount: count,
+                       SizeCount:    count,
+                       SizeSum:      count,
+                       // We're only encoding small ints here, so size will 
only be 1.
+                       SizeMin: 1,
+                       SizeMax: 1,
+               },
+       }
+}
+
 func validateSource(t *testing.T, out *CaptureNode, source *DataSource, 
expected []FullValue) {
        t.Helper()
        if got, want := len(out.Elements), len(expected); got != want {
                t.Fatalf("lengths don't match: got %v, want %v", got, want)
        }
-       if got, want := source.Progress().Count, int64(len(expected)); got != 
want {
-               t.Fatalf("progress count didn't match: got %v, want %v", got, 
want)
+       if got, want := source.Progress(), quickTestSnapshot(source, 
int64(len(expected))); got != want {
+               t.Fatalf("progress snapshot didn't match: got %v, want %v", 
got, want)
        }
        if !equalList(out.Elements, expected) {
                t.Errorf("DataSource => %#v, want %#v", 
extractValues(out.Elements...), extractValues(expected...))
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go 
b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
new file mode 100644
index 0000000..d8fb024
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "math/rand"
+       "sync"
+       "sync/atomic"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+// PCollection is a passthrough node to collect PCollection metrics, and
+// must be placed as the Out node of any producer of a PCollection.
+//
+// In particular, must not be placed after a Multiplex, and must be placed
+// after a Flatten.
+type PCollection struct {
+       UID    UnitID
+       PColID string
+       Out    Node // Out is the consumer of this PCollection.
+       Coder  *coder.Coder
+       Seed   int64
+
+       r             *rand.Rand
+       nextSampleIdx int64 // The index of the next value to sample.
+       elementCoder  ElementEncoder
+
+       elementCount                         int64 // must use atomic 
operations.
+       sizeMu                               sync.Mutex
+       sizeCount, sizeSum, sizeMin, sizeMax int64
+}
+
+// ID returns the debug id for this unit.
+func (p *PCollection) ID() UnitID {
+       return p.UID
+}
+
+// Up initializes the random sampling source and element encoder.
+func (p *PCollection) Up(ctx context.Context) error {
+       // dedicated rand source
+       p.r = rand.New(rand.NewSource(p.Seed))
+       p.elementCoder = MakeElementEncoder(p.Coder)
+       return nil
+}
+
+// StartBundle resets collected metrics for this PCollection, and propagates 
bundle start.
+func (p *PCollection) StartBundle(ctx context.Context, id string, data 
DataContext) error {
+       atomic.StoreInt64(&p.elementCount, 0)
+       p.nextSampleIdx = p.r.Int63n(3)
+       p.resetSize()
+       return MultiStartBundle(ctx, id, data, p.Out)
+}
+
+type byteCounter struct {
+       count int
+}
+
+func (w *byteCounter) Write(p []byte) (n int, err error) {
+       w.count += len(p)
+       return len(p), nil
+}
+
+// ProcessElement increments the element count and sometimes takes size 
samples of the elements.
+func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue, 
values ...ReStream) error {
+       cur := atomic.AddInt64(&p.elementCount, 1)
+       if cur == p.nextSampleIdx {
+               // We pick the next sampling index based on how large this 
pcollection already is.
+               // We don't want to necessarily wait until the pcollection has 
doubled, so we reduce the range.
+               // We don't want to always encode the first consecutive 
elements, so we add 2 to give some variance.
+               // Finally we add 1 no matter what, so that there's always the 
potential to trigger again.
+               // Otherwise, there's the potential for the random int to be 0, 
which means we don't change the
+               // nextSampleIdx at all.
+               p.nextSampleIdx = cur + p.r.Int63n(cur/10+2) + 1
+               var w byteCounter
+               p.elementCoder.Encode(elm, &w)
+               p.addSize(int64(w.count))
+       }
+       return p.Out.ProcessElement(ctx, elm, values...)
+}
+
+func (p *PCollection) addSize(size int64) {
+       p.sizeMu.Lock()
+       defer p.sizeMu.Unlock()
+       p.sizeCount++
+       p.sizeSum += size
+       if size > p.sizeMax {
+               p.sizeMax = size
+       }
+       if size < p.sizeMin {
+               p.sizeMin = size
+       }
+}
+
+func (p *PCollection) resetSize() {
+       p.sizeMu.Lock()
+       defer p.sizeMu.Unlock()
+       p.sizeCount = 0
+       p.sizeSum = 0
+       p.sizeMax = math.MinInt64
+       p.sizeMin = math.MaxInt64
+}
+
+// FinishBundle propagates bundle termination.
+func (p *PCollection) FinishBundle(ctx context.Context) error {
+       return MultiFinishBundle(ctx, p.Out)
+}
+
+// Down is a no-op.
+func (p *PCollection) Down(ctx context.Context) error {
+       return nil
+}
+
+func (p *PCollection) String() string {
+       return fmt.Sprintf("PCollection[%v] Out:%v", p.PColID, IDs(p.Out))
+}
+
+// PCollectionSnapshot contains the PCollection ID, its element count, and its size metrics (count, sum, min, max).
+type PCollectionSnapshot struct {
+       ID           string
+       ElementCount int64
+       // If SizeCount is zero, then no size metrics should be exported.
+       SizeCount, SizeSum, SizeMin, SizeMax int64
+}
+
+func (p *PCollection) snapshot() PCollectionSnapshot {
+       p.sizeMu.Lock()
+       defer p.sizeMu.Unlock()
+       return PCollectionSnapshot{
+               ID:           p.PColID,
+               ElementCount: atomic.LoadInt64(&p.elementCount),
+               SizeCount:    p.sizeCount,
+               SizeSum:      p.sizeSum,
+               SizeMin:      p.sizeMin,
+               SizeMax:      p.sizeMax,
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
new file mode 100644
index 0000000..f7f24a7
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "math"
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+)
+
+// TestPCollection verifies that the PCollection node works correctly.
+// Seed is by default set to 0, so we have a "deterministic" set of
+// randomness for the samples.
+func TestPCollection(t *testing.T) {
+       a := &CaptureNode{UID: 1}
+       pcol := &PCollection{UID: 2, Out: a, Coder: coder.NewVarInt()}
+       // The "large" 2nd value is to ensure the values are encoded properly,
+       // and that Min & Max are behaving.
+       inputs := []interface{}{int64(1), int64(2000000000), int64(3)}
+       in := &FixedRoot{UID: 3, Elements: makeInput(inputs...), Out: pcol}
+
+       p, err := NewPlan("a", []Unit{a, pcol, in})
+       if err != nil {
+               t.Fatalf("failed to construct plan: %v", err)
+       }
+
+       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
+               t.Fatalf("execute failed: %v", err)
+       }
+       if err := p.Down(context.Background()); err != nil {
+               t.Fatalf("down failed: %v", err)
+       }
+
+       expected := makeValues(inputs...)
+       if !equalList(a.Elements, expected) {
+               t.Errorf("multiplex returned %v for a, want %v", 
extractValues(a.Elements...), extractValues(expected...))
+       }
+       snap := pcol.snapshot()
+       if want, got := int64(len(expected)), snap.ElementCount; got != want {
+               t.Errorf("snapshot miscounted: got %v, want %v", got, want)
+       }
+       checkPCollectionSizeSample(t, snap, 2, 6, 1, 5)
+}
+
+func TestPCollection_sizeReset(t *testing.T) {
+       // Check the initial values after resetting.
+       var pcol PCollection
+       pcol.resetSize()
+       snap := pcol.snapshot()
+       checkPCollectionSizeSample(t, snap, 0, 0, math.MaxInt64, math.MinInt64)
+}
+
+func checkPCollectionSizeSample(t *testing.T, snap PCollectionSnapshot, count, 
sum, min, max int64) {
+       t.Helper()
+       if want, got := int64(count), snap.SizeCount; got != want {
+               t.Errorf("sample count incorrect: got %v, want %v", got, want)
+       }
+       if want, got := int64(sum), snap.SizeSum; got != want {
+               t.Errorf("sample sum incorrect: got %v, want %v", got, want)
+       }
+       if want, got := int64(min), snap.SizeMin; got != want {
+               t.Errorf("sample min incorrect: got %v, want %v", got, want)
+       }
+       if want, got := int64(max), snap.SizeMax; got != want {
+               t.Errorf("sample max incorrect: got %v, want %v", got, want)
+       }
+}
+
+// BenchmarkPCollection measures the overhead of a PCollection node in a plan.
+//
+// On @lostluck's desktop (2020/02/20):
+// BenchmarkPCollection-12                 44699806                24.8 ns/op  
           0 B/op          0 allocs/op
+func BenchmarkPCollection(b *testing.B) {
+       // Pre allocate the capture buffer and process buffer to avoid
+       // unnecessary overhead.
+       out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
+       process := make([]MainInput, 0, b.N)
+       for i := 0; i < b.N; i++ {
+               process = append(process, MainInput{Key: FullValue{
+                       Windows:   window.SingleGlobalWindow,
+                       Timestamp: mtime.ZeroTimestamp,
+                       Elm:       int64(1),
+               }})
+       }
+       pcol := &PCollection{UID: 2, Out: out, Coder: coder.NewVarInt()}
+       n := &FixedRoot{UID: 3, Elements: process, Out: pcol}
+       p, err := NewPlan("a", []Unit{n, pcol, out})
+       if err != nil {
+               b.Fatalf("failed to construct plan: %v", err)
+       }
+       b.ResetTimer()
+       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
+               b.Fatalf("execute failed: %v", err)
+       }
+       if err := p.Down(context.Background()); err != nil {
+               b.Fatalf("down failed: %v", err)
+       }
+       if got, want := pcol.snapshot().ElementCount, int64(b.N); got != want {
+               b.Errorf("did not process all elements: got %v, want %v", got, 
want)
+       }
+       if got, want := len(out.Elements), b.N; got != want {
+               b.Errorf("did not process all elements: got %v, want %v", got, 
want)
+       }
+}
+
+// BenchmarkPCollection_Baseline measures the baseline of the node 
benchmarking scaffold.
+//
+// On @lostluck's desktop (2020/02/20):
+// BenchmarkPCollection_Baseline-12        62186372                18.8 ns/op  
           0 B/op          0 allocs/op
+func BenchmarkPCollection_Baseline(b *testing.B) {
+       // Pre allocate the capture buffer and process buffer to avoid
+       // unnecessary overhead.
+       out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
+       process := make([]MainInput, 0, b.N)
+       for i := 0; i < b.N; i++ {
+               process = append(process, MainInput{Key: FullValue{
+                       Windows:   window.SingleGlobalWindow,
+                       Timestamp: mtime.ZeroTimestamp,
+                       Elm:       1,
+               }})
+       }
+       n := &FixedRoot{UID: 3, Elements: process, Out: out}
+       p, err := NewPlan("a", []Unit{n, out})
+       if err != nil {
+               b.Fatalf("failed to construct plan: %v", err)
+       }
+       b.ResetTimer()
+       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
+               b.Fatalf("execute failed: %v", err)
+       }
+       if err := p.Down(context.Background()); err != nil {
+               b.Fatalf("down failed: %v", err)
+       }
+       if got, want := len(out.Elements), b.N; got != want {
+               b.Errorf("did not process all elements: got %v, want %v", got, 
want)
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go 
b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index fde9e7c..5275092 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -21,9 +21,7 @@ import (
        "context"
        "fmt"
        "strings"
-       "sync"
 
-       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
@@ -31,34 +29,22 @@ import (
 // from a part of a pipeline. A plan can be used to process multiple bundles
 // serially.
 type Plan struct {
-       id       string
-       roots    []Root
-       units    []Unit
-       parDoIDs []string
+       id    string // id of the bundle descriptor for this plan
+       roots []Root
+       units []Unit
+       pcols []*PCollection
 
        status Status
 
-       // While the store is threadsafe, the reference to it
-       // is not, so we need to protect the store field to be
-       // able to asynchronously provide tentative metrics.
-       storeMu sync.Mutex
-       store   *metrics.Store
-
        // TODO: there can be more than 1 DataSource in a bundle.
        source *DataSource
 }
 
-// hasPID provides a common interface for extracting PTransformIDs
-// from Units.
-type hasPID interface {
-       GetPID() string
-}
-
 // NewPlan returns a new bundle execution plan from the given units.
 func NewPlan(id string, units []Unit) (*Plan, error) {
        var roots []Root
+       var pcols []*PCollection
        var source *DataSource
-       var pardoIDs []string
 
        for _, u := range units {
                if u == nil {
@@ -70,8 +56,8 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
                if s, ok := u.(*DataSource); ok {
                        source = s
                }
-               if p, ok := u.(hasPID); ok {
-                       pardoIDs = append(pardoIDs, p.GetPID())
+               if p, ok := u.(*PCollection); ok {
+                       pcols = append(pcols, p)
                }
        }
        if len(roots) == 0 {
@@ -79,12 +65,12 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
        }
 
        return &Plan{
-               id:       id,
-               status:   Initializing,
-               roots:    roots,
-               units:    units,
-               parDoIDs: pardoIDs,
-               source:   source,
+               id:     id,
+               status: Initializing,
+               roots:  roots,
+               units:  units,
+               pcols:  pcols,
+               source: source,
        }, nil
 }
 
@@ -102,10 +88,6 @@ func (p *Plan) SourcePTransformID() string {
 // are brought up on the first execution. If a bundle fails, the plan cannot
 // be reused for further bundles. Does not panic. Blocking.
 func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) 
error {
-       ctx = metrics.SetBundleID(ctx, p.id)
-       p.storeMu.Lock()
-       p.store = metrics.GetStore(ctx)
-       p.storeMu.Unlock()
        if p.status == Initializing {
                for _, u := range p.units {
                        if err := callNoPanic(ctx, u.Up); err != nil {
@@ -178,19 +160,28 @@ func (p *Plan) String() string {
        return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
 }
 
-// Progress returns a snapshot of input progress of the plan, and associated 
metrics.
-func (p *Plan) Progress() (ProgressReportSnapshot, bool) {
-       if p.source != nil {
-               return p.source.Progress(), true
-       }
-       return ProgressReportSnapshot{}, false
+// PlanSnapshot contains system metrics for the current run of the plan.
+type PlanSnapshot struct {
+       Source ProgressReportSnapshot
+       PCols  []PCollectionSnapshot
 }
 
-// Store returns the metric store for the last use of this plan.
-func (p *Plan) Store() *metrics.Store {
-       p.storeMu.Lock()
-       defer p.storeMu.Unlock()
-       return p.store
+// Progress returns a snapshot of progress of the plan, and associated 
metrics. 
+// The returned boolean indicates whether the plan includes a DataSource, which 
is
+// important for handling legacy metrics. This boolean will be removed once
+// we no longer return legacy metrics.
+func (p *Plan) Progress() (PlanSnapshot, bool) {
+       pcolSnaps := make([]PCollectionSnapshot, 0, len(p.pcols)+1) // include 
space for the datasource pcollection.
+       for _, pcol := range p.pcols {
+               pcolSnaps = append(pcolSnaps, pcol.snapshot())
+       }
+       snap := PlanSnapshot{PCols: pcolSnaps}
+       if p.source != nil {
+               snap.Source = p.source.Progress()
+               snap.PCols = append(pcolSnaps, snap.Source.pcol)
+               return snap, true
+       }
+       return snap, false
 }
 
 // SplitPoints captures the split requested by the Runner.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 486dacf..b629560 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -17,6 +17,7 @@ package exec
 
 import (
        "fmt"
+       "math/rand"
        "strconv"
        "strings"
 
@@ -77,12 +78,16 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) 
(*Plan, error) {
                for key, pid := range transform.GetOutputs() {
                        u.SID = StreamID{PtransformID: id, Port: port}
                        u.Name = key
-                       u.outputPID = pid
 
                        u.Out, err = b.makePCollection(pid)
                        if err != nil {
                                return nil, err
                        }
+                       // Elide the PCollection Node for DataSources.
+                       // DataSources can get byte samples directly, and can 
handle CoGBKs.
+                       u.PCol = *u.Out.(*PCollection)
+                       u.Out = u.PCol.Out
+                       b.units = b.units[:len(b.units)-1]
                }
 
                b.units = append(b.units, u)
@@ -98,8 +103,8 @@ type builder struct {
        succ map[string][]linkID // PCollectionID -> []linkID
 
        windowing map[string]*window.WindowingStrategy
-       nodes     map[string]Node // PCollectionID -> Node (cache)
-       links     map[linkID]Node // linkID -> Node (cache)
+       nodes     map[string]*PCollection // PCollectionID -> Node (cache)
+       links     map[linkID]Node         // linkID -> Node (cache)
 
        units []Unit // result
        idgen *GenID
@@ -141,7 +146,7 @@ func newBuilder(desc *fnpb.ProcessBundleDescriptor) 
(*builder, error) {
                succ: succ,
 
                windowing: make(map[string]*window.WindowingStrategy),
-               nodes:     make(map[string]Node),
+               nodes:     make(map[string]*PCollection),
                links:     make(map[linkID]Node),
 
                idgen: &GenID{},
@@ -258,7 +263,7 @@ func (b *builder) makeCoderForPCollection(id string) 
(*coder.Coder, *coder.Windo
        return c, wc, nil
 }
 
-func (b *builder) makePCollection(id string) (Node, error) {
+func (b *builder) makePCollection(id string) (*PCollection, error) {
        if n, exists := b.nodes[id]; exists {
                return n, nil
        }
@@ -273,8 +278,11 @@ func (b *builder) makePCollection(id string) (Node, error) 
{
                u = &Discard{UID: b.idgen.New()}
 
        case 1:
-               return b.makeLink(id, list[0])
-
+               out, err := b.makeLink(id, list[0])
+               if err != nil {
+                       return nil, err
+               }
+               return b.newPCollectionNode(id, out)
        default:
                // Multiplex.
 
@@ -291,7 +299,16 @@ func (b *builder) makePCollection(id string) (Node, error) 
{
                b.units = append(b.units, u)
                u = &Flatten{UID: b.idgen.New(), N: count, Out: u}
        }
+       b.units = append(b.units, u)
+       return b.newPCollectionNode(id, u)
+}
 
+func (b *builder) newPCollectionNode(id string, out Node) (*PCollection, 
error) {
+       ec, _, err := b.makeCoderForPCollection(id)
+       if err != nil {
+               return nil, err
+       }
+       u := &PCollection{UID: b.idgen.New(), Out: out, PColID: id, Coder: ec, 
Seed: rand.Int63()}
        b.nodes[id] = u
        b.units = append(b.units, u)
        return u, nil
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index c2fce51..2ee8beb 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -23,6 +23,7 @@ import (
        "sync"
        "time"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
@@ -82,11 +83,12 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
        }()
 
        ctrl := &control{
-               plans:  make(map[bundleDescriptorID]*exec.Plan),
-               active: make(map[instructionID]*exec.Plan),
-               failed: make(map[instructionID]error),
-               data:   &DataChannelManager{},
-               state:  &StateChannelManager{},
+               plans:    make(map[bundleDescriptorID]*exec.Plan),
+               active:   make(map[instructionID]*exec.Plan),
+               metStore: make(map[instructionID]*metrics.Store),
+               failed:   make(map[instructionID]error),
+               data:     &DataChannelManager{},
+               state:    &StateChannelManager{},
        }
 
        // gRPC requires all readers of a stream be the same goroutine, so this 
goroutine
@@ -142,6 +144,8 @@ type control struct {
        // plans that are actively being executed.
        // a plan can only be in one of these maps at any time.
        active map[instructionID]*exec.Plan // protected by mu
+       // metric stores for active plans.
+       metStore map[instructionID]*metrics.Store // protected by mu
        // plans that have failed during execution
        failed map[instructionID]error // protected by mu
        mu     sync.Mutex
@@ -192,6 +196,10 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                // since a plan can't be run concurrently.
                c.active[instID] = plan
                delete(c.plans, bdID)
+               // Get the user metrics store for this bundle.
+               ctx = metrics.SetBundleID(ctx, string(instID))
+               store := metrics.GetStore(ctx)
+               c.metStore[instID] = store
                c.mu.Unlock()
 
                if !ok {
@@ -204,7 +212,7 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                data.Close()
                state.Close()
 
-               mets, mons := monitoring(plan)
+               mets, mons := monitoring(plan, store)
                // Move the plan back to the candidate state
                c.mu.Lock()
                // Mark the instruction as failed.
@@ -213,6 +221,7 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                }
                c.plans[bdID] = plan
                delete(c.active, instID)
+               delete(c.metStore, instID)
                c.mu.Unlock()
 
                if err != nil {
@@ -236,6 +245,7 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                ref := instructionID(msg.GetInstructionId())
                c.mu.Lock()
                plan, ok := c.active[ref]
+               store, _ := c.metStore[ref]
                err := c.failed[ref]
                c.mu.Unlock()
                if err != nil {
@@ -245,7 +255,7 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                        return fail(ctx, instID, "failed to return progress: 
instruction %v not active", ref)
                }
 
-               mets, mons := monitoring(plan)
+               mets, mons := monitoring(plan, store)
 
                return &fnpb.InstructionResponse{
                        InstructionId: string(instID),
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go 
b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index 14162dd..b2bc35d 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -25,11 +25,7 @@ import (
        "github.com/golang/protobuf/ptypes"
 )
 
-func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
-       store := p.Store()
-       if store == nil {
-               return nil, nil
-       }
+func monitoring(p *exec.Plan, store *metrics.Store) (*fnpb.Metrics, 
[]*ppb.MonitoringInfo) {
        // Get the legacy style metrics.
        transforms := make(map[string]*fnpb.Metrics_PTransform)
        metrics.Extractor{
@@ -114,27 +110,44 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, 
[]*ppb.MonitoringInfo) {
        }.ExtractFrom(store)
 
        // Get the execution monitoring information from the bundle plan.
-       if snapshot, ok := p.Progress(); ok {
+       snapshot, hasSource := p.Progress()
+       if hasSource {
                // Legacy version.
-               transforms[snapshot.ID] = &fnpb.Metrics_PTransform{
+               transforms[snapshot.Source.ID] = &fnpb.Metrics_PTransform{
                        ProcessedElements: 
&fnpb.Metrics_PTransform_ProcessedElements{
                                Measured: &fnpb.Metrics_PTransform_Measured{
                                        OutputElementCounts: map[string]int64{
-                                               snapshot.Name: snapshot.Count,
+                                               snapshot.Source.Name: 
snapshot.Source.Count,
                                        },
                                },
                        },
                }
-               // Monitoring info version.
+       }
+
+       // Monitoring info version.
+       for _, pcol := range snapshot.PCols {
                monitoringInfo = append(monitoringInfo,
                        &ppb.MonitoringInfo{
                                Urn:  "beam:metric:element_count:v1",
                                Type: "beam:metrics:sum_int_64",
                                Labels: map[string]string{
-                                       "PCOLLECTION": snapshot.PID,
+                                       "PCOLLECTION": pcol.ID,
                                },
-                               Data: int64Counter(snapshot.Count),
+                               Data: int64Counter(pcol.ElementCount),
                        })
+
+               // Skip pcollections without size
+               if pcol.SizeCount != 0 {
+                       monitoringInfo = append(monitoringInfo,
+                               &ppb.MonitoringInfo{
+                                       Urn:  
"beam:metric:sampled_byte_size:v1",
+                                       Type: 
"beam:metrics:distribution_int_64",
+                                       Labels: map[string]string{
+                                               "PCOLLECTION": pcol.ID,
+                                       },
+                                       Data: int64Distribution(pcol.SizeCount, 
pcol.SizeSum, pcol.SizeMin, pcol.SizeMax),
+                               })
+               }
        }
 
        return &fnpb.Metrics{
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go 
b/sdks/go/pkg/beam/runners/direct/direct.go
index eface25..4d78dc8 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -46,6 +46,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
        log.Info(ctx, "Pipeline:")
        log.Info(ctx, p)
+       ctx = metrics.SetBundleID(ctx, "direct") // Ensure a metrics.Store 
exists.
 
        if *jobopts.Strict {
                log.Info(ctx, "Strict mode enabled, applying additional 
validation.")
@@ -74,7 +75,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        }
        // TODO(lostluck) 2020/01/24: What's the right way to expose the
        // metrics store for the direct runner?
-       metrics.DumpToLogFromStore(ctx, plan.Store())
+       metrics.DumpToLog(ctx)
        return nil
 }
 

Reply via email to