This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch revert-10942-metrics7
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1bcaaaa0ae840af38ed9ba523d08180e252787ee
Author:     Robert Burke <[email protected]>
AuthorDate: Thu Mar 5 16:47:33 2020 -0800

    Revert "[BEAM-6374] Emit PCollection metrics from GoSDK (#10942)"

    This reverts commit ded686a58ad4747e91a26d3e59f61019b641e655.
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  77 +++-------
 .../pkg/beam/core/runtime/exec/datasource_test.go  |  39 ++----
 sdks/go/pkg/beam/core/runtime/exec/pcollection.go  | 153 --------------------
 .../pkg/beam/core/runtime/exec/pcollection_test.go | 154 ---------------------
 sdks/go/pkg/beam/core/runtime/exec/plan.go         |  75 +++++-----
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  31 +----
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  24 +---
 .../go/pkg/beam/core/runtime/harness/monitoring.go |  35 ++---
 sdks/go/pkg/beam/runners/direct/direct.go          |   3 +-
 9 files changed, 100 insertions(+), 491 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index fb20aea..ccc4a09 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -36,14 +36,14 @@ type DataSource struct {
 	Name  string
 	Coder *coder.Coder
 	Out   Node
-	PCol  PCollection // Handles size metrics. Value instead of pointer so it's initialized by default in tests.
 
 	source DataManager
 	state  StateReader
-
-	index    int64
-	splitIdx int64
-	start    time.Time
+	// TODO(lostluck) 2020/02/06: refactor to support more general PCollection metrics on nodes.
+	outputPID string // The index is the output count for the PCollection.
+	index     int64
+	splitIdx  int64
+	start     time.Time
 
 	mu sync.Mutex
 }
@@ -70,29 +70,6 @@ func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContex
 	return n.Out.StartBundle(ctx, id, data)
 }
 
-// byteCountReader is a passthrough reader that counts all the bytes read through it.
-// It trusts the nested reader to return accurate byte information.
-type byteCountReader struct {
-	count  *int
-	reader io.ReadCloser
-}
-
-func (r *byteCountReader) Read(p []byte) (int, error) {
-	n, err := r.reader.Read(p)
-	*r.count += n
-	return n, err
-}
-
-func (r *byteCountReader) Close() error {
-	return r.reader.Close()
-}
-
-func (r *byteCountReader) reset() int {
-	c := *r.count
-	*r.count = 0
-	return c
-}
-
 // Process opens the data source, reads and decodes data, kicking off element processing.
 func (n *DataSource) Process(ctx context.Context) error {
 	r, err := n.source.OpenRead(ctx, n.SID)
@@ -100,9 +77,6 @@ func (n *DataSource) Process(ctx context.Context) error {
 		return err
 	}
 	defer r.Close()
-	n.PCol.resetSize() // initialize the size distribution for this bundle.
-	var byteCount int
-	bcr := byteCountReader{reader: r, count: &byteCount}
 
 	c := coder.SkipW(n.Coder)
 	wc := MakeWindowDecoder(n.Coder.Window)
@@ -125,7 +99,6 @@ func (n *DataSource) Process(ctx context.Context) error {
 		if n.incrementIndexAndCheckSplit() {
 			return nil
 		}
-		// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
 		ws, t, err := DecodeWindowedValueHeader(wc, r)
 		if err != nil {
 			if err == io.EOF {
 				return nil
 			}
 			return err
 		}
 
 		// Decode key or parallel element.
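For context: the byteCountReader deleted above is a small decorator over
io.ReadCloser. A minimal, self-contained sketch of the same pattern; the
names here are hypothetical, this is not code from the commit:

    package main

    import (
        "fmt"
        "io"
        "strings"
    )

    // countingReader mirrors the deleted byteCountReader: it wraps another
    // reader and tallies every byte that passes through Read.
    type countingReader struct {
        count  int
        reader io.Reader
    }

    func (r *countingReader) Read(p []byte) (int, error) {
        n, err := r.reader.Read(p)
        r.count += n
        return n, err
    }

    // reset returns the bytes read since the last reset; the deleted code
    // called this once per element to feed a size distribution.
    func (r *countingReader) reset() int {
        c := r.count
        r.count = 0
        return c
    }

    func main() {
        cr := &countingReader{reader: strings.NewReader("hello world")}
        buf := make([]byte, 5)
        cr.Read(buf)                                     // decode part of an element
        fmt.Println("element bytes so far:", cr.reset()) // 5
    }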
-		pe, err := cp.Decode(&bcr)
+		pe, err := cp.Decode(r)
 		if err != nil {
 			return errors.Wrap(err, "source decode failed")
 		}
@@ -144,7 +117,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 
 		var valReStreams []ReStream
 		for _, cv := range cvs {
-			values, err := n.makeReStream(ctx, pe, cv, &bcr)
+			values, err := n.makeReStream(ctx, pe, cv, r)
 			if err != nil {
 				return err
 			}
@@ -154,15 +127,11 @@ func (n *DataSource) Process(ctx context.Context) error {
 		if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
 			return err
 		}
-		// Collect the actual size of the element, and reset the bytecounter reader.
-		n.PCol.addSize(int64(bcr.reset()))
-		bcr.reader = r
 	}
 }
 
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
-	// TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
-	size, err := coder.DecodeInt32(bcr.reader)
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, r io.ReadCloser) (ReStream, error) {
+	size, err := coder.DecodeInt32(r)
 	if err != nil {
 		return nil, errors.Wrap(err, "stream size decoding failed")
 	}
@@ -171,16 +140,16 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 	case size >= 0:
 		// Single chunk streams are fully read in and buffered in memory.
 		buf := make([]FullValue, 0, size)
-		buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
+		buf, err = readStreamToBuffer(cv, r, int64(size), buf)
 		if err != nil {
 			return nil, err
 		}
 		return &FixedReStream{Buf: buf}, nil
 	case size == -1:
 		// Multi-chunked stream.
 		var buf []FullValue
 		for {
-			chunk, err := coder.DecodeVarInt(bcr.reader)
+			chunk, err := coder.DecodeVarInt(r)
 			if err != nil {
 				return nil, errors.Wrap(err, "stream chunk size decoding failed")
 			}
@@ -190,17 +159,17 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 				return &FixedReStream{Buf: buf}, nil
 			case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
 				chunkBuf := make([]FullValue, 0, chunk)
-				chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
+				chunkBuf, err = readStreamToBuffer(cv, r, chunk, chunkBuf)
 				if err != nil {
 					return nil, err
 				}
 				buf = append(buf, chunkBuf...)
 			case chunk == -1: // State backed iterable!
-				chunk, err := coder.DecodeVarInt(bcr.reader)
+				chunk, err := coder.DecodeVarInt(r)
 				if err != nil {
 					return nil, err
 				}
-				token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
+				token, err := ioutilx.ReadN(r, (int)(chunk))
 				if err != nil {
 					return nil, err
 				}
@@ -212,9 +181,6 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 					if err != nil {
 						return nil, err
 					}
-					// We can't re-use the original bcr, since we may get new iterables,
-					// or multiple of them at the same time, but we can re-use the count itself.
-					r = &byteCountReader{reader: r, count: bcr.count}
 					return &elementStream{r: r, ec: cv}, nil
 				},
 			},
@@ -275,11 +241,12 @@ func (n *DataSource) incrementIndexAndCheckSplit() bool {
 }
 
 // ProgressReportSnapshot captures the progress reading an input source.
+//
+// TODO(lostluck) 2020/02/06: Add a visitor pattern for collecting progress
+// metrics from downstream Nodes.
 type ProgressReportSnapshot struct {
-	ID, Name string
-	Count    int64
-
-	pcol PCollectionSnapshot
+	ID, Name, PID string
+	Count         int64
 }
 
 // Progress returns a snapshot of the source's progress.
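For context, makeReStream above decodes the Fn API iterable-stream framing:
an int32 size prefix where a non-negative value means one fully buffered
chunk, and -1 introduces repeated length-prefixed chunks terminated by a
0-length chunk (a -1 chunk length switches to a state-backed token, omitted
here). A runnable sketch of the two in-memory cases; for clarity it uses
fixed int32 lengths and int32 elements where the real coder uses varints and
coded Beam elements:

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "io"
    )

    // readStream decodes the framing handled by makeReStream: size >= 0 is a
    // single fully buffered chunk; size == -1 is a sequence of length-prefixed
    // chunks ending with a 0-length chunk.
    func readStream(r io.Reader) ([]int32, error) {
        readInt32 := func() (int32, error) {
            var v int32
            err := binary.Read(r, binary.BigEndian, &v)
            return v, err
        }
        readN := func(n int32) ([]int32, error) {
            out := make([]int32, n)
            err := binary.Read(r, binary.BigEndian, out)
            return out, err
        }
        size, err := readInt32()
        if err != nil {
            return nil, err
        }
        if size >= 0 { // Single-chunk stream.
            return readN(size)
        }
        var buf []int32 // Multi-chunk stream (size == -1).
        for {
            chunk, err := readInt32()
            if err != nil {
                return nil, err
            }
            if chunk == 0 { // End of stream.
                return buf, nil
            }
            vals, err := readN(chunk)
            if err != nil {
                return nil, err
            }
            buf = append(buf, vals...)
        }
    }

    func main() {
        var w bytes.Buffer
        // Encode a multi-chunk stream: chunks [1 2] and [3], then terminate.
        for _, v := range []int32{-1, 2, 1, 2, 1, 3, 0} {
            binary.Write(&w, binary.BigEndian, v)
        }
        got, err := readStream(&w)
        fmt.Println(got, err) // [1 2 3] <nil>
    }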
@@ -288,7 +255,6 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 		return ProgressReportSnapshot{}
 	}
 	n.mu.Lock()
-	pcol := n.PCol.snapshot()
 	// The count is the number of "completely processed elements"
 	// which matches the index of the currently processing element.
 	c := n.index
@@ -297,8 +263,7 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	if c < 0 {
 		c = 0
 	}
-	pcol.ElementCount = c
-	return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, Count: c, pcol: pcol}
+	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
 // Split takes a sorted set of potential split indices, selects and actuates
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 3037c84..1ce493c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -201,16 +201,6 @@ func TestDataSource_Iterators(t *testing.T) {
 			if got, want := iVals, expectedKeys; !equalList(got, want) {
 				t.Errorf("DataSource => %#v, want %#v", extractValues(got...), extractValues(want...))
 			}
-
-			// We're using integers that encode to 1 byte, so do some quick math to validate.
-			sizeOfSmallInt := 1
-			snap := quickTestSnapshot(source, int64(len(test.keys)))
-			snap.pcol.SizeSum = int64(len(test.keys) * (1 + len(test.vals)) * sizeOfSmallInt)
-			snap.pcol.SizeMin = int64((1 + len(test.vals)) * sizeOfSmallInt)
-			snap.pcol.SizeMax = int64((1 + len(test.vals)) * sizeOfSmallInt)
-			if got, want := source.Progress(), snap; got != want {
-				t.Errorf("progress didn't match: got %v, want %v", got, want)
-			}
 		})
 	}
 }
@@ -368,6 +358,15 @@ func TestDataSource_Split(t *testing.T) {
 			})
 
 			validateSource(t, out, source, makeValues(test.expected...))
+
+			// Adjust expectations to maximum number of elements.
+			adjustedExpectation := test.splitIdx
+			if adjustedExpectation > int64(len(elements)) {
+				adjustedExpectation = int64(len(elements))
+			}
+			if got, want := source.Progress().Count, adjustedExpectation; got != want {
+				t.Fatalf("progress didn't match split: got %v, want %v", got, want)
+			}
 		})
 	}
 })
@@ -465,29 +464,13 @@ func constructAndExecutePlanWithContext(t *testing.T, us []Unit, dc DataContext)
 	}
 }
 
-func quickTestSnapshot(source *DataSource, count int64) ProgressReportSnapshot {
-	return ProgressReportSnapshot{
-		Name:  source.Name,
-		ID:    source.SID.PtransformID,
-		Count: count,
-		pcol: PCollectionSnapshot{
-			ElementCount: count,
-			SizeCount:    count,
-			SizeSum:      count,
-			// We're only encoding small ints here, so size will only be 1.
-			SizeMin: 1,
-			SizeMax: 1,
-		},
-	}
-}
-
 func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected []FullValue) {
 	t.Helper()
 	if got, want := len(out.Elements), len(expected); got != want {
 		t.Fatalf("lengths don't match: got %v, want %v", got, want)
 	}
-	if got, want := source.Progress(), quickTestSnapshot(source, int64(len(expected))); got != want {
-		t.Fatalf("progress snapshot didn't match: got %v, want %v", got, want)
+	if got, want := source.Progress().Count, int64(len(expected)); got != want {
+		t.Fatalf("progress count didn't match: got %v, want %v", got, want)
 	}
 	if !equalList(out.Elements, expected) {
 		t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
deleted file mode 100644
index d8fb024..0000000
--- a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package exec
-
-import (
-	"context"
-	"fmt"
-	"math"
-	"math/rand"
-	"sync"
-	"sync/atomic"
-
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-)
-
-// PCollection is a passthrough node to collect PCollection metrics, and
-// must be placed as the Out node of any producer of a PCollection.
-//
-// In particular, must not be placed after a Multiplex, and must be placed
-// after a Flatten.
-type PCollection struct {
-	UID    UnitID
-	PColID string
-	Out    Node // Out is the consumer of this PCollection.
-	Coder  *coder.Coder
-	Seed   int64
-
-	r             *rand.Rand
-	nextSampleIdx int64 // The index of the next value to sample.
-	elementCoder  ElementEncoder
-
-	elementCount                         int64 // must use atomic operations.
-	sizeMu                               sync.Mutex
-	sizeCount, sizeSum, sizeMin, sizeMax int64
-}
-
-// ID returns the debug id for this unit.
-func (p *PCollection) ID() UnitID {
-	return p.UID
-}
-
-// Up initializes the random sampling source and element encoder.
-func (p *PCollection) Up(ctx context.Context) error {
-	// dedicated rand source
-	p.r = rand.New(rand.NewSource(p.Seed))
-	p.elementCoder = MakeElementEncoder(p.Coder)
-	return nil
-}
-
-// StartBundle resets collected metrics for this PCollection, and propagates bundle start.
-func (p *PCollection) StartBundle(ctx context.Context, id string, data DataContext) error {
-	atomic.StoreInt64(&p.elementCount, 0)
-	p.nextSampleIdx = p.r.Int63n(3) + 1
-	p.resetSize()
-	return MultiStartBundle(ctx, id, data, p.Out)
-}
-
-type byteCounter struct {
-	count int
-}
-
-func (w *byteCounter) Write(p []byte) (n int, err error) {
-	w.count += len(p)
-	return len(p), nil
-}
-
-// ProcessElement increments the element count and sometimes takes size samples of the elements.
-func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
-	cur := atomic.AddInt64(&p.elementCount, 1)
-	if cur == p.nextSampleIdx {
-		// We pick the next sampling index based on how large this pcollection already is.
-		// We don't want to necessarily wait until the pcollection has doubled, so we reduce the range.
-		// We don't want to always encode the first consecutive elements, so we add 2 to give some variance.
-		// Finally we add 1 no matter what, so that there's always the potential to trigger again.
-		// Otherwise, there's the potential for the random int to be 0, which means we don't change the
-		// nextSampleIdx at all.
-		p.nextSampleIdx = cur + p.r.Int63n(cur/10+2) + 1
-		var w byteCounter
-		p.elementCoder.Encode(elm, &w)
-		p.addSize(int64(w.count))
-	}
-	return p.Out.ProcessElement(ctx, elm, values...)
-}
-
-func (p *PCollection) addSize(size int64) {
-	p.sizeMu.Lock()
-	defer p.sizeMu.Unlock()
-	p.sizeCount++
-	p.sizeSum += size
-	if size > p.sizeMax {
-		p.sizeMax = size
-	}
-	if size < p.sizeMin {
-		p.sizeMin = size
-	}
-}
-
-func (p *PCollection) resetSize() {
-	p.sizeMu.Lock()
-	defer p.sizeMu.Unlock()
-	p.sizeCount = 0
-	p.sizeSum = 0
-	p.sizeMax = math.MinInt64
-	p.sizeMin = math.MaxInt64
-}
-
-// FinishBundle propagates bundle termination.
-func (p *PCollection) FinishBundle(ctx context.Context) error {
-	return MultiFinishBundle(ctx, p.Out)
-}
-
-// Down is a no-op.
-func (p *PCollection) Down(ctx context.Context) error {
-	return nil
-}
-
-func (p *PCollection) String() string {
-	return fmt.Sprintf("PCollection[%v] Out:%v", p.PColID, IDs(p.Out))
-}
-
-// PCollectionSnapshot contains the PCollection ID and the metrics
-// collected for it in the current bundle.
-type PCollectionSnapshot struct {
-	ID           string
-	ElementCount int64
-	// If SizeCount is zero, then no size metrics should be exported.
-	SizeCount, SizeSum, SizeMin, SizeMax int64
-}
-
-func (p *PCollection) snapshot() PCollectionSnapshot {
-	p.sizeMu.Lock()
-	defer p.sizeMu.Unlock()
-	return PCollectionSnapshot{
-		ID:           p.PColID,
-		ElementCount: atomic.LoadInt64(&p.elementCount),
-		SizeCount:    p.sizeCount,
-		SizeSum:      p.sizeSum,
-		SizeMin:      p.sizeMin,
-		SizeMax:      p.sizeMax,
-	}
-}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go b/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
deleted file mode 100644
index f7f24a7..0000000
--- a/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
+++ /dev/null
@@ -1,154 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
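For context, the sampling schedule deleted above starts at a random index
in [1,3] and, after sampling element cur, jumps to cur + Int63n(cur/10+2) + 1,
so sampling stays cheap as the bundle grows while the trailing +1 guarantees
forward progress. A standalone sketch that prints which indices one simulated
bundle would sample (the seed and bundle size here are arbitrary):

    package main

    import (
        "fmt"
        "math/rand"
    )

    func main() {
        r := rand.New(rand.NewSource(0)) // dedicated, seedable source, as in Up.
        next := r.Int63n(3) + 1          // initial sample index, as in StartBundle.
        var sampled []int64
        for cur := int64(1); cur <= 1000; cur++ {
            if cur == next {
                sampled = append(sampled, cur)
                // Step forward proportionally to the current size, with jitter;
                // the +1 guarantees progress even when Int63n returns 0.
                next = cur + r.Int63n(cur/10+2) + 1
            }
        }
        fmt.Println("sampled indices:", sampled)
    }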
-
-package exec
-
-import (
-	"context"
-	"math"
-	"testing"
-
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
-)
-
-// TestPCollection verifies that the PCollection node works correctly.
-// Seed is by default set to 0, so we have a "deterministic" set of
-// randomness for the samples.
-func TestPCollection(t *testing.T) {
-	a := &CaptureNode{UID: 1}
-	pcol := &PCollection{UID: 2, Out: a, Coder: coder.NewVarInt()}
-	// The "large" 2nd value is to ensure the values are encoded properly,
-	// and that Min & Max are behaving.
-	inputs := []interface{}{int64(1), int64(2000000000), int64(3)}
-	in := &FixedRoot{UID: 3, Elements: makeInput(inputs...), Out: pcol}
-
-	p, err := NewPlan("a", []Unit{a, pcol, in})
-	if err != nil {
-		t.Fatalf("failed to construct plan: %v", err)
-	}
-
-	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
-		t.Fatalf("execute failed: %v", err)
-	}
-	if err := p.Down(context.Background()); err != nil {
-		t.Fatalf("down failed: %v", err)
-	}
-
-	expected := makeValues(inputs...)
-	if !equalList(a.Elements, expected) {
-		t.Errorf("multiplex returned %v for a, want %v", extractValues(a.Elements...), extractValues(expected...))
-	}
-	snap := pcol.snapshot()
-	if want, got := int64(len(expected)), snap.ElementCount; got != want {
-		t.Errorf("snapshot miscounted: got %v, want %v", got, want)
-	}
-	checkPCollectionSizeSample(t, snap, 2, 6, 1, 5)
-}
-
-func TestPCollection_sizeReset(t *testing.T) {
-	// Check the initial values after resetting.
-	var pcol PCollection
-	pcol.resetSize()
-	snap := pcol.snapshot()
-	checkPCollectionSizeSample(t, snap, 0, 0, math.MaxInt64, math.MinInt64)
-}
-
-func checkPCollectionSizeSample(t *testing.T, snap PCollectionSnapshot, count, sum, min, max int64) {
-	t.Helper()
-	if want, got := int64(count), snap.SizeCount; got != want {
-		t.Errorf("sample count incorrect: got %v, want %v", got, want)
-	}
-	if want, got := int64(sum), snap.SizeSum; got != want {
-		t.Errorf("sample sum incorrect: got %v, want %v", got, want)
-	}
-	if want, got := int64(min), snap.SizeMin; got != want {
-		t.Errorf("sample min incorrect: got %v, want %v", got, want)
-	}
-	if want, got := int64(max), snap.SizeMax; got != want {
-		t.Errorf("sample max incorrect: got %v, want %v", got, want)
-	}
-}
-
-// BenchmarkPCollection measures the overhead of invoking a ParDo in a plan.
-//
-// On @lostluck's desktop (2020/02/20):
-// BenchmarkPCollection-12    	44699806	        24.8 ns/op	       0 B/op	       0 allocs/op
-func BenchmarkPCollection(b *testing.B) {
-	// Pre allocate the capture buffer and process buffer to avoid
-	// unnecessary overhead.
-	out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
-	process := make([]MainInput, 0, b.N)
-	for i := 0; i < b.N; i++ {
-		process = append(process, MainInput{Key: FullValue{
-			Windows:   window.SingleGlobalWindow,
-			Timestamp: mtime.ZeroTimestamp,
-			Elm:       int64(1),
-		}})
-	}
-	pcol := &PCollection{UID: 2, Out: out, Coder: coder.NewVarInt()}
-	n := &FixedRoot{UID: 3, Elements: process, Out: pcol}
-	p, err := NewPlan("a", []Unit{n, pcol, out})
-	if err != nil {
-		b.Fatalf("failed to construct plan: %v", err)
-	}
-	b.ResetTimer()
-	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
-		b.Fatalf("execute failed: %v", err)
-	}
-	if err := p.Down(context.Background()); err != nil {
-		b.Fatalf("down failed: %v", err)
-	}
-	if got, want := pcol.snapshot().ElementCount, int64(b.N); got != want {
-		b.Errorf("did not process all elements: got %v, want %v", got, want)
-	}
-	if got, want := len(out.Elements), b.N; got != want {
-		b.Errorf("did not process all elements: got %v, want %v", got, want)
-	}
-}
-
-// BenchmarkPCollection_Baseline measures the baseline of the node benchmarking scaffold.
-//
-// On @lostluck's desktop (2020/02/20):
-// BenchmarkPCollection_Baseline-12    	62186372	        18.8 ns/op	       0 B/op	       0 allocs/op
-func BenchmarkPCollection_Baseline(b *testing.B) {
-	// Pre allocate the capture buffer and process buffer to avoid
-	// unnecessary overhead.
-	out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
-	process := make([]MainInput, 0, b.N)
-	for i := 0; i < b.N; i++ {
-		process = append(process, MainInput{Key: FullValue{
-			Windows:   window.SingleGlobalWindow,
-			Timestamp: mtime.ZeroTimestamp,
-			Elm:       1,
-		}})
-	}
-	n := &FixedRoot{UID: 3, Elements: process, Out: out}
-	p, err := NewPlan("a", []Unit{n, out})
-	if err != nil {
-		b.Fatalf("failed to construct plan: %v", err)
-	}
-	b.ResetTimer()
-	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
-		b.Fatalf("execute failed: %v", err)
-	}
-	if err := p.Down(context.Background()); err != nil {
-		b.Fatalf("down failed: %v", err)
-	}
-	if got, want := len(out.Elements), b.N; got != want {
-		b.Errorf("did not process all elements: got %v, want %v", got, want)
-	}
-}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 5275092..fde9e7c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -21,7 +21,9 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
@@ -29,22 +31,34 @@ import (
 // from a part of a pipeline. A plan can be used to process multiple bundles
 // serially.
 type Plan struct {
-	id     string // id of the bundle descriptor for this plan
-	roots  []Root
-	units  []Unit
-	pcols  []*PCollection
+	id       string
+	roots    []Root
+	units    []Unit
+	parDoIDs []string
 
 	status Status
 
+	// While the store is threadsafe, the reference to it
+	// is not, so we need to protect the store field to be
+	// able to asynchronously provide tentative metrics.
+	storeMu sync.Mutex
+	store   *metrics.Store
+
 	// TODO: there can be more than 1 DataSource in a bundle.
 	source *DataSource
 }
 
+// hasPID provides a common interface for extracting PTransformIDs
+// from Units.
+type hasPID interface {
+	GetPID() string
+}
+
 // NewPlan returns a new bundle execution plan from the given units.
 func NewPlan(id string, units []Unit) (*Plan, error) {
 	var roots []Root
-	var pcols []*PCollection
 	var source *DataSource
+	var pardoIDs []string
 
 	for _, u := range units {
 		if u == nil {
@@ -56,8 +70,8 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 		if s, ok := u.(*DataSource); ok {
 			source = s
 		}
-		if p, ok := u.(*PCollection); ok {
-			pcols = append(pcols, p)
+		if p, ok := u.(hasPID); ok {
+			pardoIDs = append(pardoIDs, p.GetPID())
 		}
 	}
 	if len(roots) == 0 {
@@ -65,12 +79,12 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 	}
 
 	return &Plan{
-		id:     id,
-		status: Initializing,
-		roots:  roots,
-		units:  units,
-		pcols:  pcols,
-		source: source,
+		id:       id,
+		status:   Initializing,
+		roots:    roots,
+		units:    units,
+		parDoIDs: pardoIDs,
+		source:   source,
 	}, nil
 }
 
@@ -88,6 +102,10 @@ func (p *Plan) SourcePTransformID() string {
 // are brought up on the first execution. If a bundle fails, the plan cannot
 // be reused for further bundles. Does not panic. Blocking.
 func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) error {
+	ctx = metrics.SetBundleID(ctx, p.id)
+	p.storeMu.Lock()
+	p.store = metrics.GetStore(ctx)
+	p.storeMu.Unlock()
 	if p.status == Initializing {
 		for _, u := range p.units {
 			if err := callNoPanic(ctx, u.Up); err != nil {
@@ -160,28 +178,19 @@ func (p *Plan) String() string {
 	return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
 }
 
-// PlanSnapshot contains system metrics for the current run of the plan.
-type PlanSnapshot struct {
-	Source ProgressReportSnapshot
-	PCols  []PCollectionSnapshot
-}
-
-// Progress returns a snapshot of progress of the plan, and associated metrics.
-// The returned boolean indicates whether the plan includes a DataSource, which is
-// important for handling legacy metrics. This boolean will be removed once
-// we no longer return legacy metrics.
-func (p *Plan) Progress() (PlanSnapshot, bool) {
-	pcolSnaps := make([]PCollectionSnapshot, 0, len(p.pcols)+1) // include space for the datasource pcollection.
-	for _, pcol := range p.pcols {
-		pcolSnaps = append(pcolSnaps, pcol.snapshot())
-	}
-	snap := PlanSnapshot{PCols: pcolSnaps}
+// Progress returns a snapshot of input progress of the plan, and associated metrics.
+func (p *Plan) Progress() (ProgressReportSnapshot, bool) {
 	if p.source != nil {
-		snap.Source = p.source.Progress()
-		snap.PCols = append(pcolSnaps, snap.Source.pcol)
-		return snap, true
+		return p.source.Progress(), true
 	}
-	return snap, false
+	return ProgressReportSnapshot{}, false
+}
+
+// Store returns the metric store for the last use of this plan.
+func (p *Plan) Store() *metrics.Store {
+	p.storeMu.Lock()
+	defer p.storeMu.Unlock()
+	return p.store
 }
 
 // SplitPoints captures the split requested by the Runner.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index b629560..486dacf 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -17,7 +17,6 @@ package exec
 
 import (
 	"fmt"
-	"math/rand"
 	"strconv"
 	"strings"
 
@@ -78,16 +77,12 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
 		for key, pid := range transform.GetOutputs() {
 			u.SID = StreamID{PtransformID: id, Port: port}
 			u.Name = key
+			u.outputPID = pid
 
 			u.Out, err = b.makePCollection(pid)
 			if err != nil {
 				return nil, err
 			}
-			// Elide the PCollection Node for DataSources.
-			// DataSources can get byte samples directly, and can handle CoGBKs.
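For context, the restored Plan code above rebinds p.store on every Execute
and serves it to concurrent progress requests via Store(), which is why the
reference sits behind storeMu even though the store itself is threadsafe. A
toy illustration of that pattern; the types here are stand-ins, not Beam's:

    package main

    import (
        "fmt"
        "sync"
    )

    // store stands in for *metrics.Store; a fresh one is bound per bundle.
    type store struct{ bundleID string }

    // plan mirrors the restored Plan fields: the store reference is rebound
    // on every execute, so readers on other goroutines (progress requests)
    // must go through the mutex.
    type plan struct {
        mu    sync.Mutex
        store *store
    }

    func (p *plan) execute(bundleID string) {
        p.mu.Lock()
        p.store = &store{bundleID: bundleID} // rebind for this bundle
        p.mu.Unlock()
        // ... run the bundle ...
    }

    // Store returns the store for the last bundle; safe to call concurrently.
    func (p *plan) Store() *store {
        p.mu.Lock()
        defer p.mu.Unlock()
        return p.store
    }

    func main() {
        var p plan
        p.execute("instr-1")
        fmt.Println(p.Store().bundleID) // instr-1
    }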
-			u.PCol = *u.Out.(*PCollection)
-			u.Out = u.PCol.Out
-			b.units = b.units[:len(b.units)-1]
 		}
 
 		b.units = append(b.units, u)
@@ -103,8 +98,8 @@ type builder struct {
 	succ map[string][]linkID // PCollectionID -> []linkID
 	windowing map[string]*window.WindowingStrategy
 
-	nodes map[string]*PCollection // PCollectionID -> Node (cache)
-	links map[linkID]Node         // linkID -> Node (cache)
+	nodes map[string]Node // PCollectionID -> Node (cache)
+	links map[linkID]Node // linkID -> Node (cache)
 
 	units []Unit // result
 	idgen *GenID
@@ -146,7 +141,7 @@ func newBuilder(desc *fnpb.ProcessBundleDescriptor) (*builder, error) {
 		succ: succ,
 		windowing: make(map[string]*window.WindowingStrategy),
 
-		nodes: make(map[string]*PCollection),
+		nodes: make(map[string]Node),
 		links: make(map[linkID]Node),
 
 		idgen: &GenID{},
@@ -263,7 +258,7 @@ func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.Windo
 	return c, wc, nil
 }
 
-func (b *builder) makePCollection(id string) (*PCollection, error) {
+func (b *builder) makePCollection(id string) (Node, error) {
 	if n, exists := b.nodes[id]; exists {
 		return n, nil
 	}
@@ -278,11 +273,8 @@ func (b *builder) makePCollection(id string) (*PCollection, error) {
 		u = &Discard{UID: b.idgen.New()}
 
 	case 1:
-		out, err := b.makeLink(id, list[0])
-		if err != nil {
-			return nil, err
-		}
-		return b.newPCollectionNode(id, out)
+		return b.makeLink(id, list[0])
+
 	default:
 		// Multiplex.
@@ -299,16 +291,7 @@ func (b *builder) makePCollection(id string) (*PCollection, error) {
 		b.units = append(b.units, u)
 		u = &Flatten{UID: b.idgen.New(), N: count, Out: u}
 	}
-	b.units = append(b.units, u)
-	return b.newPCollectionNode(id, u)
-}
 
-func (b *builder) newPCollectionNode(id string, out Node) (*PCollection, error) {
-	ec, _, err := b.makeCoderForPCollection(id)
-	if err != nil {
-		return nil, err
-	}
-	u := &PCollection{UID: b.idgen.New(), Out: out, PColID: id, Coder: ec, Seed: rand.Int63()}
 	b.nodes[id] = u
 	b.units = append(b.units, u)
 	return u, nil
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 2ee8beb..c2fce51 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
@@ -83,12 +82,11 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 	}()
 
 	ctrl := &control{
-		plans:    make(map[bundleDescriptorID]*exec.Plan),
-		active:   make(map[instructionID]*exec.Plan),
-		metStore: make(map[instructionID]*metrics.Store),
-		failed:   make(map[instructionID]error),
-		data:     &DataChannelManager{},
-		state:    &StateChannelManager{},
+		plans:  make(map[bundleDescriptorID]*exec.Plan),
+		active: make(map[instructionID]*exec.Plan),
+		failed: make(map[instructionID]error),
+		data:   &DataChannelManager{},
+		state:  &StateChannelManager{},
 	}
 
 	// gRPC requires all readers of a stream be the same goroutine, so this goroutine
@@ -144,8 +142,6 @@ type control struct {
 	// plans that are actively being executed.
 	// a plan can only be in one of these maps at any time.
 	active map[instructionID]*exec.Plan // protected by mu
-	// metric stores for active plans.
-	metStore map[instructionID]*metrics.Store // protected by mu
 	// plans that have failed during execution
 	failed map[instructionID]error // protected by mu
 	mu     sync.Mutex
@@ -196,10 +192,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 	// since a plan can't be run concurrently.
 	c.active[instID] = plan
 	delete(c.plans, bdID)
-	// Get the user metrics store for this bundle.
-	ctx = metrics.SetBundleID(ctx, string(instID))
-	store := metrics.GetStore(ctx)
-	c.metStore[instID] = store
 	c.mu.Unlock()
 
 	if !ok {
@@ -212,7 +204,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		data.Close()
 		state.Close()
 
-		mets, mons := monitoring(plan, store)
+		mets, mons := monitoring(plan)
 
 		// Move the plan back to the candidate state
 		c.mu.Lock()
 		// Mark the instruction as failed.
@@ -221,7 +213,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		}
 		c.plans[bdID] = plan
 		delete(c.active, instID)
-		delete(c.metStore, instID)
 		c.mu.Unlock()
 
 		if err != nil {
@@ -245,7 +236,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		ref := instructionID(msg.GetInstructionId())
 		c.mu.Lock()
 		plan, ok := c.active[ref]
-		store, _ := c.metStore[ref]
 		err := c.failed[ref]
 		c.mu.Unlock()
 		if err != nil {
@@ -255,7 +245,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 			return fail(ctx, instID, "failed to return progress: instruction %v not active", ref)
 		}
 
-		mets, mons := monitoring(plan, store)
+		mets, mons := monitoring(plan)
 
 		return &fnpb.InstructionResponse{
 			InstructionId: string(instID),
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index b2bc35d..14162dd 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -25,7 +25,11 @@ import (
 	"github.com/golang/protobuf/ptypes"
 )
 
-func monitoring(p *exec.Plan, store *metrics.Store) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+	store := p.Store()
+	if store == nil {
+		return nil, nil
+	}
 	// Get the legacy style metrics.
 	transforms := make(map[string]*fnpb.Metrics_PTransform)
 	metrics.Extractor{
@@ -110,44 +114,27 @@ func monitoring(p *exec.Plan, store *metrics.Store) (*fnpb.Metrics, []*ppb.Monit
 	}.ExtractFrom(store)
 
 	// Get the execution monitoring information from the bundle plan.
-	snapshot, hasSource := p.Progress()
-	if hasSource {
+	if snapshot, ok := p.Progress(); ok {
 		// Legacy version.
-		transforms[snapshot.Source.ID] = &fnpb.Metrics_PTransform{
+		transforms[snapshot.ID] = &fnpb.Metrics_PTransform{
 			ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{
 				Measured: &fnpb.Metrics_PTransform_Measured{
 					OutputElementCounts: map[string]int64{
-						snapshot.Source.Name: snapshot.Source.Count,
+						snapshot.Name: snapshot.Count,
 					},
 				},
 			},
 		}
-	}
-
-	// Monitoring info version.
-	for _, pcol := range snapshot.PCols {
+		// Monitoring info version.
 		monitoringInfo = append(monitoringInfo,
 			&ppb.MonitoringInfo{
 				Urn:  "beam:metric:element_count:v1",
 				Type: "beam:metrics:sum_int_64",
 				Labels: map[string]string{
-					"PCOLLECTION": pcol.ID,
+					"PCOLLECTION": snapshot.PID,
 				},
-				Data: int64Counter(pcol.ElementCount),
+				Data: int64Counter(snapshot.Count),
 			})
-
-		// Skip pcollections without size
-		if pcol.SizeCount != 0 {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:  "beam:metric:sampled_byte_size:v1",
-					Type: "beam:metrics:distribution_int_64",
-					Labels: map[string]string{
-						"PCOLLECTION": pcol.ID,
-					},
-					Data: int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, pcol.SizeMax),
-				})
-		}
 	}
 
 	return &fnpb.Metrics{
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index 4d78dc8..eface25 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -46,7 +46,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	log.Info(ctx, "Pipeline:")
 	log.Info(ctx, p)
 
-	ctx = metrics.SetBundleID(ctx, "direct") // Ensure a metrics.Store exists.
 	if *jobopts.Strict {
 		log.Info(ctx, "Strict mode enabled, applying additional validation.")
 	}
@@ -75,7 +74,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	}
 	// TODO(lostluck) 2020/01/24: What's the right way to expose the
 	// metrics store for the direct runner?
-	metrics.DumpToLog(ctx)
+	metrics.DumpToLogFromStore(ctx, plan.Store())
 	return nil
 }
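For reference, after this revert the only PCollection-level MonitoringInfo the
harness emits is the source's element count, shaped as in monitoring.go above.
A sketch of that construction factored into a helper; it assumes
monitoring.go's existing imports (ppb) and its int64Counter helper, and the
helper name itself is hypothetical:

    // elementCountInfo would live alongside monitoring(); it packages the
    // DataSource progress snapshot as the element-count monitoring info.
    func elementCountInfo(snapshot exec.ProgressReportSnapshot) *ppb.MonitoringInfo {
        return &ppb.MonitoringInfo{
            Urn:  "beam:metric:element_count:v1",
            Type: "beam:metrics:sum_int_64",
            Labels: map[string]string{
                "PCOLLECTION": snapshot.PID, // output PCollection recorded by translate.go
            },
            Data: int64Counter(snapshot.Count),
        }
    }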
