This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a2fc441f348 Adding Google Storage Requester pays feature to Golang
SDK. (#33236)
a2fc441f348 is described below
commit a2fc441f348bbcea568df184c95e4fef203db5a3
Author: Leonardo Cesar Borges <[email protected]>
AuthorDate: Thu May 29 06:49:01 2025 -0300
Adding Google Storage Requester pays feature to Golang SDK. (#33236)
* Adding Google Storage Requester pays feature to Golang SDK.
Setting UserProject on Google Storage Bucket operations to enable the
requester pays feature.
The requester pays project ID will come from the environment variable named
`BILLING_PROJECT_ID`.
More information about the Google Cloud Storage requester pays feature is
available at https://cloud.google.com/storage/docs/requester-pays
* Adding new entry to CHANGES.md
---
CHANGES.md | 2 +-
sdks/go/pkg/beam/io/filesystem/gcs/gcs.go | 62 +++++++++++++++++++++-----
sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go | 23 ++++++++++
3 files changed, 75 insertions(+), 12 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a8925df49d7..cff581230b7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -74,7 +74,7 @@
* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
## New Features / Improvements
-
+* Adding Google Storage Requester Pays feature
(Golang) ([#30747](https://github.com/apache/beam/issues/30747)).
* X feature added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
* [Python] Prism runner now auto-enabled for some Python pipelines using the
direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
* [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support
diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
index 55509d9ff2f..73e68638105 100644
--- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
+++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
@@ -25,6 +25,7 @@ import (
"time"
"cloud.google.com/go/storage"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
@@ -33,8 +34,30 @@ import (
"google.golang.org/api/iterator"
)
+const (
+ projectBillingHook = "beam:go:hook:filesystem:billingproject"
+)
+
+var billingProject string = ""
+
func init() {
filesystem.Register("gs", New)
+ hf := func(opts []string) hooks.Hook {
+ return hooks.Hook{
+ Init: func(ctx context.Context) (context.Context,
error) {
+ if len(opts) == 0 {
+ return ctx, nil
+ }
+ if len(opts) > 1 {
+ return ctx, fmt.Errorf("expected 1
option, got %v: %v", len(opts), opts)
+ }
+
+ billingProject = opts[0]
+ return ctx, nil
+ },
+ }
+ }
+ hooks.RegisterHook(projectBillingHook, hf)
}
type fs struct {
@@ -44,6 +67,7 @@ type fs struct {
// New creates a new Google Cloud Storage filesystem using application
// default credentials. If it fails, it falls back to unauthenticated
// access.
+// It will use the environment variable named `BILLING_PROJECT_ID` as
the requester pays bucket attribute.
func New(ctx context.Context) filesystem.Interface {
client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
if err != nil {
@@ -54,7 +78,23 @@ func New(ctx context.Context) filesystem.Interface {
panic(errors.Wrapf(err, "failed to create GCS client"))
}
}
- return &fs{client: client}
+ return &fs{
+ client: client,
+ }
+}
+
+func SetRequesterBillingProject(project string) {
+ billingProject = project
+}
+
+// RequesterBillingProject configures the project to be used in Google Storage
operations
+// with requester pays activated. More information about requester pays at
https://cloud.google.com/storage/docs/requester-pays
+func RequesterBillingProject(project string) error {
+ if project == "" {
+ return fmt.Errorf("project cannot be empty, got %v", project)
+ }
+ // The hook itself is registered in this package's init function
under the projectBillingHook name.
+ return hooks.EnableHook(projectBillingHook, project)
}
func (f *fs) Close() error {
@@ -73,7 +113,7 @@ func (f *fs) List(ctx context.Context, glob string)
([]string, error) {
// For now, we assume * is the first matching character to make a
// prefix listing and not list the entire bucket.
prefix := fsx.GetPrefix(object)
- it := f.client.Bucket(bucket).Objects(ctx, &storage.Query{
+ it := f.client.Bucket(bucket).UserProject(billingProject).Objects(ctx,
&storage.Query{
Prefix: prefix,
})
for {
@@ -107,7 +147,7 @@ func (f *fs) OpenRead(ctx context.Context, filename string)
(io.ReadCloser, erro
return nil, err
}
- return f.client.Bucket(bucket).Object(object).NewReader(ctx)
+ return
f.client.Bucket(bucket).UserProject(billingProject).Object(object).NewReader(ctx)
}
// TODO(herohde) 7/12/2017: should we create the bucket in OpenWrite? For now,
"no".
@@ -118,7 +158,7 @@ func (f *fs) OpenWrite(ctx context.Context, filename
string) (io.WriteCloser, er
return nil, err
}
- return f.client.Bucket(bucket).Object(object).NewWriter(ctx), nil
+ return
f.client.Bucket(bucket).UserProject(billingProject).Object(object).NewWriter(ctx),
nil
}
func (f *fs) Size(ctx context.Context, filename string) (int64, error) {
@@ -127,7 +167,7 @@ func (f *fs) Size(ctx context.Context, filename string)
(int64, error) {
return -1, err
}
- obj := f.client.Bucket(bucket).Object(object)
+ obj :=
f.client.Bucket(bucket).UserProject(billingProject).Object(object)
attrs, err := obj.Attrs(ctx)
if err != nil {
return -1, err
@@ -143,7 +183,7 @@ func (f *fs) LastModified(ctx context.Context, filename
string) (time.Time, erro
return time.Time{}, err
}
- obj := f.client.Bucket(bucket).Object(object)
+ obj :=
f.client.Bucket(bucket).UserProject(billingProject).Object(object)
attrs, err := obj.Attrs(ctx)
if err != nil {
return time.Time{}, err
@@ -159,7 +199,7 @@ func (f *fs) Remove(ctx context.Context, filename string)
error {
return err
}
- obj := f.client.Bucket(bucket).Object(object)
+ obj :=
f.client.Bucket(bucket).UserProject(billingProject).Object(object)
return obj.Delete(ctx)
}
@@ -169,13 +209,13 @@ func (f *fs) Copy(ctx context.Context, srcpath, dstpath
string) error {
if err != nil {
return err
}
- srcobj := f.client.Bucket(bucket).Object(src)
+ srcobj :=
f.client.Bucket(bucket).UserProject(billingProject).Object(src)
bucket, dst, err := gcsx.ParseObject(dstpath)
if err != nil {
return err
}
- dstobj := f.client.Bucket(bucket).Object(dst)
+ dstobj :=
f.client.Bucket(bucket).UserProject(billingProject).Object(dst)
cp := dstobj.CopierFrom(srcobj)
_, err = cp.Run(ctx)
@@ -188,13 +228,13 @@ func (f *fs) Rename(ctx context.Context, srcpath, dstpath
string) error {
if err != nil {
return err
}
- srcobj := f.client.Bucket(bucket).Object(src)
+ srcobj :=
f.client.Bucket(bucket).UserProject(billingProject).Object(src)
bucket, dst, err := gcsx.ParseObject(dstpath)
if err != nil {
return err
}
- dstobj := f.client.Bucket(bucket).Object(dst)
+ dstobj :=
f.client.Bucket(bucket).UserProject(billingProject).Object(dst)
cp := dstobj.CopierFrom(srcobj)
_, err = cp.Run(ctx)
diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go
b/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go
index 09b5da0db12..66dee6bb23f 100644
--- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go
+++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/google/go-cmp/cmp"
@@ -43,6 +44,28 @@ func TestGCS_FilesystemNew(t *testing.T) {
}
func TestGCS_direct(t *testing.T) {
+ testGCS_direct(t)
+}
+
+func TestGCS_BillingProjectHookEnable(t *testing.T) {
+ billingProject := "whatever"
+ RequesterBillingProject(billingProject)
+ _, err := hooks.RunInitHooks(context.Background())
+ if err != nil {
+ t.Errorf("error to init hooks = %v", err)
+ }
+ projectBillingHook := "beam:go:hook:filesystem:billingproject"
+ projectBillingHookIsEnable, hookValue :=
hooks.IsEnabled(projectBillingHook)
+ if !projectBillingHookIsEnable {
+ t.Error("project billing hook isn't enable")
+ }
+ if hookValue[0] != billingProject {
+ t.Errorf("projectBillingHook value wrong / want {%s} got {%s}",
billingProject, hookValue[0])
+ }
+
+}
+
+func testGCS_direct(t *testing.T) {
ctx := context.Background()
dirPath := "gs://beamgogcsfilesystemtest"
filePath := dirPath + "/file.txt"