[ 
https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109196&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109196
 ]

ASF GitHub Bot logged work on BEAM-4276:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Jun/18 20:46
            Start Date: 05/Jun/18 20:46
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #5507: [BEAM-4276] Add 
combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/combine.go b/sdks/go/pkg/beam/combine.go
index b841d916450..d4508c7379b 100644
--- a/sdks/go/pkg/beam/combine.go
+++ b/sdks/go/pkg/beam/combine.go
@@ -19,6 +19,7 @@ import (
        "fmt"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 )
 
 // Combine inserts a global Combine transform into the pipeline. It
@@ -59,8 +60,16 @@ func TryCombinePerKey(s Scope, combinefn interface{}, col 
PCollection) (PCollect
        if err != nil {
                return PCollection{}, fmt.Errorf("invalid CombineFn: %v", err)
        }
+       // This seems like the best place to infer the accumulator coder type, 
unless
+       // it's a universal type.
+       // We can get the fulltype from the return value of the 
mergeAccumulatorFn
+       // TODO(lostluck): 2018/05/28 Correctly infer universal type coder if 
necessary.
+       accumCoder, err := 
inferCoder(typex.New(fn.MergeAccumulatorsFn().Ret[0].T))
+       if err != nil {
+               return PCollection{}, fmt.Errorf("unable to infer CombineFn 
accumulator coder: %v", err)
+       }
 
-       edge, err := graph.NewCombine(s.real, s.scope, fn, col.n)
+       edge, err := graph.NewCombine(s.real, s.scope, fn, col.n, accumCoder)
        if err != nil {
                return PCollection{}, err
        }
diff --git a/sdks/go/pkg/beam/core/graph/edge.go 
b/sdks/go/pkg/beam/core/graph/edge.go
index 4605470bab0..74eb94c26fa 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -136,21 +136,19 @@ type Payload struct {
        Data []byte
 }
 
-// TODO(herohde) 5/24/2017: how should we represent/obtain the coder for 
Combine
-// accumulator types? Coder registry? Assume JSON?
-
 // MultiEdge represents a primitive data processing operation. Each non-user
 // code operation may be implemented by either the harness or the runner.
 type MultiEdge struct {
        id     int
        parent *Scope
 
-       Op        Opcode
-       DoFn      *DoFn      // ParDo
-       CombineFn *CombineFn // Combine
-       Value     []byte     // Impulse
-       Payload   *Payload   // External
-       WindowFn  *window.Fn // WindowInto
+       Op         Opcode
+       DoFn       *DoFn        // ParDo
+       CombineFn  *CombineFn   // Combine
+       AccumCoder *coder.Coder // Combine
+       Value      []byte       // Impulse
+       Payload    *Payload     // External
+       WindowFn   *window.Fn   // WindowInto
 
        Input  []*Inbound
        Output []*Outbound
@@ -317,9 +315,17 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, 
in []*Node, typedefs ma
        return edge, nil
 }
 
+// CombinePerKeyScope is the Go SDK canonical name for the combine composite
+// scope. With Beam Portability, "primitive" composite transforms like
+// combine have their URNs & payloads attached to a high level scope, with a
+// default representation beneath. The use of this const permits the
+// translation layer to confirm the SDK expects this combine to be liftable
+// by a runner and should set this scope's URN and Payload accordingly.
+const CombinePerKeyScope = "CombinePerKey"
+
 // NewCombine inserts a new Combine edge into the graph. Combines cannot have 
side
 // input.
-func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node) (*MultiEdge, 
error) {
+func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) 
(*MultiEdge, error) {
        inT := in.Type()
        if !typex.IsCoGBK(inT) {
                return nil, fmt.Errorf("combine requires CoGBK type: %v", inT)
@@ -380,6 +386,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node) 
(*MultiEdge, error)
        edge := g.NewEdge(s)
        edge.Op = Combine
        edge.CombineFn = u
+       edge.AccumCoder = ac
        edge.Input = []*Inbound{{Kind: kinds[0], From: in, Type: inbound[0]}}
        for i := 0; i < len(out); i++ {
                n := g.NewNode(out[i], in.WindowingStrategy(), in.Bounded())
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go 
b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index fd2c7d609e2..9e9698f3dd3 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -42,10 +42,12 @@ type Combine struct {
        err    errorx.GuardedError
 }
 
+// ID returns the UnitID for this node.
 func (n *Combine) ID() UnitID {
        return n.UID
 }
 
+// Up initializes this CombineFn and runs its SetupFn() method.
 func (n *Combine) Up(ctx context.Context) error {
        if n.status != Initializing {
                return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
@@ -62,6 +64,7 @@ func (n *Combine) Up(ctx context.Context) error {
        return nil
 }
 
+// StartBundle initializes processing this bundle for combines.
 func (n *Combine) StartBundle(ctx context.Context, id string, data 
DataManager) error {
        if n.status != Up {
                return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
@@ -74,6 +77,8 @@ func (n *Combine) StartBundle(ctx context.Context, id string, 
data DataManager)
        return nil
 }
 
+// ProcessElement combines elements grouped by key using the CombineFn's
+// AddInput, MergeAccumulators, and ExtractOutput functions.
 func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values 
...ReStream) error {
        if n.status != Active {
                return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
@@ -113,6 +118,7 @@ func (n *Combine) ProcessElement(ctx context.Context, value 
FullValue, values ..
        return n.Out.ProcessElement(ctx, FullValue{Windows: value.Windows, Elm: 
value.Elm, Elm2: out, Timestamp: value.Timestamp})
 }
 
+// FinishBundle completes this node's processing of a bundle.
 func (n *Combine) FinishBundle(ctx context.Context) error {
        if n.status != Active {
                return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
@@ -125,6 +131,7 @@ func (n *Combine) FinishBundle(ctx context.Context) error {
        return nil
 }
 
+// Down runs the CombineFn's TeardownFn.
 func (n *Combine) Down(ctx context.Context) error {
        if n.status == Down {
                return n.err.Error()
@@ -217,3 +224,162 @@ func (n *Combine) fail(err error) error {
 func (n *Combine) String() string {
        return fmt.Sprintf("Combine[%v] Keyed:%v Out:%v", 
path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
 }
+
+// The nodes below break apart the Combine into components to support
+// Combiner Lifting optimizations.
+
+// LiftedCombine is an executor for combining values before grouping by keys
+// for a lifted combine. Partially groups values by key within a bundle,
+// accumulating them in an in memory cache, before emitting them in the
+// FinishBundle step.
+type LiftedCombine struct {
+       *Combine
+
+       cache map[interface{}]FullValue
+}
+
+func (n *LiftedCombine) String() string {
+       return fmt.Sprintf("LiftedCombine[%v] Keyed:%v Out:%v", 
path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
+}
+
+// StartBundle initializes the in memory cache of keys to accumulators.
+func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+       if err := n.Combine.StartBundle(ctx, id, data); err != nil {
+               return err
+       }
+       n.cache = make(map[interface{}]FullValue)
+       return nil
+}
+
+// ProcessElement takes a KV pair and combines values with the same key into an 
accumulator,
+// caching them until the bundle is complete.
+func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue, 
values ...ReStream) error {
+       if n.status != Active {
+               return fmt.Errorf("invalid status for precombine %v: %v", 
n.UID, n.status)
+       }
+
+       // Value is a KV so Elm & Elm2 are populated.
+       // Check the cache for an already present accumulator
+
+       afv, notfirst := n.cache[value.Elm]
+       var a interface{}
+       if notfirst {
+               a = afv.Elm2
+       } else {
+               b, err := n.newAccum(ctx, value.Elm)
+               if err != nil {
+                       return n.fail(err)
+               }
+               a = b
+       }
+
+       a, err := n.addInput(ctx, a, value.Elm, value.Elm2, value.Timestamp, 
!notfirst)
+       if err != nil {
+               return n.fail(err)
+       }
+
+       // Cache the accumulator with the key
+       n.cache[value.Elm] = FullValue{Windows: value.Windows, Elm: value.Elm, 
Elm2: a, Timestamp: value.Timestamp}
+
+       return nil
+}
+
+// FinishBundle emits the cached (key, accumulator) pairs downstream, and then
+// finishes the bundle as normal.
+func (n *LiftedCombine) FinishBundle(ctx context.Context) error {
+       if n.status != Active {
+               return fmt.Errorf("invalid status for precombine %v: %v", 
n.UID, n.status)
+       }
+       n.status = Up
+
+       // Need to run n.Out.ProcessElement for all the cached precombined KVs, 
and
+       // then finally Finish bundle as normal.
+       for _, a := range n.cache {
+               n.Out.ProcessElement(ctx, a)
+       }
+
+       if err := n.Out.FinishBundle(ctx); err != nil {
+               return n.fail(err)
+       }
+       return nil
+}
+
+// Down tears down the cache.
+func (n *LiftedCombine) Down(ctx context.Context) error {
+       if err := n.Combine.Down(ctx); err != nil {
+               return err
+       }
+       n.cache = nil
+       return nil
+}
+
+// MergeAccumulators is an executor for merging accumulators from a lifted 
combine.
+type MergeAccumulators struct {
+       *Combine
+}
+
+func (n *MergeAccumulators) String() string {
+       return fmt.Sprintf("MergeAccumulators[%v] Keyed:%v Out:%v", 
path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
+}
+
+// ProcessElement accepts a stream of accumulator values with the same key and
+// runs the MergeAccumulatorsFn over them repeatedly.
+func (n *MergeAccumulators) ProcessElement(ctx context.Context, value 
FullValue, values ...ReStream) error {
+       if n.status != Active {
+               return fmt.Errorf("invalid status for combine merge %v: %v", 
n.UID, n.status)
+       }
+       a, err := n.newAccum(ctx, value.Elm)
+       if err != nil {
+               return n.fail(err)
+       }
+       first := true
+
+       stream := values[0].Open()
+       defer stream.Close()
+       for {
+               v, err := stream.Read()
+               if err != nil {
+                       if err == io.EOF {
+                               break
+                       }
+                       return n.fail(err)
+               }
+               if first {
+                       a = v.Elm
+                       first = false
+                       continue
+               }
+               a = n.mergeFn.Call2x1(a, v.Elm)
+       }
+       return n.Out.ProcessElement(ctx, FullValue{Windows: value.Windows, Elm: 
value.Elm, Elm2: a, Timestamp: value.Timestamp})
+}
+
+// Up eagerly gets the optimized binary merge function.
+func (n *MergeAccumulators) Up(ctx context.Context) error {
+       if err := n.Combine.Up(ctx); err != nil {
+               return err
+       }
+       n.mergeFn = reflectx.ToFunc2x1(n.Fn.MergeAccumulatorsFn().Fn)
+       return nil
+}
+
+// ExtractOutput is an executor for extracting output from a lifted combine.
+type ExtractOutput struct {
+       *Combine
+}
+
+func (n *ExtractOutput) String() string {
+       return fmt.Sprintf("ExtractOutput[%v] Keyed:%v Out:%v", 
path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
+}
+
+// ProcessElement accepts an accumulator value, and extracts the final return 
type from it.
+func (n *ExtractOutput) ProcessElement(ctx context.Context, value FullValue, 
values ...ReStream) error {
+       if n.status != Active {
+               return fmt.Errorf("invalid status for combine extract %v: %v", 
n.UID, n.status)
+       }
+       out, err := n.extract(ctx, value.Elm2)
+       if err != nil {
+               return n.fail(err)
+       }
+       return n.Out.ProcessElement(ctx, FullValue{Windows: value.Windows, Elm: 
value.Elm, Elm2: out, Timestamp: value.Timestamp})
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
index f808c5f1712..43bb4f2dc92 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
@@ -17,39 +17,104 @@ package exec
 
 import (
        "context"
+       "fmt"
+       "reflect"
+       "strconv"
        "testing"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
-func mergeFn(a, b int) int {
-       return a + b
+var intInput = []interface{}{int(1), int(2), int(3), int(4), int(5), int(6)}
+var strInput = []interface{}{"1", "2", "3", "4", "5", "6"}
+
+var tests = []struct {
+       Fn         interface{}
+       AccumCoder *coder.Coder
+       Input      []interface{}
+       Expected   interface{}
+}{
+       {Fn: mergeFn, AccumCoder: intCoder(reflectx.Int), Input: intInput, 
Expected: int(21)},
+       {Fn: &MyCombine{}, AccumCoder: intCoder(reflectx.Int64), Input: 
intInput, Expected: int(21)},
+       {Fn: &MyOtherCombine{}, AccumCoder: intCoder(reflectx.Int64), Input: 
intInput, Expected: "21"},
+       {Fn: &MyThirdCombine{}, AccumCoder: intCoder(reflectx.Int), Input: 
strInput, Expected: int(21)},
 }
 
 // TestCombine verifies that the Combine node works correctly.
 func TestCombine(t *testing.T) {
-       fn, err := graph.NewCombineFn(mergeFn)
+       for _, test := range tests {
+               t.Run(reflect.TypeOf(test.Fn).Name(), func(t *testing.T) {
+                       edge := getCombineEdge(t, test.Fn, test.AccumCoder)
+
+                       out := &CaptureNode{UID: 1}
+                       combine := &Combine{UID: 2, Fn: edge.CombineFn, Out: 
out}
+                       n := &FixedRoot{UID: 3, Elements: makeKeyedInput(42, 
test.Input...), Out: combine}
+
+                       constructAndExecutePlan(t, []Unit{n, combine, out})
+
+                       expected := makeKV(42, test.Expected)
+                       if !equalList(out.Elements, expected) {
+                               t.Errorf("combine(%s) = %v, want %v", 
edge.CombineFn.Name(), extractKeyedValues(out.Elements...), 
extractKeyedValues(expected...))
+                       }
+               })
+       }
+}
+
+// TestLiftedCombine verifies that the LiftedCombine, MergeAccumulators, and
+// ExtractOutput nodes work correctly after the lift has been performed.
+func TestLiftedCombine(t *testing.T) {
+       for _, test := range tests {
+               t.Run(reflect.TypeOf(test.Fn).Name(), func(t *testing.T) {
+                       edge := getCombineEdge(t, test.Fn, test.AccumCoder)
+
+                       out := &CaptureNode{UID: 1}
+                       extract := &ExtractOutput{Combine: &Combine{UID: 2, Fn: 
edge.CombineFn, Out: out}}
+                       merge := &MergeAccumulators{Combine: &Combine{UID: 3, 
Fn: edge.CombineFn, Out: extract}}
+                       gbk := &simpleGBK{UID: 4, Out: merge}
+                       precombine := &LiftedCombine{Combine: &Combine{UID: 5, 
Fn: edge.CombineFn, Out: gbk}}
+                       n := &FixedRoot{UID: 6, Elements: makeKVInput(42, 
test.Input...), Out: precombine}
+
+                       constructAndExecutePlan(t, []Unit{n, precombine, gbk, 
merge, extract, out})
+                       expected := makeKV(42, test.Expected)
+                       if !equalList(out.Elements, expected) {
+                               t.Errorf("liftedCombineChain(%s) = %v, want 
%v", edge.CombineFn.Name(), extractKeyedValues(out.Elements...), 
extractKeyedValues(expected...))
+                       }
+               })
+       }
+}
+
+func getCombineEdge(t *testing.T, cfn interface{}, ac *coder.Coder) 
*graph.MultiEdge {
+       t.Helper()
+       fn, err := graph.NewCombineFn(cfn)
        if err != nil {
                t.Fatalf("invalid function: %v", err)
        }
 
        g := graph.New()
-       inT := typex.NewCoGBK(typex.New(reflectx.Int), typex.New(reflectx.Int))
+       var vtype reflect.Type
+       if fn.AddInputFn() != nil {
+               // This makes the assumption that the AddInput function is 
unkeyed.
+               vtype = fn.AddInputFn().Param[1].T
+       } else {
+               vtype = fn.MergeAccumulatorsFn().Param[0].T
+       }
+       inT := typex.NewCoGBK(typex.New(reflectx.Int), typex.New(vtype))
        in := g.NewNode(inT, window.DefaultWindowingStrategy(), true)
 
-       edge, err := graph.NewCombine(g, g.Root(), fn, in)
+       edge, err := graph.NewCombine(g, g.Root(), fn, in, ac)
        if err != nil {
-               t.Fatalf("invalid pardo: %v", err)
+               t.Fatalf("invalid combinefn: %v", err)
        }
+       return edge
+}
 
-       out := &CaptureNode{UID: 1}
-       combine := &Combine{UID: 2, Fn: edge.CombineFn, Out: out}
-       n := &FixedRoot{UID: 3, Elements: makeKeyedInput(42, 1, 2, 3, 4, 5, 6), 
Out: combine}
-
-       p, err := NewPlan("a", []Unit{n, combine, out})
+func constructAndExecutePlan(t *testing.T, us []Unit) {
+       p, err := NewPlan("a", us)
        if err != nil {
                t.Fatalf("failed to construct plan: %v", err)
        }
@@ -60,9 +125,161 @@ func TestCombine(t *testing.T) {
        if err := p.Down(context.Background()); err != nil {
                t.Fatalf("down failed: %v", err)
        }
+}
+
+// The following functions and struct represent the combine contract 
implemented
+// in various ways.
+//
+// Instead of a ProcessElement method, a liftable combine can be written by
+// breaking down an operation into four component methods:
+//
+//   CreateAccumulator() AccumT
+//   AddInput(a AccumT, v InputT) AccumT
+//   MergeAccumulators(a, b AccumT) AccumT
+//   ExtractOutput(v AccumT) OutputT
+//
+// In addition, there can be three distinct types, depending on where
+// they are used in the combine, the input type, InputT, the output type, 
OutputT,
+// and the accumulator type AccumT. Depending on the equality of the types, one
+// or more of the methods can be unspecified.
+//
+// The only required method is MergeAccumulators.
+//
+// When InputT == OutputT == AccumT, the only required method is 
MergeAccumulators.
+//
+// When InputT == AccumT, then AddInput can be omitted, and MergeAccumulators 
used instead.
+// When AccumT == OutputT, then ExtractOutput can be omitted, and the identity 
used instead.
+//
+// These two combined mean that when InputT == OutputT == AccumT, only 
MergeAccumulators
+// needs to be specified.
+//
+// CreateAccumulator() is only required if the AccumT cannot be used in its 
zero state,
+// and needs initialization.
+
+// mergeFn represents a combine that is just a binary merge, where
+//
+//  InputT == OutputT == AccumT == int
+func mergeFn(a, b int) int {
+       return a + b
+}
+
+// MyCombine represents a combine with the same Input and Output type (int), 
but a
+// distinct accumulator type (int64).
+//
+//  InputT == OutputT == int
+//  AccumT == int64
+type MyCombine struct{}
+
+func (*MyCombine) AddInput(a int64, v int) int64 {
+       return a + int64(v)
+}
 
-       expected := makeKV(42, 21)
-       if !equalList(out.Elements, expected) {
-               t.Errorf("pardo(sumFn) = %v, want %v", 
extractKeyedValues(out.Elements...), extractKeyedValues(expected...))
+func (*MyCombine) MergeAccumulators(a, b int64) int64 {
+       return a + b
+}
+
+func (*MyCombine) ExtractOutput(a int64) int {
+       return int(a)
+}
+
+// MyOtherCombine is the same as MyCombine, but has strings extracted as 
output.
+//
+//  InputT == int
+//  AccumT == int64
+//  OutputT == string
+type MyOtherCombine struct {
+       MyCombine // Embedding to re-use the existing AddInput and 
MergeAccumulators implementations
+}
+
+func (*MyOtherCombine) ExtractOutput(a int64) string {
+       return fmt.Sprintf("%d", a)
+}
+
+// MyThirdCombine parses strings as Input, and doesn't specify an 
ExtractOutput
+//
+//  InputT == string
+//  AccumT == int
+//  OutputT == int
+type MyThirdCombine struct{}
+
+func (c *MyThirdCombine) AddInput(a int, s string) (int, error) {
+       v, err := strconv.ParseInt(s, 0, 0)
+       if err != nil {
+               return 0, err
+       }
+       return c.MergeAccumulators(a, int(v)), nil
+}
+
+func (*MyThirdCombine) MergeAccumulators(a, b int) int {
+       return a + b
+}
+
+func intCoder(t reflect.Type) *coder.Coder {
+       c, err := coderx.NewVarIntZ(t)
+       if err != nil {
+               panic(fmt.Sprintf("Couldn't get VarInt coder for %v: %v", t, 
err))
        }
+       return &coder.Coder{Kind: coder.Custom, T: typex.New(t), Custom: c}
+}
+
+// simpleGBK buffers all input and continues on FinishBundle. Use with small 
single-bundle data only.
+type simpleGBK struct {
+       UID  UnitID
+       Edge *graph.MultiEdge
+       Out  Node
+
+       m map[interface{}]*group
+}
+
+type group struct {
+       key    FullValue
+       values []FullValue
+}
+
+func (n *simpleGBK) ID() UnitID {
+       return n.UID
+}
+
+func (n *simpleGBK) Up(ctx context.Context) error {
+       n.m = make(map[interface{}]*group)
+       return nil
+}
+
+func (n *simpleGBK) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+func (n *simpleGBK) ProcessElement(ctx context.Context, elm FullValue, _ 
...ReStream) error {
+       key := elm.Elm.(int)
+       value := elm.Elm2
+
+       g, ok := n.m[key]
+       if !ok {
+               g = &group{
+                       key:    FullValue{Elm: key, Timestamp: elm.Timestamp, 
Windows: elm.Windows},
+                       values: make([]FullValue, 0),
+               }
+               n.m[key] = g
+       }
+       g.values = append(g.values, FullValue{Elm: value, Timestamp: 
elm.Timestamp})
+
+       return nil
+}
+
+func (n *simpleGBK) FinishBundle(ctx context.Context) error {
+       for _, g := range n.m {
+               values := &FixedReStream{Buf: g.values}
+               if err := n.Out.ProcessElement(ctx, g.key, values); err != nil {
+                       return err
+               }
+       }
+       return n.Out.FinishBundle(ctx)
+}
+
+func (n *simpleGBK) Down(ctx context.Context) error {
+       return nil
+}
+
+func (n *simpleGBK) String() string {
+       return fmt.Sprintf("simpleGBK: %v Out:%v", n.ID(), n.Out.ID())
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go
index ae458320104..9329aefc918 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go
@@ -42,6 +42,32 @@ func makeValues(vs ...interface{}) []FullValue {
        return ret
 }
 
+// makeKVInput returns a list of KV<K,V> inputs as a list of main inputs.
+func makeKVInput(key interface{}, vs ...interface{}) []MainInput {
+       var ret []MainInput
+       for _, v := range makeKVValues(key, vs...) {
+               ret = append(ret, MainInput{Key: v})
+       }
+       return ret
+}
+
+// makeKVValues returns a list of KV<K,V> inputs.
+func makeKVValues(key interface{}, vs ...interface{}) []FullValue {
+       var ret []FullValue
+       for _, v := range vs {
+               k := FullValue{
+                       Windows:   window.SingleGlobalWindow,
+                       Timestamp: mtime.ZeroTimestamp,
+                       Elm:       key,
+                       Elm2:      v,
+               }
+               ret = append(ret, k)
+       }
+       return ret
+}
+
+// makeKeyedInput returns a CoGBK<K, V> where the list of values are a stream
+// in a single main input.
 func makeKeyedInput(key interface{}, vs ...interface{}) []MainInput {
        k := FullValue{
                Windows:   window.SingleGlobalWindow,
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 01049b45a8d..67dcf488455 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -34,9 +34,13 @@ import (
        "github.com/golang/protobuf/ptypes"
 )
 
+// TODO(lostluck): 2018/05/28 Extract these from the canonical enums in 
beam_runner_api.proto
 const (
-       urnDataSource = "urn:org.apache.beam:source:runner:0.1"
-       urnDataSink   = "urn:org.apache.beam:sink:runner:0.1"
+       urnDataSource           = "urn:org.apache.beam:source:runner:0.1"
+       urnDataSink             = "urn:org.apache.beam:sink:runner:0.1"
+       urnPerKeyCombinePre     = "beam:transform:combine_per_key_precombine:v1"
+       urnPerKeyCombineMerge   = 
"beam:transform:combine_per_key_merge_accumulators:v1"
+       urnPerKeyCombineExtract = 
"beam:transform:combine_per_key_extract_outputs:v1"
 )
 
 // UnmarshalPlan converts a model bundle descriptor into an execution Plan.
@@ -331,22 +335,29 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
 
        var u Node
        switch urn {
-       case graphx.URNParDo, graphx.URNJavaDoFn:
+       case graphx.URNParDo, graphx.URNJavaDoFn, urnPerKeyCombinePre, 
urnPerKeyCombineMerge, urnPerKeyCombineExtract:
                var data string
-               if urn == graphx.URNParDo {
+               switch urn {
+               case graphx.URNParDo:
                        var pardo pb.ParDoPayload
                        if err := proto.Unmarshal(payload, &pardo); err != nil {
                                return nil, fmt.Errorf("invalid ParDo payload 
for %v: %v", transform, err)
                        }
                        data = string(pardo.GetDoFn().GetSpec().GetPayload())
-               } else {
+               case urnPerKeyCombinePre, urnPerKeyCombineMerge, 
urnPerKeyCombineExtract:
+                       var cmb pb.CombinePayload
+                       if err := proto.Unmarshal(payload, &cmb); err != nil {
+                               return nil, fmt.Errorf("invalid CombinePayload 
payload for %v: %v", transform, err)
+                       }
+                       data = string(cmb.GetCombineFn().GetSpec().GetPayload())
+               default:
                        // TODO(herohde) 12/4/2017: we see DoFns directly with 
Dataflow. Handle that
                        // case here, for now, so that the harness can use this 
logic.
 
                        data = string(payload)
                }
 
-               // TODO(herohde) 1/28/2018: Once we're fully off the old way,
+               // TODO(herohde) 1/28/2018: Once Dataflow's fully off the old 
way,
                // we can simply switch on the ParDo DoFn URN directly.
 
                var tp v1.TransformPayload
@@ -354,7 +365,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                        return nil, fmt.Errorf("invalid transform payload for 
%v: %v", transform, err)
                }
 
-               switch tp.GetUrn() {
+               switch tpUrn := tp.GetUrn(); tpUrn {
                case graphx.URNDoFn:
                        op, fn, _, in, _, err := 
graphx.DecodeMultiEdge(tp.GetEdge())
                        if err != nil {
@@ -376,17 +387,23 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                }
 
                                panic("NYI: side input")
-
                        case graph.Combine:
-                               n := &Combine{UID: b.idgen.New(), Out: out[0]}
-                               n.Fn, err = graph.AsCombineFn(fn)
+                               cn := &Combine{UID: b.idgen.New(), Out: out[0]}
+                               cn.Fn, err = graph.AsCombineFn(fn)
                                if err != nil {
                                        return nil, err
                                }
-                               n.UsesKey = typex.IsKV(in[0].Type)
-
-                               u = n
-
+                               cn.UsesKey = typex.IsKV(in[0].Type)
+                               switch urn {
+                               case urnPerKeyCombinePre:
+                                       u = &LiftedCombine{Combine: cn}
+                               case urnPerKeyCombineMerge:
+                                       u = &MergeAccumulators{Combine: cn}
+                               case urnPerKeyCombineExtract:
+                                       u = &ExtractOutput{Combine: cn}
+                               default: // For unlifted combines
+                                       u = cn
+                               }
                        default:
                                panic(fmt.Sprintf("Opcode should be one of 
ParDo or Combine, but it is: %v", op))
                        }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 4681a82f79f..14b6b7d4e0f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -28,15 +28,15 @@ import (
        "github.com/golang/protobuf/ptypes"
 )
 
+// Model constants for interfacing with a Beam runner.
+// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the 
pipeline_v1 proto
 const (
-       // Model constants
-
-       URNImpulse = "beam:transform:impulse:v1"
-       URNParDo   = "urn:beam:transform:pardo:v1"
-       URNFlatten = "beam:transform:flatten:v1"
-       URNGBK     = "beam:transform:group_by_key:v1"
-       URNCombine = "beam:transform:combine:v1"
-       URNWindow  = "beam:transform:window:v1"
+       URNImpulse       = "beam:transform:impulse:v1"
+       URNParDo         = "urn:beam:transform:pardo:v1"
+       URNFlatten       = "beam:transform:flatten:v1"
+       URNGBK           = "beam:transform:group_by_key:v1"
+       URNCombinePerKey = "beam:transform:combine_per_key:v1"
+       URNWindow        = "beam:transform:window:v1"
 
        URNGlobalWindowsWindowFn  = "beam:windowfn:global_windows:v0.1"
        URNFixedWindowsWindowFn   = "beam:windowfn:fixed_windows:v0.1"
@@ -148,10 +148,59 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
                Inputs:        diff(in, out),
                Outputs:       diff(out, in),
        }
+
+       m.updateIfCombineComposite(s, transform)
+
        m.transforms[id] = transform
        return id
 }
 
+// updateIfCombineComposite examines the scope tree and sets the PTransform 
Spec
+// to be a CombinePerKey with a CombinePayload if it's a liftable composite.
+// Beam Portability requires that composites contain an implementation for 
runners
+// that don't understand the URN and Payload, which this lightly checks for.
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform 
*pb.PTransform) {
+       if s.Scope.Name != graph.CombinePerKeyScope ||
+               len(s.Edges) != 2 ||
+               len(s.Edges[0].Edge.Input) != 1 ||
+               len(s.Edges[1].Edge.Output) != 1 ||
+               s.Edges[1].Edge.Op != graph.Combine {
+               return
+       }
+
+       edge := s.Edges[1].Edge
+       if !tryAddingCoder(edge.AccumCoder) {
+               return
+       }
+       acID := m.coders.Add(edge.AccumCoder)
+       payload := &pb.CombinePayload{
+               CombineFn: &pb.SdkFunctionSpec{
+                       Spec: &pb.FunctionSpec{
+                               Urn:     URNJavaDoFn,
+                               Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge)),
+                       },
+                       EnvironmentId: m.addDefaultEnv(),
+               },
+               AccumulatorCoderId: acID,
+       }
+       transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: 
protox.MustEncode(payload)}
+}
+
+// If the accumulator type is unencodable (eg. contains raw interface{})
+// Try encoding the AccumCoder. If the marshaller doesn't panic, it's
+// encodable.
+func tryAddingCoder(c *coder.Coder) (ok bool) {
+       defer func() {
+               if p := recover(); p != nil {
+                       ok = false
+                       fmt.Printf("Unable to encode combiner for lifting: %v", 
p)
+               }
+       }()
+       // Try in a new Marshaller to not corrupt state.
+       NewCoderMarshaller().Add(c)
+       return true
+}
+
 // diff computes A\B and returns its keys as an identity map.
 func diff(a, b map[string]bool) map[string]string {
        ret := make(map[string]string)
diff --git a/sdks/go/pkg/beam/transforms/top/top.go 
b/sdks/go/pkg/beam/transforms/top/top.go
index f440242dd93..c08de242973 100644
--- a/sdks/go/pkg/beam/transforms/top/top.go
+++ b/sdks/go/pkg/beam/transforms/top/top.go
@@ -106,9 +106,9 @@ func validate(t typex.FullType, n int, less interface{}) {
        funcx.MustSatisfy(less, funcx.Replace(sig, beam.TType, t.Type()))
 }
 
-// TODO(herohde) 5/25/2017: the accumulator should be serializable with a 
Coder.
-// We need a coder here, because the elements are generally code-able only. 
Until
-// then, we do not support combiner lifting.
+// TODO(herohde) 5/25/2017: BEAM-4472 the accumulator should be serializable
+// with a Coder. We need a coder here, because the elements are generally
+// code-able only. Until then, it does not support combiner lifting.
 
 // TODO(herohde) 5/25/2017: use a heap instead of a sorted slice.
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 109196)
    Time Spent: 6h 50m  (was: 6h 40m)

> Implement the portable lifted Combiner transforms in Go SDK
> -----------------------------------------------------------
>
>                 Key: BEAM-4276
>                 URL: https://issues.apache.org/jira/browse/BEAM-4276
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Robert Burke
>            Priority: Major
>          Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> Specifically add the necessary code to produce a Combine Composite with the 
> correct URN, and permit the SDK harness to understand the lifted parts when 
> receiving a bundle plan from the worker.
> Not expected as part of this issue is:
> Additional performance tweaks to the in-memory cache (See
> [BEAM-4468|https://issues.apache.org/jira/browse/BEAM-4468])



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to