This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this 
push:
     new 84ebc24  Update translate.go
84ebc24 is described below

commit 84ebc2411b64130f94400671327d1e4eabfe5a83
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 22:52:09 2020 -0700

    Update translate.go
---
 .../beam/runners/dataflow/dataflowlib/translate.go | 26 +++++++++++-----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index 3c9f2c6..71d590e 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -33,7 +33,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/stringx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pubsub_v1 "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1"
-       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
        df "google.golang.org/api/dataflow/v1b3"
 )
@@ -64,7 +64,7 @@ const (
 // full graph. Special steps are also inserted around GBK, for example, which
 // makes the placement of the decoration somewhat tricky. The harness will
 // also never see steps that the service executes directly, notably GBK/CoGBK.
-func translate(p *pb.Pipeline) ([]*df.Step, error) {
+func translate(p *pipepb.Pipeline) ([]*df.Step, error) {
        // NOTE: Dataflow apparently assumes that the steps are in topological 
order.
        // Otherwise, it fails with "Output out for step  was not found.". We 
assume
        // the pipeline has been normalized and each subtransform list is in 
such order.
@@ -74,13 +74,13 @@ func translate(p *pb.Pipeline) ([]*df.Step, error) {
 }
 
 type translator struct {
-       comp          *pb.Components
+       comp          *pipepb.Components
        pcollections  map[string]*outputReference
        coders        *graphx.CoderUnmarshaller
        bogusCoderRef *graphx.CoderRef
 }
 
-func newTranslator(comp *pb.Components) *translator {
+func newTranslator(comp *pipepb.Components) *translator {
        bytesCoderRef, _ := graphx.EncodeCoderRef(coder.NewW(coder.NewBytes(), 
coder.NewGlobalWindow()))
 
        return &translator{
@@ -128,7 +128,7 @@ func (x *translator) translateTransform(trunk string, id 
string) ([]*df.Step, er
                return []*df.Step{x.newStep(id, impulseKind, prop)}, nil
 
        case graphx.URNParDo:
-               var payload pb.ParDoPayload
+               var payload pipepb.ParDoPayload
                if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil 
{
                        return nil, errors.Wrapf(err, "invalid ParDo payload 
for %v", t)
                }
@@ -189,7 +189,7 @@ func (x *translator) translateTransform(trunk string, id 
string) ([]*df.Step, er
                if err != nil {
                        return nil, errors.Wrapf(err, "invalid CombinePerKey, 
couldn't extract GBK from %v", t)
                }
-               var payload pb.CombinePayload
+               var payload pipepb.CombinePayload
                if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil 
{
                        return nil, errors.Wrapf(err, "invalid Combine payload 
for %v", t)
                }
@@ -318,7 +318,7 @@ func (x *translator) translateOutputs(outputs 
map[string]string) []output {
        return ret
 }
 
-func (x *translator) translateCoder(pcol *pb.PCollection, id string) 
*graphx.CoderRef {
+func (x *translator) translateCoder(pcol *pipepb.PCollection, id string) 
*graphx.CoderRef {
        c, err := x.coders.Coder(id)
        if err != nil {
                panic(err)
@@ -326,7 +326,7 @@ func (x *translator) translateCoder(pcol *pb.PCollection, 
id string) *graphx.Cod
        return x.wrapCoder(pcol, c)
 }
 
-func (x *translator) wrapCoder(pcol *pb.PCollection, c *coder.Coder) 
*graphx.CoderRef {
+func (x *translator) wrapCoder(pcol *pipepb.PCollection, c *coder.Coder) 
*graphx.CoderRef {
        // TODO(herohde) 3/16/2018: ensure windowed values for Dataflow
 
        ws := x.comp.WindowingStrategies[pcol.WindowingStrategyId]
@@ -343,14 +343,14 @@ func (x *translator) wrapCoder(pcol *pb.PCollection, c 
*coder.Coder) *graphx.Cod
 
 // extractWindowingStrategy returns a self-contained windowing strategy from
 // the given pcollection id.
-func (x *translator) extractWindowingStrategy(pid string) 
*pb.MessageWithComponents {
+func (x *translator) extractWindowingStrategy(pid string) 
*pipepb.MessageWithComponents {
        ws := 
x.comp.WindowingStrategies[x.comp.Pcollections[pid].WindowingStrategyId]
 
-       msg := &pb.MessageWithComponents{
-               Components: &pb.Components{
+       msg := &pipepb.MessageWithComponents{
+               Components: &pipepb.Components{
                        Coders: pipelinex.TrimCoders(x.comp.Coders, 
ws.WindowCoderId),
                },
-               Root: &pb.MessageWithComponents_WindowingStrategy{
+               Root: &pipepb.MessageWithComponents_WindowingStrategy{
                        WindowingStrategy: ws,
                },
        }
@@ -359,7 +359,7 @@ func (x *translator) extractWindowingStrategy(pid string) 
*pb.MessageWithCompone
 
 // makeOutputReferences builds a map from PCollection id to the Dataflow 
representation.
 // Each output is named after the generating transform.
-func makeOutputReferences(xforms map[string]*pb.PTransform) 
map[string]*outputReference {
+func makeOutputReferences(xforms map[string]*pipepb.PTransform) 
map[string]*outputReference {
        ret := make(map[string]*outputReference)
        for id, t := range xforms {
                if len(t.Subtransforms) > 0 {

Reply via email to