[
https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=131447&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131447
]
ASF GitHub Bot logged work on BEAM-4276:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Aug/18 15:55
Start Date: 06/Aug/18 15:55
Worklog Time Spent: 10m
Work Description: herohde closed pull request #6143: [BEAM-4276] Combiner
lifting for Dataflow
URL: https://github.com/apache/beam/pull/6143
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index 56625242c4f..062b12ae0e7 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -17,6 +17,7 @@ package dataflowlib
import (
"bytes"
+ "encoding/json"
"fmt"
"net/url"
"path"
@@ -39,6 +40,7 @@ import (
const (
impulseKind = "CreateCollection"
parDoKind = "ParallelDo"
+ combineKind = "CombineValues"
flattenKind = "Flatten"
gbkKind = "GroupByKey"
windowIntoKind = "Bucket"
@@ -166,6 +168,37 @@ func (x *translator) translateTransform(trunk string, id
string) ([]*df.Step, er
prop.ParallelInput = x.pcollections[in]
prop.SerializedFn = id // == reference into the proto pipeline
return append(steps, x.newStep(id, parDoKind, prop)), nil
+ case graphx.URNCombinePerKey:
+ // Dataflow uses a GBK followed by a CombineValues to determine
when it can lift.
+ // To achieve this, we use the combine composite's
subtransforms, and modify the
+ // Combine ParDo with the CombineValues kind, set its
SerializedFn to map to the
+ // composite payload, and the accumulator coding.
+ if len(t.Subtransforms) != 2 {
+ return nil, fmt.Errorf("invalid CombinePerKey, expected
2 subtransforms but got %d in %v", len(t.Subtransforms), t)
+ }
+ steps, err := x.translateTransforms(fmt.Sprintf("%v%v/", trunk,
path.Base(t.UniqueName)), t.Subtransforms)
+ if err != nil {
+ return nil, fmt.Errorf("invalid CombinePerKey, couldn't
extract GBK from %v: %v", t, err)
+ }
+ var payload pb.CombinePayload
+ if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil
{
+ return nil, fmt.Errorf("invalid Combine payload for %v:
%v", t, err)
+ }
+
+ c, err := x.coders.Coder(payload.AccumulatorCoderId)
+ if err != nil {
+ return nil, fmt.Errorf("invalid Combine payload ,
missing Accumulator Coder %v: %v", t, err)
+ }
+ enc, err := graphx.EncodeCoderRef(c)
+ if err != nil {
+ return nil, fmt.Errorf("invalid Combine payload,
couldn't encode Accumulator Coder %v: %v", t, err)
+ }
+ json.Unmarshal([]byte(steps[1].Properties), &prop)
+ prop.Encoding = enc
+ prop.SerializedFn = id
+ steps[1].Kind = combineKind
+ steps[1].Properties = newMsg(prop)
+ return steps, nil
case graphx.URNFlatten:
for _, in := range t.Inputs {
@@ -177,7 +210,6 @@ func (x *translator) translateTransform(trunk string, id
string) ([]*df.Step, er
in := stringx.SingleValue(t.Inputs)
prop.ParallelInput = x.pcollections[in]
- prop.DisallowCombinerLifting = true
prop.SerializedFn =
encodeSerializedFn(x.extractWindowingStrategy(in))
return []*df.Step{x.newStep(id, gbkKind, prop)}, nil
@@ -224,8 +256,6 @@ func (x *translator) translateTransform(trunk string, id
string) ([]*df.Step, er
}
default:
- // TODO: graphx.URNCombinePerKey:
-
if len(t.Subtransforms) > 0 {
return x.translateTransforms(fmt.Sprintf("%v%v/",
trunk, path.Base(t.UniqueName)), t.Subtransforms)
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 131447)
Time Spent: 9h 40m (was: 9.5h)
> Implement the portable lifted Combiner transforms in Go SDK
> -----------------------------------------------------------
>
> Key: BEAM-4276
> URL: https://issues.apache.org/jira/browse/BEAM-4276
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Robert Burke
> Priority: Major
> Time Spent: 9h 40m
> Remaining Estimate: 0h
>
> Specifically, add the necessary code to produce a Combine Composite with the
> correct URN, and permit the SDK harness to understand the lifted parts when
> receiving a bundle plan from the worker.
> Not expected as part of this issue:
> Additional performance tweaks to the in-memory cache (see
> [BEAM-4468|https://issues.apache.org/jira/browse/BEAM-4468])
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)