This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this
push:
new 84ebc24 Update translate.go
84ebc24 is described below
commit 84ebc2411b64130f94400671327d1e4eabfe5a83
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 22:52:09 2020 -0700
Update translate.go
---
.../beam/runners/dataflow/dataflowlib/translate.go | 26 +++++++++++-----------
1 file changed, 13 insertions(+), 13 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index 3c9f2c6..71d590e 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -33,7 +33,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/util/stringx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pubsub_v1 "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
df "google.golang.org/api/dataflow/v1b3"
)
@@ -64,7 +64,7 @@ const (
// full graph. Special steps are also inserted around GBK, for example, which
// makes the placement of the decoration somewhat tricky. The harness will
// also never see steps that the service executes directly, notably GBK/CoGBK.
-func translate(p *pb.Pipeline) ([]*df.Step, error) {
+func translate(p *pipepb.Pipeline) ([]*df.Step, error) {
// NOTE: Dataflow apparently assumes that the steps are in topological
order.
// Otherwise, it fails with "Output out for step was not found.". We
assume
// the pipeline has been normalized and each subtransform list is in
such order.
@@ -74,13 +74,13 @@ func translate(p *pb.Pipeline) ([]*df.Step, error) {
}
type translator struct {
- comp *pb.Components
+ comp *pipepb.Components
pcollections map[string]*outputReference
coders *graphx.CoderUnmarshaller
bogusCoderRef *graphx.CoderRef
}
-func newTranslator(comp *pb.Components) *translator {
+func newTranslator(comp *pipepb.Components) *translator {
bytesCoderRef, _ := graphx.EncodeCoderRef(coder.NewW(coder.NewBytes(),
coder.NewGlobalWindow()))
return &translator{
@@ -128,7 +128,7 @@ func (x *translator) translateTransform(trunk string, id
string) ([]*df.Step, er
return []*df.Step{x.newStep(id, impulseKind, prop)}, nil
case graphx.URNParDo:
- var payload pb.ParDoPayload
+ var payload pipepb.ParDoPayload
if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil
{
return nil, errors.Wrapf(err, "invalid ParDo payload
for %v", t)
}
@@ -189,7 +189,7 @@ func (x *translator) translateTransform(trunk string, id
string) ([]*df.Step, er
if err != nil {
return nil, errors.Wrapf(err, "invalid CombinePerKey,
couldn't extract GBK from %v", t)
}
- var payload pb.CombinePayload
+ var payload pipepb.CombinePayload
if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil
{
return nil, errors.Wrapf(err, "invalid Combine payload
for %v", t)
}
@@ -318,7 +318,7 @@ func (x *translator) translateOutputs(outputs
map[string]string) []output {
return ret
}
-func (x *translator) translateCoder(pcol *pb.PCollection, id string)
*graphx.CoderRef {
+func (x *translator) translateCoder(pcol *pipepb.PCollection, id string)
*graphx.CoderRef {
c, err := x.coders.Coder(id)
if err != nil {
panic(err)
@@ -326,7 +326,7 @@ func (x *translator) translateCoder(pcol *pb.PCollection,
id string) *graphx.Cod
return x.wrapCoder(pcol, c)
}
-func (x *translator) wrapCoder(pcol *pb.PCollection, c *coder.Coder)
*graphx.CoderRef {
+func (x *translator) wrapCoder(pcol *pipepb.PCollection, c *coder.Coder)
*graphx.CoderRef {
// TODO(herohde) 3/16/2018: ensure windowed values for Dataflow
ws := x.comp.WindowingStrategies[pcol.WindowingStrategyId]
@@ -343,14 +343,14 @@ func (x *translator) wrapCoder(pcol *pb.PCollection, c
*coder.Coder) *graphx.Cod
// extractWindowingStrategy returns a self-contained windowing strategy from
// the given pcollection id.
-func (x *translator) extractWindowingStrategy(pid string)
*pb.MessageWithComponents {
+func (x *translator) extractWindowingStrategy(pid string)
*pipepb.MessageWithComponents {
ws :=
x.comp.WindowingStrategies[x.comp.Pcollections[pid].WindowingStrategyId]
- msg := &pb.MessageWithComponents{
- Components: &pb.Components{
+ msg := &pipepb.MessageWithComponents{
+ Components: &pipepb.Components{
Coders: pipelinex.TrimCoders(x.comp.Coders,
ws.WindowCoderId),
},
- Root: &pb.MessageWithComponents_WindowingStrategy{
+ Root: &pipepb.MessageWithComponents_WindowingStrategy{
WindowingStrategy: ws,
},
}
@@ -359,7 +359,7 @@ func (x *translator) extractWindowingStrategy(pid string)
*pb.MessageWithCompone
// makeOutputReferences builds a map from PCollection id to the Dataflow
representation.
// Each output is named after the generating transform.
-func makeOutputReferences(xforms map[string]*pb.PTransform)
map[string]*outputReference {
+func makeOutputReferences(xforms map[string]*pipepb.PTransform)
map[string]*outputReference {
ret := make(map[string]*outputReference)
for id, t := range xforms {
if len(t.Subtransforms) > 0 {