This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 828717a71d6 [#21515][Go SDK] Update go protobuf package to new version
(#32045)
828717a71d6 is described below
commit 828717a71d638664ba12cad5c0c00193bb1cde35
Author: Vatsal <[email protected]>
AuthorDate: Wed Aug 7 09:54:15 2024 -0700
[#21515][Go SDK] Update go protobuf package to new version (#32045)
---
CHANGES.md | 1 +
sdks/go.mod | 2 +-
sdks/go/cmd/beamctl/cmd/provision.go | 3 +--
sdks/go/container/boot_test.go | 2 +-
sdks/go/container/tools/provision.go | 13 +++++++++----
sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go | 2 +-
sdks/go/pkg/beam/artifact/gcsproxy/staging.go | 2 +-
sdks/go/pkg/beam/artifact/materialize.go | 2 +-
sdks/go/pkg/beam/artifact/materialize_test.go | 2 +-
sdks/go/pkg/beam/coder.go | 12 ++++++------
sdks/go/pkg/beam/core/runtime/exec/translate.go | 2 +-
sdks/go/pkg/beam/core/runtime/graphx/coder.go | 8 ++++----
sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go | 2 +-
sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go | 3 +--
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 8 ++++----
sdks/go/pkg/beam/core/runtime/graphx/translate_test.go | 10 +++++-----
sdks/go/pkg/beam/core/runtime/harness/harness_test.go | 2 +-
sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 5 ++---
sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go | 2 +-
sdks/go/pkg/beam/core/runtime/pipelinex/replace.go | 2 +-
sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go | 2 +-
sdks/go/pkg/beam/core/runtime/pipelinex/util.go | 2 +-
sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go | 2 +-
sdks/go/pkg/beam/core/util/protox/any.go | 6 +++---
sdks/go/pkg/beam/core/util/protox/any_test.go | 4 ++--
sdks/go/pkg/beam/core/util/protox/base64.go | 2 +-
sdks/go/pkg/beam/core/util/protox/protox.go | 2 +-
sdks/go/pkg/beam/create_test.go | 6 ++----
sdks/go/pkg/beam/provision/provision.go | 2 +-
sdks/go/pkg/beam/runners/dataflow/dataflow.go | 3 +--
sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go | 3 +--
sdks/go/pkg/beam/runners/universal/runnerlib/job.go | 3 +--
sdks/go/pkg/beam/runners/universal/runnerlib/stage.go | 2 +-
sdks/go/pkg/beam/runners/universal/universal.go | 3 +--
sdks/go/pkg/beam/transforms/xlang/schema/external.go | 2 +-
sdks/java/container/boot.go | 7 +++----
sdks/python/container/boot.go | 12 ++++++------
37 files changed, 72 insertions(+), 76 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 129fa01f94a..d082f03fd31 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,6 +69,7 @@
* X feature added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
* Go SDK Minimum Go Version updated to 1.21
([#32092](https://github.com/apache/beam/pull/32092)).
+* Updated Go protobuf package to new version (Go)
([#21515](https://github.com/apache/beam/issues/21515)).
## Breaking Changes
diff --git a/sdks/go.mod b/sdks/go.mod
index a5ad9f3b7f5..fb0b7f85f3d 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -39,7 +39,6 @@ require (
github.com/docker/go-connections v0.5.0
github.com/dustin/go-humanize v1.0.1
github.com/go-sql-driver/mysql v1.8.1
- github.com/golang/protobuf v1.5.4 // TODO(danoliveira): Fully replace
this with google.golang.org/protobuf
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/johannesboyne/gofakes3 v0.0.0-20221110173912-32fb85c5aed6
@@ -88,6 +87,7 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
+ github.com/golang/protobuf v1.5.4 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 //
indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
diff --git a/sdks/go/cmd/beamctl/cmd/provision.go
b/sdks/go/cmd/beamctl/cmd/provision.go
index cab82f7bf9d..878c9a77da8 100644
--- a/sdks/go/cmd/beamctl/cmd/provision.go
+++ b/sdks/go/cmd/beamctl/cmd/provision.go
@@ -17,7 +17,6 @@ package cmd
import (
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
- "github.com/golang/protobuf/proto"
"github.com/spf13/cobra"
)
@@ -53,6 +52,6 @@ func infoFn(cmd *cobra.Command, args []string) error {
return err
}
- cmd.Print(proto.MarshalTextString(info.GetInfo()))
+ cmd.Print(info.GetInfo().String())
return nil
}
diff --git a/sdks/go/container/boot_test.go b/sdks/go/container/boot_test.go
index e799e5d65b0..49c78047249 100644
--- a/sdks/go/container/boot_test.go
+++ b/sdks/go/container/boot_test.go
@@ -25,7 +25,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
func TestEnsureEndpointsSet_AllSet(t *testing.T) {
diff --git a/sdks/go/container/tools/provision.go
b/sdks/go/container/tools/provision.go
index dab3383fc17..6b370a5c2e6 100644
--- a/sdks/go/container/tools/provision.go
+++ b/sdks/go/container/tools/provision.go
@@ -29,8 +29,8 @@ import (
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
- "github.com/golang/protobuf/jsonpb"
- google_pb "github.com/golang/protobuf/ptypes/struct"
+ "google.golang.org/protobuf/encoding/protojson"
+ google_pb "google.golang.org/protobuf/types/known/structpb"
)
// ProvisionInfo returns the runtime provisioning info for the worker.
@@ -65,7 +65,8 @@ func OptionsToProto(v any) (*google_pb.Struct, error) {
// JSONToProto converts JSON-encoded pipeline options to a proto struct.
func JSONToProto(data string) (*google_pb.Struct, error) {
var out google_pb.Struct
- if err := jsonpb.UnmarshalString(string(data), &out); err != nil {
+
+ if err := protojson.Unmarshal([]byte(data), &out); err != nil {
return nil, err
}
return &out, nil
@@ -85,5 +86,9 @@ func ProtoToJSON(opt *google_pb.Struct) (string, error) {
if opt == nil {
return "{}", nil
}
- return (&jsonpb.Marshaler{}).MarshalToString(opt)
+ bytes, err := protojson.Marshal(opt)
+ if err != nil {
+ return "", err
+ }
+ return string(bytes), err
}
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
index 15c2d9e2954..ceb8a319be9 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
@@ -22,8 +22,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
jobpb
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/gcsx"
- "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "google.golang.org/protobuf/proto"
)
// RetrievalServer is a artifact retrieval server backed by Google
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
index a2950843980..9113e780f33 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -28,8 +28,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
jobpb
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/gcsx"
- "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "google.golang.org/protobuf/proto"
)
// StagingServer is a artifact staging server backed by Google Cloud Storage
diff --git a/sdks/go/pkg/beam/artifact/materialize.go
b/sdks/go/pkg/beam/artifact/materialize.go
index 866e0dd99b9..624e30efcd2 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -38,7 +38,7 @@ import (
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the
pipeline_v1 proto
diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go
b/sdks/go/pkg/beam/artifact/materialize_test.go
index 35223c908b7..31890ed045c 100644
--- a/sdks/go/pkg/beam/artifact/materialize_test.go
+++ b/sdks/go/pkg/beam/artifact/materialize_test.go
@@ -29,9 +29,9 @@ import (
jobpb
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
- "github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
+ "google.golang.org/protobuf/proto"
)
// TestRetrieve tests that we can successfully retrieve fresh files.
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 062bb337e8d..b03b739ed7b 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -30,8 +30,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
- protov1 "github.com/golang/protobuf/proto"
protov2 "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/protoadapt"
"google.golang.org/protobuf/reflect/protoreflect"
)
@@ -51,7 +51,7 @@ type jsonCoder interface {
json.Unmarshaler
}
-var protoMessageType = reflect.TypeOf((*protov1.Message)(nil)).Elem()
+var protoMessageType = reflect.TypeOf((*protoadapt.MessageV1)(nil)).Elem()
var protoReflectMessageType =
reflect.TypeOf((*protoreflect.ProtoMessage)(nil)).Elem()
var jsonCoderType = reflect.TypeOf((*jsonCoder)(nil)).Elem()
@@ -276,8 +276,8 @@ func protoEnc(in T) ([]byte, error) {
switch it := in.(type) {
case protoreflect.ProtoMessage:
p = it
- case protov1.Message:
- p = protov1.MessageV2(it)
+ case protoadapt.MessageV1:
+ p = protoadapt.MessageV2Of(it)
}
b, err := protov2.MarshalOptions{Deterministic: true}.Marshal(p)
if err != nil {
@@ -293,8 +293,8 @@ func protoDec(t reflect.Type, in []byte) (T, error) {
switch it := reflect.New(t.Elem()).Interface().(type) {
case protoreflect.ProtoMessage:
p = it
- case protov1.Message:
- p = protov1.MessageV2(it)
+ case protoadapt.MessageV1:
+ p = protoadapt.MessageV2Of(it)
}
err := protov2.UnmarshalOptions{}.Unmarshal(in, p)
if err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 72af9e80c40..b74ede228fd 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -33,7 +33,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
// TODO(lostluck): 2018/05/28 Extract these from the canonical enums in
beam_runner_api.proto
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 87b3771e575..99ca5517d3d 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -27,7 +27,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -615,8 +615,8 @@ func (b *CoderMarshaller) internRowCoder(schema
*pipepb.Schema) string {
}
func (b *CoderMarshaller) internCoder(coder *pipepb.Coder) string {
- key := proto.MarshalTextString(coder)
- if id, exists := b.coder2id[key]; exists {
+ key := coder.String()
+ if id, exists := b.coder2id[(key)]; exists {
return id
}
@@ -626,7 +626,7 @@ func (b *CoderMarshaller) internCoder(coder *pipepb.Coder)
string {
} else {
id = fmt.Sprintf("c%v@%v", len(b.coder2id), b.Namespace)
}
- b.coder2id[key] = id
+ b.coder2id[string(key)] = id
b.coders[id] = coder
return id
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index fdd9355e1cb..0d44e68285b 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -37,8 +37,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
"github.com/google/uuid"
+ "google.golang.org/protobuf/proto"
)
// Initialize registered schemas. For use by the beam package at beam.Init
time.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
index 37b3e79f8f5..367d70e81d1 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
@@ -24,7 +24,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/testing/protocmp"
@@ -806,7 +805,7 @@ func TestSchemaConversion(t *testing.T) {
}
if d := cmp.Diff(test.st, got,
protocmp.Transform(),
-
protocmp.IgnoreFields(proto.MessageV2(&pipepb.Schema{}), "id"),
+ protocmp.IgnoreFields(&pipepb.Schema{},
"id"),
); d != "" {
t.Errorf("diff (-want, +got): %v", d)
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index b0529254613..65280ef6b93 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -34,7 +34,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)
@@ -1209,13 +1209,13 @@ func (m *marshaller) addWindowingStrategy(w
*window.WindowingStrategy) (string,
}
func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy)
string {
- key := proto.MarshalTextString(w)
- if id, exists := m.windowing2id[key]; exists {
+ key := w.String()
+ if id, exists := m.windowing2id[(key)]; exists {
return id
}
id := fmt.Sprintf("w%v", len(m.windowing2id))
- m.windowing2id[key] = id
+ m.windowing2id[string(key)] = id
m.windowing[id] = w
return id
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index a331aedd585..e18a5f97796 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -34,8 +34,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
+ "google.golang.org/protobuf/proto"
)
func init() {
@@ -181,13 +181,13 @@ func TestMarshal(t *testing.T) {
}
if got, want := len(p.GetComponents().GetTransforms()),
test.transforms; got != want {
- t.Errorf("got %d transforms, want %d : %v",
got, want, proto.MarshalTextString(p))
+ t.Errorf("got %d transforms, want %d : %v",
got, want, p.String())
}
if got, want := len(p.GetRootTransformIds()),
test.roots; got != want {
- t.Errorf("got %d roots, want %d : %v", got,
want, proto.MarshalTextString(p))
+ t.Errorf("got %d roots, want %d : %v", got,
want, p.String())
}
if got, want := p.GetRequirements(), test.requirements;
!cmp.Equal(got, want, cmpopts.SortSlices(func(a, b string) bool { return a < b
})) {
- t.Errorf("incorrect requirements: got %v, want
%v : %v", got, want, proto.MarshalTextString(p))
+ t.Errorf("incorrect requirements: got %v, want
%v : %v", got, want, p.String())
}
})
}
@@ -248,7 +248,7 @@ func TestMarshal_PTransformAnnotations(t *testing.T) {
pts := p.GetComponents().GetTransforms()
if got, want := len(pts), test.transforms; got != want {
- t.Errorf("got %d transforms, want %d : %v",
got, want, proto.MarshalTextString(p))
+ t.Errorf("got %d transforms, want %d : %v",
got, want, p.String())
}
for _, pt := range pts {
// Context annotations only apply to
composites, and are not duplicated to leaves.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
index 91dd3c591d5..8c25db613eb 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
@@ -23,7 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
// validDescriptor describes a valid pipeline with a source and a sink, but
doesn't do anything else.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index 76d4e1f32c2..061cfca011f 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
- "github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -633,7 +632,7 @@ func (c *StateChannel) read(ctx context.Context) {
if !ok {
// This can happen if Send returns an error that write
handles, but
// the message was actually sent.
- log.Errorf(ctx, "StateChannel[%v].read: no consumer for
state response: %v", c.id, proto.MarshalTextString(msg))
+ log.Errorf(ctx, "StateChannel[%v].read: no consumer for
state response: %v", c.id, msg.String())
continue
}
@@ -641,7 +640,7 @@ func (c *StateChannel) read(ctx context.Context) {
case ch <- msg:
// ok
default:
- panic(fmt.Sprintf("StateChannel[%v].read: failed to
consume state response: %v", c.id, proto.MarshalTextString(msg)))
+ panic(fmt.Sprintf("StateChannel[%v].read: failed to
consume state response: %v", c.id, msg.String()))
}
}
}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
index 695830a483c..b58a3098379 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
@@ -19,8 +19,8 @@ import (
"testing"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
+ "google.golang.org/protobuf/proto"
)
func TestShallowClonePTransform(t *testing.T) {
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
index cfcce88675b..9e527f2fd32 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
@@ -28,7 +28,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
// Update merges a pipeline with the given components, which may add, replace
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
index 79bfd43958a..3024787e616 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
@@ -20,8 +20,8 @@ import (
"testing"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
+ "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
)
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
index 5fe9def9b22..4735e7b77d2 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
@@ -19,7 +19,7 @@ import (
"sort"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
// Bounded returns true iff all PCollections are bounded.
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go
b/sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go
index 1f18b333541..eec13c451a1 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go
@@ -20,7 +20,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
func createExternalEdge(typeUrn string, typePayload []byte) *graph.MultiEdge {
diff --git a/sdks/go/pkg/beam/core/util/protox/any.go
b/sdks/go/pkg/beam/core/util/protox/any.go
index e539a8c19de..46bd08b1aff 100644
--- a/sdks/go/pkg/beam/core/util/protox/any.go
+++ b/sdks/go/pkg/beam/core/util/protox/any.go
@@ -17,9 +17,9 @@ package protox
import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
- "github.com/golang/protobuf/proto"
- protobuf "github.com/golang/protobuf/ptypes/any"
- protobufw "github.com/golang/protobuf/ptypes/wrappers"
+ "google.golang.org/protobuf/proto"
+ protobuf "google.golang.org/protobuf/types/known/anypb"
+ protobufw "google.golang.org/protobuf/types/known/wrapperspb"
)
const (
diff --git a/sdks/go/pkg/beam/core/util/protox/any_test.go
b/sdks/go/pkg/beam/core/util/protox/any_test.go
index 1975bec405c..9eb7621db35 100644
--- a/sdks/go/pkg/beam/core/util/protox/any_test.go
+++ b/sdks/go/pkg/beam/core/util/protox/any_test.go
@@ -19,8 +19,8 @@ import (
"bytes"
"testing"
- "github.com/golang/protobuf/proto"
- protobufw "github.com/golang/protobuf/ptypes/wrappers"
+ "google.golang.org/protobuf/proto"
+ protobufw "google.golang.org/protobuf/types/known/wrapperspb"
)
func TestProtoPackingInvertibility(t *testing.T) {
diff --git a/sdks/go/pkg/beam/core/util/protox/base64.go
b/sdks/go/pkg/beam/core/util/protox/base64.go
index 7f0f5a4bdee..79ea8a025f7 100644
--- a/sdks/go/pkg/beam/core/util/protox/base64.go
+++ b/sdks/go/pkg/beam/core/util/protox/base64.go
@@ -19,7 +19,7 @@ import (
"encoding/base64"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
// MustEncodeBase64 encodes a proto wrapped in base64 and panics on failure.
diff --git a/sdks/go/pkg/beam/core/util/protox/protox.go
b/sdks/go/pkg/beam/core/util/protox/protox.go
index 3555886eefc..892a2ba97d0 100644
--- a/sdks/go/pkg/beam/core/util/protox/protox.go
+++ b/sdks/go/pkg/beam/core/util/protox/protox.go
@@ -16,7 +16,7 @@
// Package protox contains utilities for working with protobufs.
package protox
-import "github.com/golang/protobuf/proto"
+import "google.golang.org/protobuf/proto"
// MustEncode encode the message and panics on failure.
func MustEncode(msg proto.Message) []byte {
diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go
index 785c3b33db6..e65fefc7f2d 100644
--- a/sdks/go/pkg/beam/create_test.go
+++ b/sdks/go/pkg/beam/create_test.go
@@ -23,7 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/protoadapt"
)
func TestMain(m *testing.M) {
@@ -157,7 +157,5 @@ func (t *testProto) Unmarshal(b []byte) error {
// Ensure testProto is detected as a proto.Message and can be (un)marshalled by
// the proto library.
var (
- _ proto.Message = &testProto{}
- _ proto.Marshaler = &testProto{}
- _ proto.Unmarshaler = &testProto{}
+ _ protoadapt.MessageV1 = &testProto{}
)
diff --git a/sdks/go/pkg/beam/provision/provision.go
b/sdks/go/pkg/beam/provision/provision.go
index 3c36973535e..58a8f5ee829 100644
--- a/sdks/go/pkg/beam/provision/provision.go
+++ b/sdks/go/pkg/beam/provision/provision.go
@@ -24,7 +24,7 @@ import (
"github.com/apache/beam/sdks/v2/go/container/tools"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
- google_pb "github.com/golang/protobuf/ptypes/struct"
+ google_pb "google.golang.org/protobuf/types/known/structpb"
)
// Info returns the runtime provisioning info for the worker.
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index ca701979497..73667fb8ee6 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -47,7 +47,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow/dataflowlib"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/gcsx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/hooks/perf"
- "github.com/golang/protobuf/proto"
)
// TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
@@ -235,7 +234,7 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
if *dryRun {
log.Info(ctx, "Dry-run: not submitting job!")
- log.Info(ctx, proto.MarshalTextString(model))
+ log.Info(ctx, model.String())
job, err := dataflowlib.Translate(ctx, model, opts, workerURL,
modelURL)
if err != nil {
return nil, err
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
index 9a1641e314d..806b8940ae9 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -30,7 +30,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal/runnerlib"
- "github.com/golang/protobuf/proto"
df "google.golang.org/api/dataflow/v1b3"
"google.golang.org/api/googleapi"
)
@@ -82,7 +81,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts
*JobOptions, worker
}
// (2) Upload model to GCS
- log.Info(ctx, proto.MarshalTextString(raw))
+ log.Info(ctx, raw.String())
if err := StageModel(ctx, opts.Project, modelURL,
protox.MustEncode(raw)); err != nil {
return presult, err
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index 4e50661b3db..7d6a3027e47 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
jobpb
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
)
// JobOptions capture the various options for submitting jobs
@@ -152,7 +151,7 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
}
default:
- return errors.Errorf("unexpected job update: %v",
proto.MarshalTextString(msg))
+ return errors.Errorf("unexpected job update: %v",
msg.String())
}
}
}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
index d5cc6aa7327..85d6fdc7e2c 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
@@ -29,8 +29,8 @@ import (
jobpb
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
- "github.com/golang/protobuf/proto"
"google.golang.org/grpc"
+ "google.golang.org/protobuf/proto"
)
// Stage stages the worker binary and any additional files to the given
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go
b/sdks/go/pkg/beam/runners/universal/universal.go
index 8af9e91e1e1..c63175c5857 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -32,7 +32,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal/extworker"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal/runnerlib"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/vet"
- "github.com/golang/protobuf/proto"
)
func init() {
@@ -93,7 +92,7 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
return nil, errors.WithContextf(err, "generating model
pipeline")
}
- log.Info(ctx, proto.MarshalTextString(pipeline))
+ log.Info(ctx, pipeline.String())
opt := &runnerlib.JobOptions{
Name: jobopts.GetJobName(),
diff --git a/sdks/go/pkg/beam/transforms/xlang/schema/external.go
b/sdks/go/pkg/beam/transforms/xlang/schema/external.go
index 75be90cbe7b..55a858b9cf9 100644
--- a/sdks/go/pkg/beam/transforms/xlang/schema/external.go
+++ b/sdks/go/pkg/beam/transforms/xlang/schema/external.go
@@ -20,7 +20,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
const schemaTransformURN = "beam:expansion:payload:schematransform:v1"
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 14e2e4311b4..c23e50dcf1b 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -35,7 +35,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/syscallx"
- "github.com/golang/protobuf/proto"
)
var (
@@ -126,12 +125,12 @@ func main() {
if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
logger.Fatalf(ctx, "Failed to load pipeline options to worker:
%v", err)
}
- os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}))
- os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}))
+ os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR",
(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
+ os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR",
(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
os.Setenv("RUNNER_CAPABILITIES",
strings.Join(info.GetRunnerCapabilities(), " "))
if info.GetStatusEndpoint() != nil {
- os.Setenv("STATUS_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(info.GetStatusEndpoint()))
+ os.Setenv("STATUS_API_SERVICE_DESCRIPTOR",
info.GetStatusEndpoint().String())
}
const jarsDir = "/opt/apache/beam/jars"
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 710041e0f04..696604c6488 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -41,8 +41,8 @@ import (
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
- "github.com/golang/protobuf/jsonpb"
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/proto"
)
var (
@@ -217,12 +217,12 @@ func launchSDKProcess() error {
os.Setenv("PIPELINE_OPTIONS", options)
os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
- os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}))
- os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}))
+ os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR",
(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
+ os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR",
(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
os.Setenv("RUNNER_CAPABILITIES",
strings.Join(info.GetRunnerCapabilities(), " "))
if info.GetStatusEndpoint() != nil {
- os.Setenv("STATUS_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(info.GetStatusEndpoint()))
+ os.Setenv("STATUS_API_SERVICE_DESCRIPTOR",
info.GetStatusEndpoint().String())
}
if metadata := info.GetMetadata(); metadata != nil {
@@ -441,7 +441,7 @@ func processArtifactsInSetupOnlyMode() {
files := make([]string, len(infoJsons))
for i, info := range infoJsons {
var artifactInformation pipepb.ArtifactInformation
- if err := jsonpb.UnmarshalString(info, &artifactInformation);
err != nil {
+ if err := protojson.Unmarshal([]byte(info),
&artifactInformation); err != nil {
log.Fatalf("Unable to unmarshal artifact information
from json string %v", info)
}