[
https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108768&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108768
]
ASF GitHub Bot logged work on BEAM-4276:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Jun/18 21:26
Start Date: 04/Jun/18 21:26
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #5507:
[BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192885675
##########
File path: sdks/go/pkg/beam/core/runtime/exec/combine.go
##########
@@ -217,3 +224,162 @@ func (n *Combine) fail(err error) error {
func (n *Combine) String() string {
return fmt.Sprintf("Combine[%v] Keyed:%v Out:%v",
path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
}
+
+// The nodes below break apart the Combine into components to support
+// Combiner Lifting optimizations.
+
+// LiftedCombine is an executor for combining values before grouping by keys
+// for a lifted combine. Partially groups values by key within a bundle,
+// accumulating them in an in memory cache, before emitting them in the
+// FinishBundle step.
+type LiftedCombine struct {
+ *Combine
+
+ cache map[interface{}]FullValue
+}
+
+func (n *LiftedCombine) String() string {
+ return fmt.Sprintf("LiftedCombine[%v] Keyed:%v Out:%v",
path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
+}
+
+// StartBundle initializes the in memory cache of keys to accumulators.
+func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data
DataManager) error {
+ if err := n.Combine.StartBundle(ctx, id, data); err != nil {
+ return err
+ }
+ n.cache = make(map[interface{}]FullValue)
+ return nil
+}
+
+// ProcessElement takes a KV pair and combines values with the same into an
accumulator,
+// caching them until the bundle is complete.
+func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue,
values ...ReStream) error {
+ if n.status != Active {
+ return fmt.Errorf("invalid status for precombine %v: %v",
n.UID, n.status)
+ }
+
+ // Value is a KV so Elm & Elm2 are populated.
+ // Check the cache for an already present accumulator
+
+ afv, notfirst := n.cache[value.Elm]
+ var a interface{}
+ if notfirst {
+ a = afv.Elm2
+ } else {
+ b, err := n.newAccum(ctx, value.Elm)
+ if err != nil {
+ return n.fail(err)
+ }
+ a = b
+ }
+
+ a, err := n.addInput(ctx, a, value.Elm, value.Elm2, value.Timestamp,
!notfirst)
+ if err != nil {
+ return n.fail(err)
+ }
+
+ // Cache the accumulator with the key
+ n.cache[value.Elm] = FullValue{Windows: value.Windows, Elm: value.Elm,
Elm2: a, Timestamp: value.Timestamp}
+
+ return nil
+}
+
+// FinishBundle iterates through the cached key, accumulator pairs, and then
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 108768)
Time Spent: 5h (was: 4h 50m)
> Implement the portable lifted Combiner transforms in Go SDK
> -----------------------------------------------------------
>
> Key: BEAM-4276
> URL: https://issues.apache.org/jira/browse/BEAM-4276
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Robert Burke
> Priority: Major
> Time Spent: 5h
> Remaining Estimate: 0h
>
> Specifically, add the necessary code to produce a Combine composite with the
> correct URN, and permit the SDK harness to understand the lifted parts when
> receiving a bundle plan from the worker.
> Not expected as part of this issue:
> Additional performance tweaks to the in-memory cache (see
> [BEAM-4468|https://issues.apache.org/jira/browse/BEAM-4468]).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)