This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this
push:
new 11f086a Update datamgr.go
11f086a is described below
commit 11f086a554602ec66e230b8c71ca6cb74e961464
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 22:32:40 2020 -0700
Update datamgr.go
---
sdks/go/pkg/beam/core/runtime/harness/datamgr.go | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index a24d6a3..81c5514 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -24,7 +24,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/log"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+ fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
)
const (
@@ -133,10 +133,10 @@ type clientID struct {
// This is a reduced version of the full gRPC interface to help with testing.
// TODO(wcn): need a compile-time assertion to make sure this stays synced
with what's
-// in pb.BeamFnData_DataClient
+// in fnpb.BeamFnData_DataClient
type dataClient interface {
- Send(*pb.Elements) error
- Recv() (*pb.Elements, error)
+ Send(*fnpb.Elements) error
+ Recv() (*fnpb.Elements, error)
}
// DataChannel manages a single gRPC stream over the Data API. Data from
@@ -166,7 +166,7 @@ func newDataChannel(ctx context.Context, port exec.Port)
(*DataChannel, error) {
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to data
service at %v", port.URL)
}
- client, err := pb.NewBeamFnDataClient(cc).Data(ctx)
+ client, err := fnpb.NewBeamFnDataClient(cc).Data(ctx)
if err != nil {
cc.Close()
return nil, errors.Wrapf(err, "failed to create data client on
%v", port.URL)
@@ -391,7 +391,7 @@ type dataWriter struct {
}
// send requires the ch.mu lock to be held.
-func (w *dataWriter) send(msg *pb.Elements) error {
+func (w *dataWriter) send(msg *fnpb.Elements) error {
recordStreamSend(msg)
if err := w.ch.client.Send(msg); err != nil {
if err == io.EOF {
@@ -423,8 +423,8 @@ func (w *dataWriter) Close() error {
w.ch.mu.Lock()
defer w.ch.mu.Unlock()
delete(w.ch.writers, w.id)
- msg := &pb.Elements{
- Data: []*pb.Elements_Data{
+ msg := &fnpb.Elements{
+ Data: []*fnpb.Elements_Data{
{
InstructionId: string(w.id.instID),
TransformId: w.id.ptransformID,
@@ -446,8 +446,8 @@ func (w *dataWriter) Flush() error {
return nil
}
- msg := &pb.Elements{
- Data: []*pb.Elements_Data{
+ msg := &fnpb.Elements{
+ Data: []*fnpb.Elements_Data{
{
InstructionId: string(w.id.instID),
TransformId: w.id.ptransformID,