[
https://issues.apache.org/jira/browse/BEAM-4727?focusedWorklogId=130588&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130588
]
ASF GitHub Bot logged work on BEAM-4727:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Aug/18 18:57
Start Date: 02/Aug/18 18:57
Worklog Time Spent: 10m
Work Description: herohde closed pull request #5884: [BEAM-4727] Re-use
metric context throughout bundle
URL: https://github.com/apache/beam/pull/5884
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go
b/sdks/go/pkg/beam/core/metrics/metrics.go
index 1899cc89971..d00f1f04afd 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -69,14 +69,53 @@ type ctxKey string
const bundleKey ctxKey = "beam:bundle"
const ptransformKey ctxKey = "beam:ptransform"
+// beamCtx is a caching context for IDs necessary to place metric updates.
+// Allocating contexts and searching for PTransformIDs for every element
+// is expensive, so we avoid it if possible.
+type beamCtx struct {
+ context.Context
+ bundleID, ptransformID string
+}
+
+// Value lifts the key's value into the beamCtx, for faster lookups, when it is not
already available.
+func (ctx *beamCtx) Value(key interface{}) interface{} {
+ switch key {
+ case bundleKey:
+ if ctx.bundleID == "" {
+ if id := ctx.Value(key); id != nil {
+ ctx.bundleID = id.(string)
+ }
+ }
+ return ctx.bundleID
+ case ptransformKey:
+ if ctx.ptransformID == "" {
+ if id := ctx.Value(key); id != nil {
+ ctx.ptransformID = id.(string)
+ }
+ }
+ return ctx.ptransformID
+ }
+ return ctx.Context.Value(key)
+}
+
// SetBundleID sets the id of the current Bundle.
func SetBundleID(ctx context.Context, id string) context.Context {
- return context.WithValue(ctx, bundleKey, id)
+ // Checking for *beamCtx is an optimization, so we don't dig deeply
+ // for ids if not necessary.
+ if bctx, ok := ctx.(*beamCtx); ok {
+ return &beamCtx{Context: bctx.Context, bundleID: id,
ptransformID: bctx.ptransformID}
+ }
+ return &beamCtx{Context: ctx, bundleID: id}
}
// SetPTransformID sets the id of the current PTransform.
func SetPTransformID(ctx context.Context, id string) context.Context {
- return context.WithValue(ctx, ptransformKey, id)
+ // Checking for *beamCtx is an optimization, so we don't dig deeply
+ // for ids if not necessary.
+ if bctx, ok := ctx.(*beamCtx); ok {
+ return &beamCtx{Context: bctx.Context, bundleID: bctx.bundleID,
ptransformID: id}
+ }
+ return &beamCtx{Context: ctx, ptransformID: id}
}
func getContextKey(ctx context.Context, n name) key {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 41da9f241fa..f0e62cc3b81 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -45,8 +45,8 @@ type ParDo struct {
status Status
err errorx.GuardedError
-
- inv *invoker
+ ctx context.Context
+ inv *invoker
}
func (n *ParDo) ID() UnitID {
@@ -71,8 +71,12 @@ func (n *ParDo) StartBundle(ctx context.Context, id string,
data DataManager) er
return fmt.Errorf("invalid status for pardo %v: %v, want Up",
n.UID, n.status)
}
n.status = Active
+ // Allocating contexts all the time is expensive, but we seldom
re-write them,
+ // and never accept modified contexts from users, so we will cache them
per-bundle
+ // per-unit, to avoid the constant allocation overhead.
+ n.ctx = metrics.SetPTransformID(ctx, n.PID)
- if err := MultiStartBundle(ctx, id, data, n.Out...); err != nil {
+ if err := MultiStartBundle(n.ctx, id, data, n.Out...); err != nil {
return n.fail(err)
}
@@ -87,7 +91,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string,
data DataManager) er
// TODO(BEAM-3303): what to set for StartBundle/FinishBundle emitter
timestamp?
- if _, err := n.invokeDataFn(ctx, window.SingleGlobalWindow,
mtime.ZeroTimestamp, n.Fn.StartBundleFn(), nil); err != nil {
+ if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow,
mtime.ZeroTimestamp, n.Fn.StartBundleFn(), nil); err != nil {
return n.fail(err)
}
return nil
@@ -97,34 +101,31 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm
FullValue, values ...ReS
if n.status != Active {
return fmt.Errorf("invalid status for pardo %v: %v, want
Active", n.UID, n.status)
}
-
- ctx = metrics.SetPTransformID(ctx, n.PID)
-
// If the function observes windows, we must invoke it for each window.
The expected fast path
// is that either there is a single window or the function doesn't
observe windows.
if !mustExplodeWindows(n.inv.fn, elm) {
- val, err := n.invokeProcessFn(ctx, elm.Windows, elm.Timestamp,
&MainInput{Key: elm, Values: values})
+ val, err := n.invokeProcessFn(n.ctx, elm.Windows,
elm.Timestamp, &MainInput{Key: elm, Values: values})
if err != nil {
return n.fail(err)
}
// Forward direct output, if any. It is always a main output.
if val != nil {
- return n.Out[0].ProcessElement(ctx, *val)
+ return n.Out[0].ProcessElement(n.ctx, *val)
}
} else {
for _, w := range elm.Windows {
wElm := FullValue{Elm: elm.Elm, Elm2: elm.Elm2,
Timestamp: elm.Timestamp, Windows: []typex.Window{w}}
- val, err := n.invokeProcessFn(ctx, wElm.Windows,
wElm.Timestamp, &MainInput{Key: wElm, Values: values})
+ val, err := n.invokeProcessFn(n.ctx, wElm.Windows,
wElm.Timestamp, &MainInput{Key: wElm, Values: values})
if err != nil {
return n.fail(err)
}
// Forward direct output, if any. It is always a main
output.
if val != nil {
- return n.Out[0].ProcessElement(ctx, *val)
+ return n.Out[0].ProcessElement(n.ctx, *val)
}
}
}
@@ -148,10 +149,10 @@ func (n *ParDo) FinishBundle(ctx context.Context) error {
n.status = Up
n.inv.Reset()
- if _, err := n.invokeDataFn(ctx, window.SingleGlobalWindow,
mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
+ if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow,
mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
return n.fail(err)
}
- if err := MultiFinishBundle(ctx, n.Out...); err != nil {
+ if err := MultiFinishBundle(n.ctx, n.Out...); err != nil {
return n.fail(err)
}
return nil
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
index 0181279969c..0df6961659b 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
@@ -104,7 +104,7 @@ func emitSumFn(n int, emit func(int)) {
// BenchmarkParDo_EmitSumFn measures the overhead of invoking a ParDo in a
plan.
//
// On @lostluck's desktop:
-// BenchmarkParDo_EmitSumFn-12 1000000 1202 ns/op
529 B/op 4 allocs/op
+// BenchmarkParDo_EmitSumFn-12 1000000 1070 ns/op
481 B/op 3 allocs/op
func BenchmarkParDo_EmitSumFn(b *testing.B) {
fn, err := graph.NewDoFn(emitSumFn)
if err != nil {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 130588)
Time Spent: 4h (was: 3h 50m)
> Reduce metrics overhead
> -----------------------
>
> Key: BEAM-4727
> URL: https://issues.apache.org/jira/browse/BEAM-4727
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Robert Burke
> Priority: Major
> Time Spent: 4h
> Remaining Estimate: 0h
>
> There are a few opportunities to avoid metrics overhead.
> First, when setting state in the context, we allocate a new context for the stored
> value, per element. This generates a fair number of objects for the garbage
> collector too. If we retain and re-use contexts within a bundle, we would
> have the opportunity to save on these costs.
> Also, it's possible that we have overhead on the metric updating paths. We
> can possibly do better than the general sync.Map, and avoid the type
> assertion cost for extracting values of known types from the maps.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)