[
https://issues.apache.org/jira/browse/BEAM-6155?focusedWorklogId=172911&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172911
]
ASF GitHub Bot logged work on BEAM-6155:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Dec/18 02:03
Start Date: 07/Dec/18 02:03
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #7182: [BEAM-6155] Updates
the GCS library the Go SDK uses.
URL: https://github.com/apache/beam/pull/7182
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
index dede7a51c1af..a59b81d93ffc 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
@@ -19,11 +19,11 @@ import (
"fmt"
"io"
+ "cloud.google.com/go/storage"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
- "google.golang.org/api/storage/v1"
)
// RetrievalServer is a artifact retrieval server backed by Google
@@ -41,7 +41,7 @@ func ReadProxyManifest(ctx context.Context, object string)
(*pb.ProxyManifest, e
return nil, fmt.Errorf("invalid manifest object %v: %v",
object, err)
}
- cl, err := gcsx.NewClient(ctx, storage.DevstorageReadOnlyScope)
+ cl, err := gcsx.NewClient(ctx, storage.ScopeReadOnly)
if err != nil {
return nil, fmt.Errorf("failed to create GCS client: %v", err)
}
@@ -88,22 +88,22 @@ func (s *RetrievalServer) GetArtifact(req
*pb.GetArtifactRequest, stream pb.Arti
bucket, object := parseObject(blob)
- client, err := gcsx.NewClient(stream.Context(),
storage.DevstorageReadOnlyScope)
+ client, err := gcsx.NewClient(stream.Context(), storage.ScopeReadOnly)
if err != nil {
return fmt.Errorf("Failed to create client for %v: %v", key,
err)
}
// Stream artifact in up to 1MB chunks.
-
- resp, err := client.Objects.Get(bucket, object).Download()
+ ctx := context.TODO()
+ r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
if err != nil {
return fmt.Errorf("Failed to read object for %v: %v", key, err)
}
- defer resp.Body.Close()
+ defer r.Close()
data := make([]byte, 1<<20)
for {
- n, err := resp.Body.Read(data)
+ n, err := r.Read(data)
if n > 0 {
if err := stream.Send(&pb.ArtifactChunk{Data:
data[:n]}); err != nil {
return fmt.Errorf("chunk send failed: %v", err)
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
index 51ffec6fe129..109eb29cdb43 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -26,11 +26,11 @@ import (
"path"
"sync"
+ "cloud.google.com/go/storage"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
- "google.golang.org/api/storage/v1"
)
// StagingServer is a artifact staging server backed by Google Cloud Storage
@@ -81,7 +81,7 @@ func (s *StagingServer) CommitManifest(ctx context.Context,
req *pb.CommitManife
return nil, fmt.Errorf("failed to marshal proxy manifest: %v",
err)
}
- cl, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
+ cl, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
if err != nil {
return nil, fmt.Errorf("failed to create GCS client: %v", err)
}
@@ -135,7 +135,7 @@ func (s *StagingServer) PutArtifact(ps
pb.ArtifactStagingService_PutArtifactServ
// Stream content to GCS. We don't have to worry about partial
// or abandoned writes, because object writes are atomic.
- cl, err := gcsx.NewClient(ps.Context(),
storage.DevstorageReadWriteScope)
+ cl, err := gcsx.NewClient(ps.Context(), storage.ScopeReadWrite)
if err != nil {
return fmt.Errorf("failed to create GCS client: %v", err)
}
diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
index 9a7b265796d7..356c718a024d 100644
--- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
+++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
@@ -18,17 +18,18 @@
package gcs
import (
- "bytes"
"context"
"fmt"
"io"
"path/filepath"
"strings"
+ "cloud.google.com/go/storage"
"github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
- "google.golang.org/api/storage/v1"
+ "google.golang.org/api/iterator"
+ "google.golang.org/api/option"
)
func init() {
@@ -36,28 +37,27 @@ func init() {
}
type fs struct {
- client *storage.Service
+ client *storage.Client
}
// New creates a new Google Cloud Storage filesystem using application
// default credentials. If it fails, it falls back to unauthenticated
// access.
func New(ctx context.Context) filesystem.Interface {
- client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
+ client, err := storage.NewClient(ctx,
option.WithScopes(storage.ScopeReadWrite))
if err != nil {
log.Warnf(ctx, "Warning: falling back to unauthenticated GCS
access: %v", err)
- client, err = gcsx.NewUnauthenticatedClient(ctx)
+ client, err = storage.NewClient(ctx,
option.WithoutAuthentication())
if err != nil {
- panic(fmt.Sprintf("failed to create GCE client: %v",
err))
+ panic(fmt.Sprintf("failed to create GCS client: %v",
err))
}
}
return &fs{client: client}
}
func (f *fs) Close() error {
- f.client = nil
- return nil
+ return f.client.Close()
}
func (f *fs) List(ctx context.Context, glob string) ([]string, error) {
@@ -72,24 +72,28 @@ func (f *fs) List(ctx context.Context, glob string)
([]string, error) {
// For now, we assume * is the first matching character to make
a
// prefix listing and not list the entire bucket.
- err :=
f.client.Objects.List(bucket).Prefix(object[:index]).Pages(ctx, func(list
*storage.Objects) error {
- for _, obj := range list.Items {
- match, err := filepath.Match(object, obj.Name)
- if err != nil {
- return err
- }
- if match {
- candidates = append(candidates,
obj.Name)
- }
- }
- return nil
+ it := f.client.Bucket(bucket).Objects(ctx, &storage.Query{
+ Prefix: object[:index],
})
- if err != nil {
- return nil, err
+ for {
+ obj, err := it.Next()
+ if err == iterator.Done {
+ break
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ match, err := filepath.Match(object, obj.Name)
+ if err != nil {
+ return nil, err
+ }
+ if match {
+ candidates = append(candidates, obj.Name)
+ }
}
} else {
// Single object.
-
candidates = []string{object}
}
@@ -106,11 +110,7 @@ func (f *fs) OpenRead(ctx context.Context, filename
string) (io.ReadCloser, erro
return nil, err
}
- resp, err := f.client.Objects.Get(bucket, object).Download()
- if err != nil {
- return nil, err
- }
- return resp.Body, nil
+ return f.client.Bucket(bucket).Object(object).NewReader(ctx)
}
// TODO(herohde) 7/12/2017: should we create the bucket in OpenWrite? For now,
"no".
@@ -120,20 +120,6 @@ func (f *fs) OpenWrite(ctx context.Context, filename
string) (io.WriteCloser, er
if err != nil {
return nil, err
}
- return &writer{client: f.client, bucket: bucket, object: object}, nil
-}
-
-type writer struct {
- client *storage.Service
- bucket, object string
-
- buf bytes.Buffer
-}
-
-func (w *writer) Write(data []byte) (n int, err error) {
- return w.buf.Write(data)
-}
-func (w *writer) Close() error {
- return gcsx.WriteObject(w.client, w.bucket, w.object, &w.buf)
+ return f.client.Bucket(bucket).Object(object).NewWriter(ctx), nil
}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index a19678b8e03e..05c8c8cec27e 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"
+ "cloud.google.com/go/storage"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
@@ -39,7 +40,6 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"github.com/apache/beam/sdks/go/pkg/beam/x/hooks/perf"
"github.com/golang/protobuf/proto"
- "google.golang.org/api/storage/v1"
)
// TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
@@ -178,7 +178,7 @@ func gcsRecorderHook(opts []string) perf.CaptureHook {
}
return func(ctx context.Context, spec string, r io.Reader) error {
- client, err := gcsx.NewClient(ctx,
storage.DevstorageReadWriteScope)
+ client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
if err != nil {
return fmt.Errorf("couldn't establish GCS client: %v",
err)
}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
index f467fea65703..c385eeffc232 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
@@ -22,8 +22,8 @@ import (
"io"
"os"
+ "cloud.google.com/go/storage"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
- "google.golang.org/api/storage/v1"
)
// StageModel uploads the pipeline model to GCS as a unique object.
@@ -47,7 +47,7 @@ func upload(ctx context.Context, project, object string, r
io.Reader) error {
if err != nil {
return fmt.Errorf("invalid staging location %v: %v", object,
err)
}
- client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
+ client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
if err != nil {
return err
}
diff --git a/sdks/go/pkg/beam/util/gcsx/gcs.go
b/sdks/go/pkg/beam/util/gcsx/gcs.go
index c426988f6338..aaee2f8a24b9 100644
--- a/sdks/go/pkg/beam/util/gcsx/gcs.go
+++ b/sdks/go/pkg/beam/util/gcsx/gcs.go
@@ -22,39 +22,26 @@ import (
"io"
"io/ioutil"
"net/url"
-
- "net/http"
-
"path"
- "golang.org/x/oauth2/google"
- "google.golang.org/api/googleapi"
+ "cloud.google.com/go/storage"
"google.golang.org/api/option"
- "google.golang.org/api/storage/v1"
- ghttp "google.golang.org/api/transport/http"
)
-// NewClient creates a new GCS client with default application credentials.
-func NewClient(ctx context.Context, scope string) (*storage.Service, error) {
- cl, err := google.DefaultClient(ctx, scope)
- if err != nil {
- return nil, err
- }
- return storage.New(cl)
+// NewClient creates a new GCS client with default application credentials,
and supplied
+// OAuth scope. The OAuth scopes are defined in
https://godoc.org/cloud.google.com/go/storage#pkg-constants.
+func NewClient(ctx context.Context, scope string) (*storage.Client, error) {
+ return storage.NewClient(ctx, option.WithScopes(scope))
}
// NewUnauthenticatedClient creates a new GCS client without authentication.
-func NewUnauthenticatedClient(ctx context.Context) (*storage.Service, error) {
- cl, _, err := ghttp.NewClient(ctx, option.WithoutAuthentication())
- if err != nil {
- return nil, fmt.Errorf("dialing: %v", err)
- }
- return storage.New(cl)
+func NewUnauthenticatedClient(ctx context.Context) (*storage.Client, error) {
+ return storage.NewClient(ctx, option.WithoutAuthentication())
}
// Upload writes the given content to GCS. If the specified bucket does not
// exist, it is created first. Returns the full path of the object.
-func Upload(client *storage.Service, project, bucket, object string, r
io.Reader) (string, error) {
+func Upload(client *storage.Client, project, bucket, object string, r
io.Reader) (string, error) {
exists, err := BucketExists(client, bucket)
if err != nil {
return "", err
@@ -69,21 +56,20 @@ func Upload(client *storage.Service, project, bucket,
object string, r io.Reader
return "", err
}
return fmt.Sprintf("gs://%s/%s", bucket, object), nil
+
}
// CreateBucket creates a bucket in GCS.
-func CreateBucket(client *storage.Service, project, bucket string) error {
- b := &storage.Bucket{
- Name: bucket,
- }
- _, err := client.Buckets.Insert(project, b).Do()
- return err
+func CreateBucket(client *storage.Client, project, bucket string) error {
+ ctx := context.TODO()
+ return client.Bucket(bucket).Create(ctx, project, nil)
}
// BucketExists returns true iff the given bucket exists.
-func BucketExists(client *storage.Service, bucket string) (bool, error) {
- _, err := client.Buckets.Get(bucket).Do()
- if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound
{
+func BucketExists(client *storage.Client, bucket string) (bool, error) {
+ ctx := context.TODO()
+ _, err := client.Bucket(bucket).Attrs(ctx)
+ if err == storage.ErrBucketNotExist {
return false, nil
}
return err == nil, err
@@ -91,22 +77,24 @@ func BucketExists(client *storage.Service, bucket string)
(bool, error) {
// WriteObject writes the given content to the specified object. If the object
// already exist, it is overwritten.
-func WriteObject(client *storage.Service, bucket, object string, r io.Reader)
error {
- obj := &storage.Object{
- Name: object,
- Bucket: bucket,
+func WriteObject(client *storage.Client, bucket, object string, r io.Reader)
error {
+ ctx := context.TODO()
+ w := client.Bucket(bucket).Object(object).NewWriter(ctx)
+ _, err := io.Copy(w, r)
+ if err != nil {
+ return err
}
- _, err := client.Objects.Insert(bucket, obj).Media(r).Do()
- return err
+ return w.Close()
}
// ReadObject reads the content of the given object in full.
-func ReadObject(client *storage.Service, bucket, object string) ([]byte,
error) {
- resp, err := client.Objects.Get(bucket, object).Download()
+func ReadObject(client *storage.Client, bucket, object string) ([]byte, error)
{
+ ctx := context.TODO()
+ r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
if err != nil {
return nil, err
}
- return ioutil.ReadAll(resp.Body)
+ return ioutil.ReadAll(r)
}
// MakeObject creates a object location from bucket and path. For example,
diff --git a/sdks/go/pkg/beam/util/gcsx/gcs_test.go
b/sdks/go/pkg/beam/util/gcsx/gcs_test.go
new file mode 100644
index 000000000000..82ab5cd5666f
--- /dev/null
+++ b/sdks/go/pkg/beam/util/gcsx/gcs_test.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gcsx_test
+
+import (
+ "context"
+
+ "cloud.google.com/go/storage"
+ "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
+)
+
+func Example() {
+ ctx := context.Background()
+ c, err := gcsx.NewClient(ctx, storage.ScopeReadOnly)
+ if err != nil {
+ // do something
+ }
+
+ buckets, object, err := gcsx.ParseObject("gs://some-bucket/some-object")
+ if err != nil {
+ // do something
+ }
+
+ bytes, err := gcsx.ReadObject(c, buckets, object)
+ if err != nil {
+ // do something
+ }
+
+ _ = bytes
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 172911)
Time Spent: 0.5h (was: 20m)
> Migrate the Go SDK to the modern GCS library
> --------------------------------------------
>
> Key: BEAM-6155
> URL: https://issues.apache.org/jira/browse/BEAM-6155
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Andrew Brampton
> Assignee: Robert Burke
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> The gcsx package is using the google.golang.org/api/storage/v1 GCS library.
> That library has been deprecated for ~6 months, and the recommendation is to
> use the newer
> [cloud.google.com/go/storage|https://godoc.org/cloud.google.com/go/storage]
> package. That package supports newer features, and has built in connection
> pooling, timeout support, retry with exponential backoff, etc.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)