This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a636f25e139 [Go SDK]: Refactor avroio and parquetio Read to use fileio abstractions (#28177)
a636f25e139 is described below
commit a636f25e139d4ccecc1154028d63168f99200c5a
Author: Johanna Öjeling <[email protected]>
AuthorDate: Wed Aug 30 22:11:30 2023 +0200
[Go SDK]: Refactor avroio and parquetio Read to use fileio abstractions (#28177)
* Refactor avroio.Read to use fileio abstractions
* Refactor parquetio.Read to use fileio abstractions
---
sdks/go/pkg/beam/io/avroio/avroio.go | 41 +++++--------------------
sdks/go/pkg/beam/io/parquetio/parquetio.go | 48 ++++--------------------------
2 files changed, 13 insertions(+), 76 deletions(-)
diff --git a/sdks/go/pkg/beam/io/avroio/avroio.go b/sdks/go/pkg/beam/io/avroio/avroio.go
index b00c6d2eea0..809c9479f7a 100644
--- a/sdks/go/pkg/beam/io/avroio/avroio.go
+++ b/sdks/go/pkg/beam/io/avroio/avroio.go
@@ -20,9 +20,9 @@ import (
"context"
"encoding/json"
"reflect"
- "strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
@@ -30,8 +30,7 @@ import (
)
func init() {
- register.Function3x1(expandFn)
- register.DoFn3x1[context.Context, string, func(beam.X),
error]((*avroReadFn)(nil))
+ register.DoFn3x1[context.Context, fileio.ReadableFile, func(beam.X),
error]((*avroReadFn)(nil))
register.DoFn3x1[context.Context, int, func(*string) bool,
error]((*writeAvroFn)(nil))
register.Emitter1[beam.X]()
register.Iter1[string]()
@@ -49,7 +48,8 @@ func Read(s beam.Scope, glob string, t reflect.Type)
beam.PCollection {
}
func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection
{
- files := beam.ParDo(s, expandFn, col)
+ matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow())
+ files := fileio.ReadMatches(s, matches, fileio.ReadUncompressed())
return beam.ParDo(s,
&avroReadFn{Type: beam.EncodedType{T: t}},
files,
@@ -57,42 +57,15 @@ func read(s beam.Scope, t reflect.Type, col
beam.PCollection) beam.PCollection {
)
}
-func expandFn(ctx context.Context, glob string, emit func(string)) error {
- if strings.TrimSpace(glob) == "" {
- return nil // ignore empty string elements here
- }
-
- fs, err := filesystem.New(ctx, glob)
- if err != nil {
- return err
- }
- defer fs.Close()
-
- files, err := fs.List(ctx, glob)
- if err != nil {
- return err
- }
- for _, filename := range files {
- emit(filename)
- }
- return nil
-}
-
type avroReadFn struct {
// Avro schema type
Type beam.EncodedType
}
-func (f *avroReadFn) ProcessElement(ctx context.Context, filename string, emit
func(beam.X)) (err error) {
- log.Infof(ctx, "Reading AVRO from %v", filename)
-
- fs, err := filesystem.New(ctx, filename)
- if err != nil {
- return
- }
- defer fs.Close()
+func (f *avroReadFn) ProcessElement(ctx context.Context, file
fileio.ReadableFile, emit func(beam.X)) (err error) {
+ log.Infof(ctx, "Reading AVRO from %v", file.Metadata.Path)
- fd, err := fs.OpenRead(ctx, filename)
+ fd, err := file.Open(ctx)
if err != nil {
return
}
diff --git a/sdks/go/pkg/beam/io/parquetio/parquetio.go b/sdks/go/pkg/beam/io/parquetio/parquetio.go
index 0d5a4fbb831..9c48d134014 100644
--- a/sdks/go/pkg/beam/io/parquetio/parquetio.go
+++ b/sdks/go/pkg/beam/io/parquetio/parquetio.go
@@ -18,11 +18,10 @@ package parquetio
import (
"context"
- "io"
"reflect"
- "strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/xitongsys/parquet-go-source/buffer"
@@ -31,14 +30,11 @@ import (
)
func init() {
- register.Function3x1(expandFn)
register.Emitter1[string]()
- beam.RegisterType(reflect.TypeOf((*parquetReadFn)(nil)).Elem())
- register.DoFn3x1[context.Context, string, func(beam.X),
error](&parquetReadFn{})
+ register.DoFn3x1[context.Context, fileio.ReadableFile, func(beam.X),
error](&parquetReadFn{})
register.Emitter1[beam.X]()
- beam.RegisterType(reflect.TypeOf((*parquetWriteFn)(nil)).Elem())
register.DoFn3x1[context.Context, int, func(*beam.X) bool,
error](&parquetWriteFn{})
register.Iter1[beam.X]()
}
@@ -63,7 +59,8 @@ func Read(s beam.Scope, glob string, t reflect.Type)
beam.PCollection {
}
func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection
{
- files := beam.ParDo(s, expandFn, col)
+ matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow())
+ files := fileio.ReadMatches(s, matches, fileio.ReadUncompressed())
return beam.ParDo(s,
&parquetReadFn{Type: beam.EncodedType{T: t}},
files,
@@ -71,45 +68,12 @@ func read(s beam.Scope, t reflect.Type, col
beam.PCollection) beam.PCollection {
)
}
-func expandFn(ctx context.Context, glob string, emit func(string)) error {
- if strings.TrimSpace(glob) == "" {
- return nil // ignore empty string elements here
- }
-
- fs, err := filesystem.New(ctx, glob)
- if err != nil {
- return err
- }
- defer fs.Close()
-
- files, err := fs.List(ctx, glob)
- if err != nil {
- return err
- }
- for _, filename := range files {
- emit(filename)
- }
- return nil
-}
-
type parquetReadFn struct {
Type beam.EncodedType
}
-func (a *parquetReadFn) ProcessElement(ctx context.Context, filename string,
emit func(beam.X)) error {
- fs, err := filesystem.New(ctx, filename)
- if err != nil {
- return err
- }
- defer fs.Close()
-
- fd, err := fs.OpenRead(ctx, filename)
- if err != nil {
- return err
- }
- defer fd.Close()
-
- data, err := io.ReadAll(fd)
+func (a *parquetReadFn) ProcessElement(ctx context.Context, file
fileio.ReadableFile, emit func(beam.X)) error {
+ data, err := file.Read(ctx)
if err != nil {
return err
}