[
https://issues.apache.org/jira/browse/BEAM-4813?focusedWorklogId=128796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128796
]
ASF GitHub Bot logged work on BEAM-4813:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jul/18 16:54
Start Date: 30/Jul/18 16:54
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5994:
[BEAM-4813] Refactor Go Dataflow runner and translation
URL: https://github.com/apache/beam/pull/5994#discussion_r206245870
##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
##########
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflowlib
+
+import (
+ "bytes"
+ "fmt"
+ "net/url"
+ "path"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/stringx"
+ pubsub_v1 "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1"
+ pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ "github.com/golang/protobuf/proto"
+ df "google.golang.org/api/dataflow/v1b3"
+)
+
const (
	// Step kinds understood by the Dataflow service. Each translated
	// primitive transform becomes a step of one of these kinds.
	impulseKind    = "CreateCollection"
	parDoKind      = "ParallelDo"
	flattenKind    = "Flatten"
	gbkKind        = "GroupByKey"
	windowIntoKind = "Bucket"

	// sideInputKind is the extra conversion step inserted before a ParDo
	// for each side input.
	sideInputKind = "CollectionToSingleton"

	// Support for Dataflow native I/O, such as PubSub.
	readKind  = "ParallelRead"
	writeKind = "ParallelWrite"
)
+
+// translate translates a pipeline into a sequence of Dataflow steps. The step
+// representation and its semantics are complex. In particular, the service
+// optimizes the steps (step fusing, etc.) and may move steps around. Our
+// decorations of the steps must thus be robust against such changes, so that
+// they can be properly decoded in the harness. There are multiple quirks and
+// requirements of specific semi-opaque formats, such as base64 encoded blobs.
+//
+// Moreover, the harness sees pieces of the translated steps only -- not the
+// full graph. Special steps are also inserted around GBK, for example, which
+// makes the placement of the decoration somewhat tricky. The harness will
+// also never see steps that the service executes directly, notably GBK/CoGBK.
+func translate(p *pb.Pipeline) ([]*df.Step, error) {
+ // NOTE: Dataflow apparently assumes that the steps are in topological
order.
+ // Otherwise, it fails with "Output out for step was not found.". We
assume
+ // the pipeline has been normalized and each subtransform list is in
such order.
+
+ x := newTranslator(p.GetComponents())
+ return x.translateTransforms("", p.GetRootTransformIds())
+}
+
// translator holds the shared state used while converting the transforms of
// one pipeline into Dataflow steps.
type translator struct {
	// comp is the pipeline's components (transforms, collections, coders,
	// windowing strategies) being translated.
	comp *pb.Components
	// pcollections maps a PCollection id to a reference to the step output
	// that produces it (built via makeOutputReferences).
	pcollections map[string]*outputReference
	// coders unmarshals coder protos from the pipeline components.
	coders *graphx.CoderUnmarshaller
	// bogusCoderRef is a pre-encoded windowed bytes coder, used as the
	// encoding of the placeholder output for steps with no real outputs.
	bogusCoderRef *graphx.CoderRef
}
+
// newTranslator creates a translator for the given pipeline components.
func newTranslator(comp *pb.Components) *translator {
	// NOTE(review): the EncodeCoderRef error is deliberately discarded;
	// encoding this fixed windowed bytes coder is presumably infallible —
	// confirm, or surface the error.
	bytesCoderRef, _ := graphx.EncodeCoderRef(coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()))

	return &translator{
		comp:          comp,
		pcollections:  makeOutputReferences(comp.GetTransforms()),
		coders:        graphx.NewCoderUnmarshaller(comp.GetCoders()),
		bogusCoderRef: bytesCoderRef,
	}
}
+
+func (x *translator) translateTransforms(trunk string, ids []string)
([]*df.Step, error) {
+ var steps []*df.Step
+ for _, id := range ids {
+ sub, err := x.translateTransform(trunk, id)
+ if err != nil {
+ return nil, err
+ }
+ steps = append(steps, sub...)
+ }
+ return steps, nil
+}
+
// translateTransform translates a single transform into one or more Dataflow
// steps, dispatching on the transform's URN. Unrecognized URNs are treated as
// composites and translated by recursing into their subtransforms with an
// extended name trunk.
func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, error) {
	t := x.comp.Transforms[id]

	// Common properties shared by all step kinds below; the cases fill in
	// kind-specific fields before emitting the step.
	prop := properties{
		UserName:   userName(trunk, t.UniqueName),
		OutputInfo: x.translateOutputs(t.Outputs),
	}

	urn := t.GetSpec().GetUrn()
	switch urn {
	case graphx.URNImpulse:
		// NOTE: The impulse []data value is encoded in a special way as a
		// URL Query-escaped windowed _unnested_ value. It is read back in
		// a nested context at runtime.
		var buf bytes.Buffer
		if err := exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
			return nil, err
		}
		value := string(append(buf.Bytes(), t.GetSpec().Payload...))
		// log.Printf("Impulse data: %v", url.QueryEscape(value))

		prop.Element = []string{url.QueryEscape(value)}
		return []*df.Step{x.newStep(id, impulseKind, prop)}, nil

	case graphx.URNParDo:
		var payload pb.ParDoPayload
		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
			return nil, fmt.Errorf("invalid ParDo payload for %v: %v", t, err)
		}

		var steps []*df.Step
		// rem holds the inputs left over once side inputs are removed; the
		// single remaining entry is the main (parallel) input.
		rem := reflectx.ShallowClone(t.Inputs).(map[string]string)

		prop.NonParallelInputs = make(map[string]*outputReference)
		for key := range payload.SideInputs {
			// Side input require an additional conversion step, which must
			// be before the present one.
			delete(rem, key)

			pcol := x.comp.Pcollections[t.Inputs[key]]
			ref := x.pcollections[t.Inputs[key]]
			c := x.translateCoder(pcol, pcol.CoderId)

			side := &df.Step{
				Name: fmt.Sprintf("view%v_%v", id, key),
				Kind: sideInputKind,
				Properties: newMsg(properties{
					ParallelInput: ref,
					OutputInfo: []output{{
						UserName:   "i0",
						OutputName: "i0",
						Encoding:   graphx.WrapIterable(c),
					}},
					UserName: userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)),
				}),
			}
			steps = append(steps, side)

			// The ParDo consumes the conversion step's "i0" output as a
			// non-parallel input.
			prop.NonParallelInputs[side.Name] = newOutputReference(side.Name, "i0")
		}

		in := stringx.SingleValue(rem)

		prop.ParallelInput = x.pcollections[in]
		prop.SerializedFn = id // == reference into the proto pipeline
		return append(steps, x.newStep(id, parDoKind, prop)), nil

	case graphx.URNFlatten:
		for _, in := range t.Inputs {
			prop.Inputs = append(prop.Inputs, x.pcollections[in])
		}
		return []*df.Step{x.newStep(id, flattenKind, prop)}, nil

	case graphx.URNGBK:
		in := stringx.SingleValue(t.Inputs)

		prop.ParallelInput = x.pcollections[in]
		prop.DisallowCombinerLifting = true
		// The GBK step carries the input collection's windowing strategy.
		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(in))
		return []*df.Step{x.newStep(id, gbkKind, prop)}, nil

	case graphx.URNWindow:
		in := stringx.SingleValue(t.Inputs)
		out := stringx.SingleValue(t.Outputs)

		prop.ParallelInput = x.pcollections[in]
		// Note: unlike GBK above, the strategy is taken from the output
		// collection here.
		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(out))
		return []*df.Step{x.newStep(id, windowIntoKind, prop)}, nil

	case pubsub_v1.PubSubPayloadURN:
		// Translate to native handling of PubSub I/O.

		var msg pubsub_v1.PubSubPayload
		if err := proto.Unmarshal(t.Spec.Payload, &msg); err != nil {
			return nil, fmt.Errorf("bad pubsub payload: %v", err)
		}

		prop.Format = "pubsub"
		prop.PubSubTopic = msg.GetTopic()
		prop.PubSubSubscription = msg.GetSubscription()
		prop.PubSubIDLabel = msg.GetIdAttribute()
		prop.PubSubTimestampLabel = msg.GetTimestampAttribute()
		prop.PubSubWithAttributes = msg.GetWithAttributes()

		// A subscription, if present, takes precedence: the topic is cleared.
		if prop.PubSubSubscription != "" {
			prop.PubSubTopic = ""
		}

		switch msg.Op {
		case pubsub_v1.PubSubPayload_READ:
			return []*df.Step{x.newStep(id, readKind, prop)}, nil

		case pubsub_v1.PubSubPayload_WRITE:
			in := stringx.SingleValue(t.Inputs)

			prop.ParallelInput = x.pcollections[in]
			prop.Encoding = x.wrapCoder(x.comp.Pcollections[in], coder.NewBytes())
			return []*df.Step{x.newStep(id, writeKind, prop)}, nil

		default:
			return nil, fmt.Errorf("bad pubsub op: %v", msg.Op)
		}

	default:
		// TODO: graphx.URNCombinePerKey:

		// Composite transform: recurse into its parts, extending the name
		// trunk with this transform's base name.
		if len(t.Subtransforms) > 0 {
			return x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
		}

		return nil, fmt.Errorf("unexpected primitive urn: %v", t)
	}
}
+
+func (x *translator) newStep(id, kind string, prop properties) *df.Step {
+ step := &df.Step{
+ Name: id,
+ Kind: kind,
+ Properties: newMsg(prop),
+ }
+ if prop.PubSubWithAttributes {
+ // Hack to add a empty-value property for PubSub IO. This
+ // will make PubSub send the entire message, not just
+ // the payload.
+ prop.PubSubWithAttributes = false
+ step.Properties =
newMsg(propertiesWithPubSubMessage{properties: prop})
+ }
+ return step
+}
+
// translateOutputs converts a transform's output map into Dataflow output
// info, one entry per output PCollection. If the transform has no outputs, a
// single placeholder output named "bogus" is emitted instead.
func (x *translator) translateOutputs(outputs map[string]string) []output {
	var ret []output
	// NOTE(review): map iteration order is randomized, so the order of the
	// emitted outputs is nondeterministic between runs — confirm the service
	// does not depend on a stable ordering here.
	for _, out := range outputs {
		pcol := x.comp.Pcollections[out]
		ref := x.pcollections[out]

		info := output{
			UserName:   ref.OutputName,
			OutputName: ref.OutputName,
			Encoding:   x.translateCoder(pcol, pcol.CoderId),
		}
		ret = append(ret, info)
	}
	if len(ret) == 0 {
		// Dataflow seems to require at least one output. We insert
		// a bogus one (named "bogus") and remove it in the harness.

		ret = []output{{
			UserName:   "bogus",
			OutputName: "bogus",
			Encoding:   x.bogusCoderRef,
		}}
	}
	return ret
}
+
// translateCoder unmarshals the coder with the given id and wraps it for the
// given PCollection. It panics if the coder proto cannot be decoded.
func (x *translator) translateCoder(pcol *pb.PCollection, id string) *graphx.CoderRef {
	c, err := x.coders.Coder(id)
	if err != nil {
		// NOTE(review): a decode failure here is treated as an internal
		// error rather than returned — presumably coders were validated
		// earlier in translation; confirm.
		panic(err)
	}
	return x.wrapCoder(pcol, c)
}
+
+func (x *translator) wrapCoder(pcol *pb.PCollection, c *coder.Coder)
*graphx.CoderRef {
+ // TODO(herohde) 3/16/2018: ensure windowed values for Dataflow
+
+ ws := x.comp.WindowingStrategies[pcol.WindowingStrategyId]
+ wc, err := x.coders.WindowCoder(ws.WindowCoderId)
+ if err != nil {
+ panic(err)
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 128796)
Time Spent: 3h 50m (was: 3h 40m)
> Make Go Dataflow translation use protos directly
> ------------------------------------------------
>
> Key: BEAM-4813
> URL: https://issues.apache.org/jira/browse/BEAM-4813
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Henning Rohde
> Priority: Major
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> The Go SDK maintains 2 pipeline translations and keeps various tweaks in
> sync. It would be better to remove the Dataflow one and extract a more
> flexible (such as running as a separate proxy) translation from proto to
> v1beta3.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)