This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 828717a71d6 [#21515][Go SDK] Update go protobuf package to new version 
(#32045)
828717a71d6 is described below

commit 828717a71d638664ba12cad5c0c00193bb1cde35
Author: Vatsal <[email protected]>
AuthorDate: Wed Aug 7 09:54:15 2024 -0700

    [#21515][Go SDK] Update go protobuf package to new version (#32045)
---
 CHANGES.md                                                 |  1 +
 sdks/go.mod                                                |  2 +-
 sdks/go/cmd/beamctl/cmd/provision.go                       |  3 +--
 sdks/go/container/boot_test.go                             |  2 +-
 sdks/go/container/tools/provision.go                       | 13 +++++++++----
 sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go            |  2 +-
 sdks/go/pkg/beam/artifact/gcsproxy/staging.go              |  2 +-
 sdks/go/pkg/beam/artifact/materialize.go                   |  2 +-
 sdks/go/pkg/beam/artifact/materialize_test.go              |  2 +-
 sdks/go/pkg/beam/coder.go                                  | 12 ++++++------
 sdks/go/pkg/beam/core/runtime/exec/translate.go            |  2 +-
 sdks/go/pkg/beam/core/runtime/graphx/coder.go              |  8 ++++----
 sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go      |  2 +-
 sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go |  3 +--
 sdks/go/pkg/beam/core/runtime/graphx/translate.go          |  8 ++++----
 sdks/go/pkg/beam/core/runtime/graphx/translate_test.go     | 10 +++++-----
 sdks/go/pkg/beam/core/runtime/harness/harness_test.go      |  2 +-
 sdks/go/pkg/beam/core/runtime/harness/statemgr.go          |  5 ++---
 sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go      |  2 +-
 sdks/go/pkg/beam/core/runtime/pipelinex/replace.go         |  2 +-
 sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go    |  2 +-
 sdks/go/pkg/beam/core/runtime/pipelinex/util.go            |  2 +-
 sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go       |  2 +-
 sdks/go/pkg/beam/core/util/protox/any.go                   |  6 +++---
 sdks/go/pkg/beam/core/util/protox/any_test.go              |  4 ++--
 sdks/go/pkg/beam/core/util/protox/base64.go                |  2 +-
 sdks/go/pkg/beam/core/util/protox/protox.go                |  2 +-
 sdks/go/pkg/beam/create_test.go                            |  6 ++----
 sdks/go/pkg/beam/provision/provision.go                    |  2 +-
 sdks/go/pkg/beam/runners/dataflow/dataflow.go              |  3 +--
 sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go   |  3 +--
 sdks/go/pkg/beam/runners/universal/runnerlib/job.go        |  3 +--
 sdks/go/pkg/beam/runners/universal/runnerlib/stage.go      |  2 +-
 sdks/go/pkg/beam/runners/universal/universal.go            |  3 +--
 sdks/go/pkg/beam/transforms/xlang/schema/external.go       |  2 +-
 sdks/java/container/boot.go                                |  7 +++----
 sdks/python/container/boot.go                              | 12 ++++++------
 37 files changed, 72 insertions(+), 76 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 129fa01f94a..d082f03fd31 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,6 +69,7 @@
 
 * X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 * Go SDK Minimum Go Version updated to 1.21 
([#32092](https://github.com/apache/beam/pull/32092)).
+* Updated Go protobuf package to new version (Go) 
([#21515](https://github.com/apache/beam/issues/21515)).
 
 ## Breaking Changes
 
diff --git a/sdks/go.mod b/sdks/go.mod
index a5ad9f3b7f5..fb0b7f85f3d 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -39,7 +39,6 @@ require (
        github.com/docker/go-connections v0.5.0
        github.com/dustin/go-humanize v1.0.1
        github.com/go-sql-driver/mysql v1.8.1
-       github.com/golang/protobuf v1.5.4 // TODO(danoliveira): Fully replace 
this with google.golang.org/protobuf
        github.com/google/go-cmp v0.6.0
        github.com/google/uuid v1.6.0
        github.com/johannesboyne/gofakes3 v0.0.0-20221110173912-32fb85c5aed6
@@ -88,6 +87,7 @@ require (
        github.com/go-logr/logr v1.4.1 // indirect
        github.com/go-logr/stdr v1.2.2 // indirect
        github.com/go-ole/go-ole v1.2.6 // indirect
+       github.com/golang/protobuf v1.5.4 // indirect
        github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // 
indirect
        github.com/minio/highwayhash v1.0.2 // indirect
        github.com/moby/docker-image-spec v1.3.1 // indirect
diff --git a/sdks/go/cmd/beamctl/cmd/provision.go 
b/sdks/go/cmd/beamctl/cmd/provision.go
index cab82f7bf9d..878c9a77da8 100644
--- a/sdks/go/cmd/beamctl/cmd/provision.go
+++ b/sdks/go/cmd/beamctl/cmd/provision.go
@@ -17,7 +17,6 @@ package cmd
 
 import (
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
-       "github.com/golang/protobuf/proto"
        "github.com/spf13/cobra"
 )
 
@@ -53,6 +52,6 @@ func infoFn(cmd *cobra.Command, args []string) error {
                return err
        }
 
-       cmd.Print(proto.MarshalTextString(info.GetInfo()))
+       cmd.Print(info.GetInfo().String())
        return nil
 }
diff --git a/sdks/go/container/boot_test.go b/sdks/go/container/boot_test.go
index e799e5d65b0..49c78047249 100644
--- a/sdks/go/container/boot_test.go
+++ b/sdks/go/container/boot_test.go
@@ -25,7 +25,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 func TestEnsureEndpointsSet_AllSet(t *testing.T) {
diff --git a/sdks/go/container/tools/provision.go 
b/sdks/go/container/tools/provision.go
index dab3383fc17..6b370a5c2e6 100644
--- a/sdks/go/container/tools/provision.go
+++ b/sdks/go/container/tools/provision.go
@@ -29,8 +29,8 @@ import (
 
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
-       "github.com/golang/protobuf/jsonpb"
-       google_pb "github.com/golang/protobuf/ptypes/struct"
+       "google.golang.org/protobuf/encoding/protojson"
+       google_pb "google.golang.org/protobuf/types/known/structpb"
 )
 
 // ProvisionInfo returns the runtime provisioning info for the worker.
@@ -65,7 +65,8 @@ func OptionsToProto(v any) (*google_pb.Struct, error) {
 // JSONToProto converts JSON-encoded pipeline options to a proto struct.
 func JSONToProto(data string) (*google_pb.Struct, error) {
        var out google_pb.Struct
-       if err := jsonpb.UnmarshalString(string(data), &out); err != nil {
+
+       if err := protojson.Unmarshal([]byte(data), &out); err != nil {
                return nil, err
        }
        return &out, nil
@@ -85,5 +86,9 @@ func ProtoToJSON(opt *google_pb.Struct) (string, error) {
        if opt == nil {
                return "{}", nil
        }
-       return (&jsonpb.Marshaler{}).MarshalToString(opt)
+       bytes, err := protojson.Marshal(opt)
+       if err != nil {
+               return "", err
+       }
+       return string(bytes), err
 }
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go 
b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
index 15c2d9e2954..ceb8a319be9 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
@@ -22,8 +22,8 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        jobpb 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/gcsx"
-       "github.com/golang/protobuf/proto"
        "golang.org/x/net/context"
+       "google.golang.org/protobuf/proto"
 )
 
 // RetrievalServer is a artifact retrieval server backed by Google
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go 
b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
index a2950843980..9113e780f33 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -28,8 +28,8 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        jobpb 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/gcsx"
-       "github.com/golang/protobuf/proto"
        "golang.org/x/net/context"
+       "google.golang.org/protobuf/proto"
 )
 
 // StagingServer is a artifact staging server backed by Google Cloud Storage
diff --git a/sdks/go/pkg/beam/artifact/materialize.go 
b/sdks/go/pkg/beam/artifact/materialize.go
index 866e0dd99b9..624e30efcd2 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -38,7 +38,7 @@ import (
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 // TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the 
pipeline_v1 proto
diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go 
b/sdks/go/pkg/beam/artifact/materialize_test.go
index 35223c908b7..31890ed045c 100644
--- a/sdks/go/pkg/beam/artifact/materialize_test.go
+++ b/sdks/go/pkg/beam/artifact/materialize_test.go
@@ -29,9 +29,9 @@ import (
        jobpb 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
-       "github.com/golang/protobuf/proto"
        "google.golang.org/grpc"
        "google.golang.org/grpc/metadata"
+       "google.golang.org/protobuf/proto"
 )
 
 // TestRetrieve tests that we can successfully retrieve fresh files.
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 062bb337e8d..b03b739ed7b 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -30,8 +30,8 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
-       protov1 "github.com/golang/protobuf/proto"
        protov2 "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/protoadapt"
        "google.golang.org/protobuf/reflect/protoreflect"
 )
 
@@ -51,7 +51,7 @@ type jsonCoder interface {
        json.Unmarshaler
 }
 
-var protoMessageType = reflect.TypeOf((*protov1.Message)(nil)).Elem()
+var protoMessageType = reflect.TypeOf((*protoadapt.MessageV1)(nil)).Elem()
 var protoReflectMessageType = 
reflect.TypeOf((*protoreflect.ProtoMessage)(nil)).Elem()
 var jsonCoderType = reflect.TypeOf((*jsonCoder)(nil)).Elem()
 
@@ -276,8 +276,8 @@ func protoEnc(in T) ([]byte, error) {
        switch it := in.(type) {
        case protoreflect.ProtoMessage:
                p = it
-       case protov1.Message:
-               p = protov1.MessageV2(it)
+       case protoadapt.MessageV1:
+               p = protoadapt.MessageV2Of(it)
        }
        b, err := protov2.MarshalOptions{Deterministic: true}.Marshal(p)
        if err != nil {
@@ -293,8 +293,8 @@ func protoDec(t reflect.Type, in []byte) (T, error) {
        switch it := reflect.New(t.Elem()).Interface().(type) {
        case protoreflect.ProtoMessage:
                p = it
-       case protov1.Message:
-               p = protov1.MessageV2(it)
+       case protoadapt.MessageV1:
+               p = protoadapt.MessageV2Of(it)
        }
        err := protov2.UnmarshalOptions{}.Unmarshal(in, p)
        if err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 72af9e80c40..b74ede228fd 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -33,7 +33,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 // TODO(lostluck): 2018/05/28 Extract these from the canonical enums in 
beam_runner_api.proto
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go 
b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 87b3771e575..99ca5517d3d 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -27,7 +27,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 const (
@@ -615,8 +615,8 @@ func (b *CoderMarshaller) internRowCoder(schema 
*pipepb.Schema) string {
 }
 
 func (b *CoderMarshaller) internCoder(coder *pipepb.Coder) string {
-       key := proto.MarshalTextString(coder)
-       if id, exists := b.coder2id[key]; exists {
+       key := coder.String()
+       if id, exists := b.coder2id[(key)]; exists {
                return id
        }
 
@@ -626,7 +626,7 @@ func (b *CoderMarshaller) internCoder(coder *pipepb.Coder) 
string {
        } else {
                id = fmt.Sprintf("c%v@%v", len(b.coder2id), b.Namespace)
        }
-       b.coder2id[key] = id
+       b.coder2id[string(key)] = id
        b.coders[id] = coder
        return id
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go 
b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index fdd9355e1cb..0d44e68285b 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -37,8 +37,8 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
        "github.com/google/uuid"
+       "google.golang.org/protobuf/proto"
 )
 
 // Initialize registered schemas. For use by the beam package at beam.Init 
time.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
index 37b3e79f8f5..367d70e81d1 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
@@ -24,7 +24,6 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
        "github.com/google/go-cmp/cmp"
        "google.golang.org/protobuf/encoding/prototext"
        "google.golang.org/protobuf/testing/protocmp"
@@ -806,7 +805,7 @@ func TestSchemaConversion(t *testing.T) {
                                }
                                if d := cmp.Diff(test.st, got,
                                        protocmp.Transform(),
-                                       
protocmp.IgnoreFields(proto.MessageV2(&pipepb.Schema{}), "id"),
+                                       protocmp.IgnoreFields(&pipepb.Schema{}, 
"id"),
                                ); d != "" {
                                        t.Errorf("diff (-want, +got): %v", d)
                                }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index b0529254613..65280ef6b93 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -34,7 +34,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/durationpb"
 )
 
@@ -1209,13 +1209,13 @@ func (m *marshaller) addWindowingStrategy(w 
*window.WindowingStrategy) (string,
 }
 
 func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy) 
string {
-       key := proto.MarshalTextString(w)
-       if id, exists := m.windowing2id[key]; exists {
+       key := w.String()
+       if id, exists := m.windowing2id[(key)]; exists {
                return id
        }
 
        id := fmt.Sprintf("w%v", len(m.windowing2id))
-       m.windowing2id[key] = id
+       m.windowing2id[string(key)] = id
        m.windowing[id] = w
        return id
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index a331aedd585..e18a5f97796 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -34,8 +34,8 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
        "github.com/google/go-cmp/cmp"
+       "google.golang.org/protobuf/proto"
 )
 
 func init() {
@@ -181,13 +181,13 @@ func TestMarshal(t *testing.T) {
                        }
 
                        if got, want := len(p.GetComponents().GetTransforms()), 
test.transforms; got != want {
-                               t.Errorf("got %d transforms, want %d : %v", 
got, want, proto.MarshalTextString(p))
+                               t.Errorf("got %d transforms, want %d : %v", 
got, want, p.String())
                        }
                        if got, want := len(p.GetRootTransformIds()), 
test.roots; got != want {
-                               t.Errorf("got %d roots, want %d : %v", got, 
want, proto.MarshalTextString(p))
+                               t.Errorf("got %d roots, want %d : %v", got, 
want, p.String())
                        }
                        if got, want := p.GetRequirements(), test.requirements; 
!cmp.Equal(got, want, cmpopts.SortSlices(func(a, b string) bool { return a < b 
})) {
-                               t.Errorf("incorrect requirements: got %v, want 
%v : %v", got, want, proto.MarshalTextString(p))
+                               t.Errorf("incorrect requirements: got %v, want 
%v : %v", got, want, p.String())
                        }
                })
        }
@@ -248,7 +248,7 @@ func TestMarshal_PTransformAnnotations(t *testing.T) {
 
                        pts := p.GetComponents().GetTransforms()
                        if got, want := len(pts), test.transforms; got != want {
-                               t.Errorf("got %d transforms, want %d : %v", 
got, want, proto.MarshalTextString(p))
+                               t.Errorf("got %d transforms, want %d : %v", 
got, want, p.String())
                        }
                        for _, pt := range pts {
                                // Context annotations only apply to 
composites, and are not duplicated to leaves.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness_test.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
index 91dd3c591d5..8c25db613eb 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
@@ -23,7 +23,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 // validDescriptor describes a valid pipeline with a source and a sink, but 
doesn't do anything else.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go 
b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index 76d4e1f32c2..061cfca011f 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -28,7 +28,6 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
-       "github.com/golang/protobuf/proto"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
 )
@@ -633,7 +632,7 @@ func (c *StateChannel) read(ctx context.Context) {
                if !ok {
                        // This can happen if Send returns an error that write 
handles, but
                        // the message was actually sent.
-                       log.Errorf(ctx, "StateChannel[%v].read: no consumer for 
state response: %v", c.id, proto.MarshalTextString(msg))
+                       log.Errorf(ctx, "StateChannel[%v].read: no consumer for 
state response: %v", c.id, msg.String())
                        continue
                }
 
@@ -641,7 +640,7 @@ func (c *StateChannel) read(ctx context.Context) {
                case ch <- msg:
                        // ok
                default:
-                       panic(fmt.Sprintf("StateChannel[%v].read: failed to 
consume state response: %v", c.id, proto.MarshalTextString(msg)))
+                       panic(fmt.Sprintf("StateChannel[%v].read: failed to 
consume state response: %v", c.id, msg.String()))
                }
        }
 }
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
index 695830a483c..b58a3098379 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
@@ -19,8 +19,8 @@ import (
        "testing"
 
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
        "github.com/google/go-cmp/cmp"
+       "google.golang.org/protobuf/proto"
 )
 
 func TestShallowClonePTransform(t *testing.T) {
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
index cfcce88675b..9e527f2fd32 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
@@ -28,7 +28,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 // Update merges a pipeline with the given components, which may add, replace
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
index 79bfd43958a..3024787e616 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
@@ -20,8 +20,8 @@ import (
        "testing"
 
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
        "github.com/google/go-cmp/cmp"
+       "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/testing/protocmp"
 )
 
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
index 5fe9def9b22..4735e7b77d2 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
@@ -19,7 +19,7 @@ import (
        "sort"
 
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 // Bounded returns true iff all PCollections are bounded.
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go 
b/sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go
index 1f18b333541..eec13c451a1 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go
@@ -20,7 +20,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 func createExternalEdge(typeUrn string, typePayload []byte) *graph.MultiEdge {
diff --git a/sdks/go/pkg/beam/core/util/protox/any.go 
b/sdks/go/pkg/beam/core/util/protox/any.go
index e539a8c19de..46bd08b1aff 100644
--- a/sdks/go/pkg/beam/core/util/protox/any.go
+++ b/sdks/go/pkg/beam/core/util/protox/any.go
@@ -17,9 +17,9 @@ package protox
 
 import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
-       "github.com/golang/protobuf/proto"
-       protobuf "github.com/golang/protobuf/ptypes/any"
-       protobufw "github.com/golang/protobuf/ptypes/wrappers"
+       "google.golang.org/protobuf/proto"
+       protobuf "google.golang.org/protobuf/types/known/anypb"
+       protobufw "google.golang.org/protobuf/types/known/wrapperspb"
 )
 
 const (
diff --git a/sdks/go/pkg/beam/core/util/protox/any_test.go 
b/sdks/go/pkg/beam/core/util/protox/any_test.go
index 1975bec405c..9eb7621db35 100644
--- a/sdks/go/pkg/beam/core/util/protox/any_test.go
+++ b/sdks/go/pkg/beam/core/util/protox/any_test.go
@@ -19,8 +19,8 @@ import (
        "bytes"
        "testing"
 
-       "github.com/golang/protobuf/proto"
-       protobufw "github.com/golang/protobuf/ptypes/wrappers"
+       "google.golang.org/protobuf/proto"
+       protobufw "google.golang.org/protobuf/types/known/wrapperspb"
 )
 
 func TestProtoPackingInvertibility(t *testing.T) {
diff --git a/sdks/go/pkg/beam/core/util/protox/base64.go 
b/sdks/go/pkg/beam/core/util/protox/base64.go
index 7f0f5a4bdee..79ea8a025f7 100644
--- a/sdks/go/pkg/beam/core/util/protox/base64.go
+++ b/sdks/go/pkg/beam/core/util/protox/base64.go
@@ -19,7 +19,7 @@ import (
        "encoding/base64"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 // MustEncodeBase64 encodes a proto wrapped in base64 and panics on failure.
diff --git a/sdks/go/pkg/beam/core/util/protox/protox.go 
b/sdks/go/pkg/beam/core/util/protox/protox.go
index 3555886eefc..892a2ba97d0 100644
--- a/sdks/go/pkg/beam/core/util/protox/protox.go
+++ b/sdks/go/pkg/beam/core/util/protox/protox.go
@@ -16,7 +16,7 @@
 // Package protox contains utilities for working with protobufs.
 package protox
 
-import "github.com/golang/protobuf/proto"
+import "google.golang.org/protobuf/proto"
 
 // MustEncode encode the message and panics on failure.
 func MustEncode(msg proto.Message) []byte {
diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go
index 785c3b33db6..e65fefc7f2d 100644
--- a/sdks/go/pkg/beam/create_test.go
+++ b/sdks/go/pkg/beam/create_test.go
@@ -23,7 +23,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/protoadapt"
 )
 
 func TestMain(m *testing.M) {
@@ -157,7 +157,5 @@ func (t *testProto) Unmarshal(b []byte) error {
 // Ensure testProto is detected as a proto.Message and can be (un)marshalled by
 // the proto library.
 var (
-       _ proto.Message     = &testProto{}
-       _ proto.Marshaler   = &testProto{}
-       _ proto.Unmarshaler = &testProto{}
+       _ protoadapt.MessageV1 = &testProto{}
 )
diff --git a/sdks/go/pkg/beam/provision/provision.go 
b/sdks/go/pkg/beam/provision/provision.go
index 3c36973535e..58a8f5ee829 100644
--- a/sdks/go/pkg/beam/provision/provision.go
+++ b/sdks/go/pkg/beam/provision/provision.go
@@ -24,7 +24,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/container/tools"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
-       google_pb "github.com/golang/protobuf/ptypes/struct"
+       google_pb "google.golang.org/protobuf/types/known/structpb"
 )
 
 // Info returns the runtime provisioning info for the worker.
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index ca701979497..73667fb8ee6 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -47,7 +47,6 @@ import (
        
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow/dataflowlib"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/gcsx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/hooks/perf"
-       "github.com/golang/protobuf/proto"
 )
 
 // TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
@@ -235,7 +234,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error)
        if *dryRun {
                log.Info(ctx, "Dry-run: not submitting job!")
 
-               log.Info(ctx, proto.MarshalTextString(model))
+               log.Info(ctx, model.String())
                job, err := dataflowlib.Translate(ctx, model, opts, workerURL, 
modelURL)
                if err != nil {
                        return nil, err
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
index 9a1641e314d..806b8940ae9 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -30,7 +30,6 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal/runnerlib"
-       "github.com/golang/protobuf/proto"
        df "google.golang.org/api/dataflow/v1b3"
        "google.golang.org/api/googleapi"
 )
@@ -82,7 +81,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts 
*JobOptions, worker
        }
 
        // (2) Upload model to GCS
-       log.Info(ctx, proto.MarshalTextString(raw))
+       log.Info(ctx, raw.String())
 
        if err := StageModel(ctx, opts.Project, modelURL, 
protox.MustEncode(raw)); err != nil {
                return presult, err
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index 4e50661b3db..7d6a3027e47 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -28,7 +28,6 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        jobpb 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
 )
 
 // JobOptions capture the various options for submitting jobs
@@ -152,7 +151,7 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                        }
 
                default:
-                       return errors.Errorf("unexpected job update: %v", 
proto.MarshalTextString(msg))
+                       return errors.Errorf("unexpected job update: %v", 
msg.String())
                }
        }
 }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
index d5cc6aa7327..85d6fdc7e2c 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
@@ -29,8 +29,8 @@ import (
        jobpb 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
-       "github.com/golang/protobuf/proto"
        "google.golang.org/grpc"
+       "google.golang.org/protobuf/proto"
 )
 
 // Stage stages the worker binary and any additional files to the given
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go 
b/sdks/go/pkg/beam/runners/universal/universal.go
index 8af9e91e1e1..c63175c5857 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -32,7 +32,6 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal/extworker"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal/runnerlib"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/vet"
-       "github.com/golang/protobuf/proto"
 )
 
 func init() {
@@ -93,7 +92,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error)
                return nil, errors.WithContextf(err, "generating model 
pipeline")
        }
 
-       log.Info(ctx, proto.MarshalTextString(pipeline))
+       log.Info(ctx, pipeline.String())
 
        opt := &runnerlib.JobOptions{
                Name:         jobopts.GetJobName(),
diff --git a/sdks/go/pkg/beam/transforms/xlang/schema/external.go 
b/sdks/go/pkg/beam/transforms/xlang/schema/external.go
index 75be90cbe7b..55a858b9cf9 100644
--- a/sdks/go/pkg/beam/transforms/xlang/schema/external.go
+++ b/sdks/go/pkg/beam/transforms/xlang/schema/external.go
@@ -20,7 +20,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/proto"
 )
 
 const schemaTransformURN = "beam:expansion:payload:schematransform:v1"
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 14e2e4311b4..c23e50dcf1b 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -35,7 +35,6 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/syscallx"
-       "github.com/golang/protobuf/proto"
 )
 
 var (
@@ -126,12 +125,12 @@ func main() {
        if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
                logger.Fatalf(ctx, "Failed to load pipeline options to worker: 
%v", err)
        }
-       os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", 
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}))
-       os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", 
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}))
+       os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", 
(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
+       os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", 
(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
        os.Setenv("RUNNER_CAPABILITIES", 
strings.Join(info.GetRunnerCapabilities(), " "))
 
        if info.GetStatusEndpoint() != nil {
-               os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", 
proto.MarshalTextString(info.GetStatusEndpoint()))
+               os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", 
info.GetStatusEndpoint().String())
        }
 
        const jarsDir = "/opt/apache/beam/jars"
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 710041e0f04..696604c6488 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -41,8 +41,8 @@ import (
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
-       "github.com/golang/protobuf/jsonpb"
-       "github.com/golang/protobuf/proto"
+       "google.golang.org/protobuf/encoding/protojson"
+       "google.golang.org/protobuf/proto"
 )
 
 var (
@@ -217,12 +217,12 @@ func launchSDKProcess() error {
 
        os.Setenv("PIPELINE_OPTIONS", options)
        os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
-       os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", 
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}))
-       os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", 
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}))
+       os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", 
(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
+       os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", 
(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
        os.Setenv("RUNNER_CAPABILITIES", 
strings.Join(info.GetRunnerCapabilities(), " "))
 
        if info.GetStatusEndpoint() != nil {
-               os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", 
proto.MarshalTextString(info.GetStatusEndpoint()))
+               os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", 
info.GetStatusEndpoint().String())
        }
 
        if metadata := info.GetMetadata(); metadata != nil {
@@ -441,7 +441,7 @@ func processArtifactsInSetupOnlyMode() {
        files := make([]string, len(infoJsons))
        for i, info := range infoJsons {
                var artifactInformation pipepb.ArtifactInformation
-               if err := jsonpb.UnmarshalString(info, &artifactInformation); 
err != nil {
+               if err := protojson.Unmarshal([]byte(info), 
&artifactInformation); err != nil {
                        log.Fatalf("Unable to unmarshal artifact information 
from json string %v", info)
                }
 


Reply via email to