This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch revert-10942-metrics7
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1bcaaaa0ae840af38ed9ba523d08180e252787ee
Author: Robert Burke <[email protected]>
AuthorDate: Thu Mar 5 16:47:33 2020 -0800

    Revert "[BEAM-6374] Emit PCollection metrics from GoSDK (#10942)"
    
    This reverts commit ded686a58ad4747e91a26d3e59f61019b641e655.
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  77 +++--------
 .../pkg/beam/core/runtime/exec/datasource_test.go  |  39 ++----
 sdks/go/pkg/beam/core/runtime/exec/pcollection.go  | 153 --------------------
 .../pkg/beam/core/runtime/exec/pcollection_test.go | 154 ---------------------
 sdks/go/pkg/beam/core/runtime/exec/plan.go         |  75 +++++-----
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  31 +----
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  24 +---
 .../go/pkg/beam/core/runtime/harness/monitoring.go |  35 ++---
 sdks/go/pkg/beam/runners/direct/direct.go          |   3 +-
 9 files changed, 100 insertions(+), 491 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index fb20aea..ccc4a09 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -36,14 +36,14 @@ type DataSource struct {
        Name  string
        Coder *coder.Coder
        Out   Node
-       PCol  PCollection // Handles size metrics. Value instead of pointer so 
it's initialized by default in tests.
 
        source DataManager
        state  StateReader
-
-       index    int64
-       splitIdx int64
-       start    time.Time
+       // TODO(lostluck) 2020/02/06: refactor to support more general 
PCollection metrics on nodes.
+       outputPID string // The index is the output count for the PCollection.
+       index     int64
+       splitIdx  int64
+       start     time.Time
 
        mu sync.Mutex
 }
@@ -70,29 +70,6 @@ func (n *DataSource) StartBundle(ctx context.Context, id 
string, data DataContex
        return n.Out.StartBundle(ctx, id, data)
 }
 
-// ByteCountReader is a passthrough reader that counts all the bytes read 
through it.
-// It trusts the nested reader to return accurate byte information.
-type byteCountReader struct {
-       count  *int
-       reader io.ReadCloser
-}
-
-func (r *byteCountReader) Read(p []byte) (int, error) {
-       n, err := r.reader.Read(p)
-       *r.count += n
-       return n, err
-}
-
-func (r *byteCountReader) Close() error {
-       return r.reader.Close()
-}
-
-func (r *byteCountReader) reset() int {
-       c := *r.count
-       *r.count = 0
-       return c
-}
-
 // Process opens the data source, reads and decodes data, kicking off element 
processing.
 func (n *DataSource) Process(ctx context.Context) error {
        r, err := n.source.OpenRead(ctx, n.SID)
@@ -100,9 +77,6 @@ func (n *DataSource) Process(ctx context.Context) error {
                return err
        }
        defer r.Close()
-       n.PCol.resetSize() // initialize the size distribution for this bundle.
-       var byteCount int
-       bcr := byteCountReader{reader: r, count: &byteCount}
 
        c := coder.SkipW(n.Coder)
        wc := MakeWindowDecoder(n.Coder.Window)
@@ -125,7 +99,6 @@ func (n *DataSource) Process(ctx context.Context) error {
                if n.incrementIndexAndCheckSplit() {
                        return nil
                }
-               // TODO(lostluck) 2020/02/22: Should we include window headers 
or just count the element sizes?
                ws, t, err := DecodeWindowedValueHeader(wc, r)
                if err != nil {
                        if err == io.EOF {
@@ -135,7 +108,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                }
 
                // Decode key or parallel element.
-               pe, err := cp.Decode(&bcr)
+               pe, err := cp.Decode(r)
                if err != nil {
                        return errors.Wrap(err, "source decode failed")
                }
@@ -144,7 +117,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 
                var valReStreams []ReStream
                for _, cv := range cvs {
-                       values, err := n.makeReStream(ctx, pe, cv, &bcr)
+                       values, err := n.makeReStream(ctx, pe, cv, r)
                        if err != nil {
                                return err
                        }
@@ -154,15 +127,11 @@ func (n *DataSource) Process(ctx context.Context) error {
                if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err 
!= nil {
                        return err
                }
-               // Collect the actual size of the element, and reset the 
bytecounter reader.
-               n.PCol.addSize(int64(bcr.reset()))
-               bcr.reader = r
        }
 }
 
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv 
ElementDecoder, bcr *byteCountReader) (ReStream, error) {
-       // TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the 
element sizes?
-       size, err := coder.DecodeInt32(bcr.reader)
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv 
ElementDecoder, r io.ReadCloser) (ReStream, error) {
+       size, err := coder.DecodeInt32(r)
        if err != nil {
                return nil, errors.Wrap(err, "stream size decoding failed")
        }
@@ -171,16 +140,16 @@ func (n *DataSource) makeReStream(ctx context.Context, 
key *FullValue, cv Elemen
        case size >= 0:
                // Single chunk streams are fully read in and buffered in 
memory.
                buf := make([]FullValue, 0, size)
-               buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
+               buf, err = readStreamToBuffer(cv, r, int64(size), buf)
                if err != nil {
                        return nil, err
                }
                return &FixedReStream{Buf: buf}, nil
-       case size == -1:
+       case size == -1: // Shouldn't this be 0?
                // Multi-chunked stream.
                var buf []FullValue
                for {
-                       chunk, err := coder.DecodeVarInt(bcr.reader)
+                       chunk, err := coder.DecodeVarInt(r)
                        if err != nil {
                                return nil, errors.Wrap(err, "stream chunk size 
decoding failed")
                        }
@@ -190,17 +159,17 @@ func (n *DataSource) makeReStream(ctx context.Context, 
key *FullValue, cv Elemen
                                return &FixedReStream{Buf: buf}, nil
                        case chunk > 0: // Non-zero chunk, read that many 
elements from the stream, and buffer them.
                                chunkBuf := make([]FullValue, 0, chunk)
-                               chunkBuf, err = readStreamToBuffer(cv, bcr, 
chunk, chunkBuf)
+                               chunkBuf, err = readStreamToBuffer(cv, r, 
chunk, chunkBuf)
                                if err != nil {
                                        return nil, err
                                }
                                buf = append(buf, chunkBuf...)
                        case chunk == -1: // State backed iterable!
-                               chunk, err := coder.DecodeVarInt(bcr.reader)
+                               chunk, err := coder.DecodeVarInt(r)
                                if err != nil {
                                        return nil, err
                                }
-                               token, err := ioutilx.ReadN(bcr.reader, 
(int)(chunk))
+                               token, err := ioutilx.ReadN(r, (int)(chunk))
                                if err != nil {
                                        return nil, err
                                }
@@ -212,9 +181,6 @@ func (n *DataSource) makeReStream(ctx context.Context, key 
*FullValue, cv Elemen
                                                        if err != nil {
                                                                return nil, err
                                                        }
-                                                       // We can't re-use the 
original bcr, since we may get new iterables,
-                                                       // or multiple of them 
at the same time, but we can re-use the count itself.
-                                                       r = 
&byteCountReader{reader: r, count: bcr.count}
                                                        return 
&elementStream{r: r, ec: cv}, nil
                                                },
                                        },
@@ -275,11 +241,12 @@ func (n *DataSource) incrementIndexAndCheckSplit() bool {
 }
 
 // ProgressReportSnapshot captures the progress reading an input source.
+//
+// TODO(lostluck) 2020/02/06: Add a visitor pattern for collecting progress
+// metrics from downstream Nodes.
 type ProgressReportSnapshot struct {
-       ID, Name string
-       Count    int64
-
-       pcol PCollectionSnapshot
+       ID, Name, PID string
+       Count         int64
 }
 
 // Progress returns a snapshot of the source's progress.
@@ -288,7 +255,6 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
                return ProgressReportSnapshot{}
        }
        n.mu.Lock()
-       pcol := n.PCol.snapshot()
        // The count is the number of "completely processed elements"
        // which matches the index of the currently processing element.
        c := n.index
@@ -297,8 +263,7 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        if c < 0 {
                c = 0
        }
-       pcol.ElementCount = c
-       return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, 
Count: c, pcol: pcol}
+       return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, 
Name: n.Name, Count: c}
 }
 
 // Split takes a sorted set of potential split indices, selects and actuates
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 3037c84..1ce493c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -201,16 +201,6 @@ func TestDataSource_Iterators(t *testing.T) {
                        if got, want := iVals, expectedKeys; !equalList(got, 
want) {
                                t.Errorf("DataSource => %#v, want %#v", 
extractValues(got...), extractValues(want...))
                        }
-
-                       // We're using integers that encode to 1 byte, so do 
some quick math to validate.
-                       sizeOfSmallInt := 1
-                       snap := quickTestSnapshot(source, int64(len(test.keys)))
-                       snap.pcol.SizeSum = int64(len(test.keys) * (1 + 
len(test.vals)) * sizeOfSmallInt)
-                       snap.pcol.SizeMin = int64((1 + len(test.vals)) * 
sizeOfSmallInt)
-                       snap.pcol.SizeMax = int64((1 + len(test.vals)) * 
sizeOfSmallInt)
-                       if got, want := source.Progress(), snap; got != want {
-                               t.Errorf("progress didn't match: got %v, want 
%v", got, want)
-                       }
                })
        }
 }
@@ -368,6 +358,15 @@ func TestDataSource_Split(t *testing.T) {
                                })
 
                                validateSource(t, out, source, 
makeValues(test.expected...))
+
+                               // Adjust expectations to maximum number of 
elements.
+                               adjustedExpectation := test.splitIdx
+                               if adjustedExpectation > int64(len(elements)) {
+                                       adjustedExpectation = 
int64(len(elements))
+                               }
+                               if got, want := source.Progress().Count, 
adjustedExpectation; got != want {
+                                       t.Fatalf("progress didn't match split: 
got %v, want %v", got, want)
+                               }
                        })
                }
        })
@@ -465,29 +464,13 @@ func constructAndExecutePlanWithContext(t *testing.T, us 
[]Unit, dc DataContext)
        }
 }
 
-func quickTestSnapshot(source *DataSource, count int64) ProgressReportSnapshot 
{
-       return ProgressReportSnapshot{
-               Name:  source.Name,
-               ID:    source.SID.PtransformID,
-               Count: count,
-               pcol: PCollectionSnapshot{
-                       ElementCount: count,
-                       SizeCount:    count,
-                       SizeSum:      count,
-                       // We're only encoding small ints here, so size will 
only be 1.
-                       SizeMin: 1,
-                       SizeMax: 1,
-               },
-       }
-}
-
 func validateSource(t *testing.T, out *CaptureNode, source *DataSource, 
expected []FullValue) {
        t.Helper()
        if got, want := len(out.Elements), len(expected); got != want {
                t.Fatalf("lengths don't match: got %v, want %v", got, want)
        }
-       if got, want := source.Progress(), quickTestSnapshot(source, 
int64(len(expected))); got != want {
-               t.Fatalf("progress snapshot didn't match: got %v, want %v", 
got, want)
+       if got, want := source.Progress().Count, int64(len(expected)); got != 
want {
+               t.Fatalf("progress count didn't match: got %v, want %v", got, 
want)
        }
        if !equalList(out.Elements, expected) {
                t.Errorf("DataSource => %#v, want %#v", 
extractValues(out.Elements...), extractValues(expected...))
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go 
b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
deleted file mode 100644
index d8fb024..0000000
--- a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package exec
-
-import (
-       "context"
-       "fmt"
-       "math"
-       "math/rand"
-       "sync"
-       "sync/atomic"
-
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-)
-
-// PCollection is a passthrough node to collect PCollection metrics, and
-// must be placed as the Out node of any producer of a PCollection.
-//
-// In particular, must not be placed after a Multiplex, and must be placed
-// after a Flatten.
-type PCollection struct {
-       UID    UnitID
-       PColID string
-       Out    Node // Out is the consumer of this PCollection.
-       Coder  *coder.Coder
-       Seed   int64
-
-       r             *rand.Rand
-       nextSampleIdx int64 // The index of the next value to sample.
-       elementCoder  ElementEncoder
-
-       elementCount                         int64 // must use atomic 
operations.
-       sizeMu                               sync.Mutex
-       sizeCount, sizeSum, sizeMin, sizeMax int64
-}
-
-// ID returns the debug id for this unit.
-func (p *PCollection) ID() UnitID {
-       return p.UID
-}
-
-// Up initializes the random sampling source and element encoder.
-func (p *PCollection) Up(ctx context.Context) error {
-       // dedicated rand source
-       p.r = rand.New(rand.NewSource(p.Seed))
-       p.elementCoder = MakeElementEncoder(p.Coder)
-       return nil
-}
-
-// StartBundle resets collected metrics for this PCollection, and propagates 
bundle start.
-func (p *PCollection) StartBundle(ctx context.Context, id string, data 
DataContext) error {
-       atomic.StoreInt64(&p.elementCount, 0)
-       p.nextSampleIdx = p.r.Int63n(3)
-       p.resetSize()
-       return MultiStartBundle(ctx, id, data, p.Out)
-}
-
-type byteCounter struct {
-       count int
-}
-
-func (w *byteCounter) Write(p []byte) (n int, err error) {
-       w.count += len(p)
-       return len(p), nil
-}
-
-// ProcessElement increments the element count and sometimes takes size 
samples of the elements.
-func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue, 
values ...ReStream) error {
-       cur := atomic.AddInt64(&p.elementCount, 1)
-       if cur == p.nextSampleIdx {
-               // We pick the next sampling index based on how large this 
pcollection already is.
-               // We don't want to necessarily wait until the pcollection has 
doubled, so we reduce the range.
-               // We don't want to always encode the first consecutive 
elements, so we add 2 to give some variance.
-               // Finally we add 1 no matter what, so that there's always the 
potential to trigger again.
-               // Otherwise, there's the potential for the random int to be 0, 
which means we don't change the
-               // nextSampleIdx at all.
-               p.nextSampleIdx = cur + p.r.Int63n(cur/10+2) + 1
-               var w byteCounter
-               p.elementCoder.Encode(elm, &w)
-               p.addSize(int64(w.count))
-       }
-       return p.Out.ProcessElement(ctx, elm, values...)
-}
-
-func (p *PCollection) addSize(size int64) {
-       p.sizeMu.Lock()
-       defer p.sizeMu.Unlock()
-       p.sizeCount++
-       p.sizeSum += size
-       if size > p.sizeMax {
-               p.sizeMax = size
-       }
-       if size < p.sizeMin {
-               p.sizeMin = size
-       }
-}
-
-func (p *PCollection) resetSize() {
-       p.sizeMu.Lock()
-       defer p.sizeMu.Unlock()
-       p.sizeCount = 0
-       p.sizeSum = 0
-       p.sizeMax = math.MinInt64
-       p.sizeMin = math.MaxInt64
-}
-
-// FinishBundle propagates bundle termination.
-func (p *PCollection) FinishBundle(ctx context.Context) error {
-       return MultiFinishBundle(ctx, p.Out)
-}
-
-// Down is a no-op.
-func (p *PCollection) Down(ctx context.Context) error {
-       return nil
-}
-
-func (p *PCollection) String() string {
-       return fmt.Sprintf("PCollection[%v] Out:%v", p.PColID, IDs(p.Out))
-}
-
-// PCollectionSnapshot contains the PCollectionID
-type PCollectionSnapshot struct {
-       ID           string
-       ElementCount int64
-       // If SizeCount is zero, then no size metrics should be exported.
-       SizeCount, SizeSum, SizeMin, SizeMax int64
-}
-
-func (p *PCollection) snapshot() PCollectionSnapshot {
-       p.sizeMu.Lock()
-       defer p.sizeMu.Unlock()
-       return PCollectionSnapshot{
-               ID:           p.PColID,
-               ElementCount: atomic.LoadInt64(&p.elementCount),
-               SizeCount:    p.sizeCount,
-               SizeSum:      p.sizeSum,
-               SizeMin:      p.sizeMin,
-               SizeMax:      p.sizeMax,
-       }
-}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
deleted file mode 100644
index f7f24a7..0000000
--- a/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
+++ /dev/null
@@ -1,154 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package exec
-
-import (
-       "context"
-       "math"
-       "testing"
-
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
-)
-
-// TestPCollection verifies that the PCollection node works correctly.
-// Seed is by default set to 0, so we have a "deterministic" set of
-// randomness for the samples.
-func TestPCollection(t *testing.T) {
-       a := &CaptureNode{UID: 1}
-       pcol := &PCollection{UID: 2, Out: a, Coder: coder.NewVarInt()}
-       // The "large" 2nd value is to ensure the values are encoded properly,
-       // and that Min & Max are behaving.
-       inputs := []interface{}{int64(1), int64(2000000000), int64(3)}
-       in := &FixedRoot{UID: 3, Elements: makeInput(inputs...), Out: pcol}
-
-       p, err := NewPlan("a", []Unit{a, pcol, in})
-       if err != nil {
-               t.Fatalf("failed to construct plan: %v", err)
-       }
-
-       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
-               t.Fatalf("execute failed: %v", err)
-       }
-       if err := p.Down(context.Background()); err != nil {
-               t.Fatalf("down failed: %v", err)
-       }
-
-       expected := makeValues(inputs...)
-       if !equalList(a.Elements, expected) {
-               t.Errorf("multiplex returned %v for a, want %v", 
extractValues(a.Elements...), extractValues(expected...))
-       }
-       snap := pcol.snapshot()
-       if want, got := int64(len(expected)), snap.ElementCount; got != want {
-               t.Errorf("snapshot miscounted: got %v, want %v", got, want)
-       }
-       checkPCollectionSizeSample(t, snap, 2, 6, 1, 5)
-}
-
-func TestPCollection_sizeReset(t *testing.T) {
-       // Check the initial values after resetting.
-       var pcol PCollection
-       pcol.resetSize()
-       snap := pcol.snapshot()
-       checkPCollectionSizeSample(t, snap, 0, 0, math.MaxInt64, math.MinInt64)
-}
-
-func checkPCollectionSizeSample(t *testing.T, snap PCollectionSnapshot, count, 
sum, min, max int64) {
-       t.Helper()
-       if want, got := int64(count), snap.SizeCount; got != want {
-               t.Errorf("sample count incorrect: got %v, want %v", got, want)
-       }
-       if want, got := int64(sum), snap.SizeSum; got != want {
-               t.Errorf("sample sum incorrect: got %v, want %v", got, want)
-       }
-       if want, got := int64(min), snap.SizeMin; got != want {
-               t.Errorf("sample min incorrect: got %v, want %v", got, want)
-       }
-       if want, got := int64(max), snap.SizeMax; got != want {
-               t.Errorf("sample max incorrect: got %v, want %v", got, want)
-       }
-}
-
-// BenchmarkPCollection measures the overhead of invoking a ParDo in a plan.
-//
-// On @lostluck's desktop (2020/02/20):
-// BenchmarkPCollection-12                 44699806                24.8 ns/op  
           0 B/op          0 allocs/op
-func BenchmarkPCollection(b *testing.B) {
-       // Pre allocate the capture buffer and process buffer to avoid
-       // unnecessary overhead.
-       out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
-       process := make([]MainInput, 0, b.N)
-       for i := 0; i < b.N; i++ {
-               process = append(process, MainInput{Key: FullValue{
-                       Windows:   window.SingleGlobalWindow,
-                       Timestamp: mtime.ZeroTimestamp,
-                       Elm:       int64(1),
-               }})
-       }
-       pcol := &PCollection{UID: 2, Out: out, Coder: coder.NewVarInt()}
-       n := &FixedRoot{UID: 3, Elements: process, Out: pcol}
-       p, err := NewPlan("a", []Unit{n, pcol, out})
-       if err != nil {
-               b.Fatalf("failed to construct plan: %v", err)
-       }
-       b.ResetTimer()
-       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
-               b.Fatalf("execute failed: %v", err)
-       }
-       if err := p.Down(context.Background()); err != nil {
-               b.Fatalf("down failed: %v", err)
-       }
-       if got, want := pcol.snapshot().ElementCount, int64(b.N); got != want {
-               b.Errorf("did not process all elements: got %v, want %v", got, 
want)
-       }
-       if got, want := len(out.Elements), b.N; got != want {
-               b.Errorf("did not process all elements: got %v, want %v", got, 
want)
-       }
-}
-
-// BenchmarkPCollection_Baseline measures the baseline of the node 
benchmarking scaffold.
-//
-// On @lostluck's desktop (2020/02/20):
-// BenchmarkPCollection_Baseline-12        62186372                18.8 ns/op  
           0 B/op          0 allocs/op
-func BenchmarkPCollection_Baseline(b *testing.B) {
-       // Pre allocate the capture buffer and process buffer to avoid
-       // unnecessary overhead.
-       out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
-       process := make([]MainInput, 0, b.N)
-       for i := 0; i < b.N; i++ {
-               process = append(process, MainInput{Key: FullValue{
-                       Windows:   window.SingleGlobalWindow,
-                       Timestamp: mtime.ZeroTimestamp,
-                       Elm:       1,
-               }})
-       }
-       n := &FixedRoot{UID: 3, Elements: process, Out: out}
-       p, err := NewPlan("a", []Unit{n, out})
-       if err != nil {
-               b.Fatalf("failed to construct plan: %v", err)
-       }
-       b.ResetTimer()
-       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
-               b.Fatalf("execute failed: %v", err)
-       }
-       if err := p.Down(context.Background()); err != nil {
-               b.Fatalf("down failed: %v", err)
-       }
-       if got, want := len(out.Elements), b.N; got != want {
-               b.Errorf("did not process all elements: got %v, want %v", got, 
want)
-       }
-}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go 
b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 5275092..fde9e7c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -21,7 +21,9 @@ import (
        "context"
        "fmt"
        "strings"
+       "sync"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
@@ -29,22 +31,34 @@ import (
 // from a part of a pipeline. A plan can be used to process multiple bundles
 // serially.
 type Plan struct {
-       id    string // id of the bundle descriptor for this plan
-       roots []Root
-       units []Unit
-       pcols []*PCollection
+       id       string
+       roots    []Root
+       units    []Unit
+       parDoIDs []string
 
        status Status
 
+       // While the store is threadsafe, the reference to it
+       // is not, so we need to protect the store field to be
+       // able to asynchronously provide tentative metrics.
+       storeMu sync.Mutex
+       store   *metrics.Store
+
        // TODO: there can be more than 1 DataSource in a bundle.
        source *DataSource
 }
 
+// hasPID provides a common interface for extracting PTransformIDs
+// from Units.
+type hasPID interface {
+       GetPID() string
+}
+
 // NewPlan returns a new bundle execution plan from the given units.
 func NewPlan(id string, units []Unit) (*Plan, error) {
        var roots []Root
-       var pcols []*PCollection
        var source *DataSource
+       var pardoIDs []string
 
        for _, u := range units {
                if u == nil {
@@ -56,8 +70,8 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
                if s, ok := u.(*DataSource); ok {
                        source = s
                }
-               if p, ok := u.(*PCollection); ok {
-                       pcols = append(pcols, p)
+               if p, ok := u.(hasPID); ok {
+                       pardoIDs = append(pardoIDs, p.GetPID())
                }
        }
        if len(roots) == 0 {
@@ -65,12 +79,12 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
        }
 
        return &Plan{
-               id:     id,
-               status: Initializing,
-               roots:  roots,
-               units:  units,
-               pcols:  pcols,
-               source: source,
+               id:       id,
+               status:   Initializing,
+               roots:    roots,
+               units:    units,
+               parDoIDs: pardoIDs,
+               source:   source,
        }, nil
 }
 
@@ -88,6 +102,10 @@ func (p *Plan) SourcePTransformID() string {
 // are brought up on the first execution. If a bundle fails, the plan cannot
 // be reused for further bundles. Does not panic. Blocking.
 func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) 
error {
+       ctx = metrics.SetBundleID(ctx, p.id)
+       p.storeMu.Lock()
+       p.store = metrics.GetStore(ctx)
+       p.storeMu.Unlock()
        if p.status == Initializing {
                for _, u := range p.units {
                        if err := callNoPanic(ctx, u.Up); err != nil {
@@ -160,28 +178,19 @@ func (p *Plan) String() string {
        return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
 }
 
-// PlanSnapshot contains system metrics for the current run of the plan.
-type PlanSnapshot struct {
-       Source ProgressReportSnapshot
-       PCols  []PCollectionSnapshot
-}
-
-// Progress returns a snapshot of progress of the plan, and associated 
metrics. 
-// The retuend boolean indicates whether the plan includes a DataSource, which 
is
-// important for handling legacy metrics. This boolean will be removed once
-// we no longer return legacy metrics.
-func (p *Plan) Progress() (PlanSnapshot, bool) {
-       pcolSnaps := make([]PCollectionSnapshot, 0, len(p.pcols)+1) // include 
space for the datasource pcollection.
-       for _, pcol := range p.pcols {
-               pcolSnaps = append(pcolSnaps, pcol.snapshot())
-       }
-       snap := PlanSnapshot{PCols: pcolSnaps}
+// Progress returns a snapshot of input progress of the plan, and associated 
metrics.
+func (p *Plan) Progress() (ProgressReportSnapshot, bool) {
        if p.source != nil {
-               snap.Source = p.source.Progress()
-               snap.PCols = append(pcolSnaps, snap.Source.pcol)
-               return snap, true
+               return p.source.Progress(), true
        }
-       return snap, false
+       return ProgressReportSnapshot{}, false
+}
+
+// Store returns the metric store for the last use of this plan.
+func (p *Plan) Store() *metrics.Store {
+       p.storeMu.Lock()
+       defer p.storeMu.Unlock()
+       return p.store
 }
 
 // SplitPoints captures the split requested by the Runner.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index b629560..486dacf 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -17,7 +17,6 @@ package exec
 
 import (
        "fmt"
-       "math/rand"
        "strconv"
        "strings"
 
@@ -78,16 +77,12 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
                for key, pid := range transform.GetOutputs() {
                        u.SID = StreamID{PtransformID: id, Port: port}
                        u.Name = key
+                       u.outputPID = pid
 
                        u.Out, err = b.makePCollection(pid)
                        if err != nil {
                                return nil, err
                        }
-                       // Elide the PCollection Node for DataSources.
+                       // DataSources can get byte samples directly, and can handle CoGBKs.
-                       u.PCol = *u.Out.(*PCollection)
-                       u.Out = u.PCol.Out
-                       b.units = b.units[:len(b.units)-1]
                }
 
                b.units = append(b.units, u)
@@ -103,8 +98,8 @@ type builder struct {
        succ map[string][]linkID // PCollectionID -> []linkID
 
        windowing map[string]*window.WindowingStrategy
-       nodes     map[string]*PCollection // PCollectionID -> Node (cache)
-       links     map[linkID]Node         // linkID -> Node (cache)
+       nodes     map[string]Node // PCollectionID -> Node (cache)
+       links     map[linkID]Node // linkID -> Node (cache)
 
        units []Unit // result
        idgen *GenID
@@ -146,7 +141,7 @@ func newBuilder(desc *fnpb.ProcessBundleDescriptor) (*builder, error) {
                succ: succ,
 
                windowing: make(map[string]*window.WindowingStrategy),
-               nodes:     make(map[string]*PCollection),
+               nodes:     make(map[string]Node),
                links:     make(map[linkID]Node),
 
                idgen: &GenID{},
@@ -263,7 +258,7 @@ func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.Windo
        return c, wc, nil
 }
 
-func (b *builder) makePCollection(id string) (*PCollection, error) {
+func (b *builder) makePCollection(id string) (Node, error) {
        if n, exists := b.nodes[id]; exists {
                return n, nil
        }
@@ -278,11 +273,8 @@ func (b *builder) makePCollection(id string) (*PCollection, error) {
                u = &Discard{UID: b.idgen.New()}
 
        case 1:
-               out, err := b.makeLink(id, list[0])
-               if err != nil {
-                       return nil, err
-               }
-               return b.newPCollectionNode(id, out)
+               return b.makeLink(id, list[0])
+
        default:
                // Multiplex.
 
@@ -299,16 +291,7 @@ func (b *builder) makePCollection(id string) (*PCollection, error) {
                b.units = append(b.units, u)
                u = &Flatten{UID: b.idgen.New(), N: count, Out: u}
        }
-       b.units = append(b.units, u)
-       return b.newPCollectionNode(id, u)
-}
 
-func (b *builder) newPCollectionNode(id string, out Node) (*PCollection, error) {
-       ec, _, err := b.makeCoderForPCollection(id)
-       if err != nil {
-               return nil, err
-       }
-       u := &PCollection{UID: b.idgen.New(), Out: out, PColID: id, Coder: ec, Seed: rand.Int63()}
        b.nodes[id] = u
        b.units = append(b.units, u)
        return u, nil
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 2ee8beb..c2fce51 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -23,7 +23,6 @@ import (
        "sync"
        "time"
 
-       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
@@ -83,12 +82,11 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
        }()
 
        ctrl := &control{
-               plans:    make(map[bundleDescriptorID]*exec.Plan),
-               active:   make(map[instructionID]*exec.Plan),
-               metStore: make(map[instructionID]*metrics.Store),
-               failed:   make(map[instructionID]error),
-               data:     &DataChannelManager{},
-               state:    &StateChannelManager{},
+               plans:  make(map[bundleDescriptorID]*exec.Plan),
+               active: make(map[instructionID]*exec.Plan),
+               failed: make(map[instructionID]error),
+               data:   &DataChannelManager{},
+               state:  &StateChannelManager{},
        }
 
        // gRPC requires all readers of a stream be the same goroutine, so this goroutine
@@ -144,8 +142,6 @@ type control struct {
        // plans that are actively being executed.
        // a plan can only be in one of these maps at any time.
        active map[instructionID]*exec.Plan // protected by mu
-       // metric stores for active plans.
-       metStore map[instructionID]*metrics.Store // protected by mu
        // plans that have failed during execution
        failed map[instructionID]error // protected by mu
        mu     sync.Mutex
@@ -196,10 +192,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                // since a plan can't be run concurrently.
                c.active[instID] = plan
                delete(c.plans, bdID)
-               // Get the user metrics store for this bundle.
-               ctx = metrics.SetBundleID(ctx, string(instID))
-               store := metrics.GetStore(ctx)
-               c.metStore[instID] = store
                c.mu.Unlock()
 
                if !ok {
@@ -212,7 +204,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                data.Close()
                state.Close()
 
-               mets, mons := monitoring(plan, store)
+               mets, mons := monitoring(plan)
                // Move the plan back to the candidate state
                c.mu.Lock()
                // Mark the instruction as failed.
@@ -221,7 +213,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                }
                c.plans[bdID] = plan
                delete(c.active, instID)
-               delete(c.metStore, instID)
                c.mu.Unlock()
 
                if err != nil {
@@ -245,7 +236,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                ref := instructionID(msg.GetInstructionId())
                c.mu.Lock()
                plan, ok := c.active[ref]
-               store, _ := c.metStore[ref]
                err := c.failed[ref]
                c.mu.Unlock()
                if err != nil {
@@ -255,7 +245,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                        return fail(ctx, instID, "failed to return progress: instruction %v not active", ref)
                }
 
-               mets, mons := monitoring(plan, store)
+               mets, mons := monitoring(plan)
 
                return &fnpb.InstructionResponse{
                        InstructionId: string(instID),
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index b2bc35d..14162dd 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -25,7 +25,11 @@ import (
        "github.com/golang/protobuf/ptypes"
 )
 
-func monitoring(p *exec.Plan, store *metrics.Store) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+       store := p.Store()
+       if store == nil {
+               return nil, nil
+       }
        // Get the legacy style metrics.
        transforms := make(map[string]*fnpb.Metrics_PTransform)
        metrics.Extractor{
@@ -110,44 +114,27 @@ func monitoring(p *exec.Plan, store *metrics.Store) (*fnpb.Metrics, []*ppb.Monit
        }.ExtractFrom(store)
 
        // Get the execution monitoring information from the bundle plan.
-       snapshot, hasSource := p.Progress()
-       if hasSource {
+       if snapshot, ok := p.Progress(); ok {
                // Legacy version.
-               transforms[snapshot.Source.ID] = &fnpb.Metrics_PTransform{
+               transforms[snapshot.ID] = &fnpb.Metrics_PTransform{
                        ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{
                                Measured: &fnpb.Metrics_PTransform_Measured{
                                        OutputElementCounts: map[string]int64{
-                                               snapshot.Source.Name: snapshot.Source.Count,
+                                               snapshot.Name: snapshot.Count,
                                        },
                                },
                        },
                }
-       }
-
-       // Monitoring info version.
-       for _, pcol := range snapshot.PCols {
+               // Monitoring info version.
                monitoringInfo = append(monitoringInfo,
                        &ppb.MonitoringInfo{
                                Urn:  "beam:metric:element_count:v1",
                                Type: "beam:metrics:sum_int_64",
                                Labels: map[string]string{
-                                       "PCOLLECTION": pcol.ID,
+                                       "PCOLLECTION": snapshot.PID,
                                },
-                               Data: int64Counter(pcol.ElementCount),
+                               Data: int64Counter(snapshot.Count),
                        })
-
-               // Skip pcollections without size
-               if pcol.SizeCount != 0 {
-                       monitoringInfo = append(monitoringInfo,
-                               &ppb.MonitoringInfo{
-                                       Urn:  "beam:metric:sampled_byte_size:v1",
-                                       Type: "beam:metrics:distribution_int_64",
-                                       Labels: map[string]string{
-                                               "PCOLLECTION": pcol.ID,
-                                       },
-                                       Data: int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, pcol.SizeMax),
-                               })
-               }
        }
 
        return &fnpb.Metrics{
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index 4d78dc8..eface25 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -46,7 +46,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
        log.Info(ctx, "Pipeline:")
        log.Info(ctx, p)
-       ctx = metrics.SetBundleID(ctx, "direct") // Ensure a metrics.Store exists.
 
        if *jobopts.Strict {
                log.Info(ctx, "Strict mode enabled, applying additional validation.")
@@ -75,7 +74,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        }
        // TODO(lostluck) 2020/01/24: What's the right way to expose the
        // metrics store for the direct runner?
-       metrics.DumpToLog(ctx)
+       metrics.DumpToLogFromStore(ctx, plan.Store())
        return nil
 }
 

Reply via email to