This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a636f25e139 [Go SDK]: Refactor avroio and parquetio Read to use fileio abstractions (#28177)
a636f25e139 is described below

commit a636f25e139d4ccecc1154028d63168f99200c5a
Author: Johanna Öjeling <[email protected]>
AuthorDate: Wed Aug 30 22:11:30 2023 +0200

    [Go SDK]: Refactor avroio and parquetio Read to use fileio abstractions (#28177)
    
    * Refactor avroio.Read to use fileio abstractions
    
    * Refactor parquetio.Read to use fileio abstractions
---
 sdks/go/pkg/beam/io/avroio/avroio.go       | 41 +++++--------------------
 sdks/go/pkg/beam/io/parquetio/parquetio.go | 48 ++++--------------------------
 2 files changed, 13 insertions(+), 76 deletions(-)

diff --git a/sdks/go/pkg/beam/io/avroio/avroio.go b/sdks/go/pkg/beam/io/avroio/avroio.go
index b00c6d2eea0..809c9479f7a 100644
--- a/sdks/go/pkg/beam/io/avroio/avroio.go
+++ b/sdks/go/pkg/beam/io/avroio/avroio.go
@@ -20,9 +20,9 @@ import (
        "context"
        "encoding/json"
        "reflect"
-       "strings"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
@@ -30,8 +30,7 @@ import (
 )
 
 func init() {
-       register.Function3x1(expandFn)
-       register.DoFn3x1[context.Context, string, func(beam.X), 
error]((*avroReadFn)(nil))
+       register.DoFn3x1[context.Context, fileio.ReadableFile, func(beam.X), 
error]((*avroReadFn)(nil))
        register.DoFn3x1[context.Context, int, func(*string) bool, 
error]((*writeAvroFn)(nil))
        register.Emitter1[beam.X]()
        register.Iter1[string]()
@@ -49,7 +48,8 @@ func Read(s beam.Scope, glob string, t reflect.Type) 
beam.PCollection {
 }
 
 func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection 
{
-       files := beam.ParDo(s, expandFn, col)
+       matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow())
+       files := fileio.ReadMatches(s, matches, fileio.ReadUncompressed())
        return beam.ParDo(s,
                &avroReadFn{Type: beam.EncodedType{T: t}},
                files,
@@ -57,42 +57,15 @@ func read(s beam.Scope, t reflect.Type, col 
beam.PCollection) beam.PCollection {
        )
 }
 
-func expandFn(ctx context.Context, glob string, emit func(string)) error {
-       if strings.TrimSpace(glob) == "" {
-               return nil // ignore empty string elements here
-       }
-
-       fs, err := filesystem.New(ctx, glob)
-       if err != nil {
-               return err
-       }
-       defer fs.Close()
-
-       files, err := fs.List(ctx, glob)
-       if err != nil {
-               return err
-       }
-       for _, filename := range files {
-               emit(filename)
-       }
-       return nil
-}
-
 type avroReadFn struct {
        // Avro schema type
        Type beam.EncodedType
 }
 
-func (f *avroReadFn) ProcessElement(ctx context.Context, filename string, emit 
func(beam.X)) (err error) {
-       log.Infof(ctx, "Reading AVRO from %v", filename)
-
-       fs, err := filesystem.New(ctx, filename)
-       if err != nil {
-               return
-       }
-       defer fs.Close()
+func (f *avroReadFn) ProcessElement(ctx context.Context, file 
fileio.ReadableFile, emit func(beam.X)) (err error) {
+       log.Infof(ctx, "Reading AVRO from %v", file.Metadata.Path)
 
-       fd, err := fs.OpenRead(ctx, filename)
+       fd, err := file.Open(ctx)
        if err != nil {
                return
        }
diff --git a/sdks/go/pkg/beam/io/parquetio/parquetio.go b/sdks/go/pkg/beam/io/parquetio/parquetio.go
index 0d5a4fbb831..9c48d134014 100644
--- a/sdks/go/pkg/beam/io/parquetio/parquetio.go
+++ b/sdks/go/pkg/beam/io/parquetio/parquetio.go
@@ -18,11 +18,10 @@ package parquetio
 
 import (
        "context"
-       "io"
        "reflect"
-       "strings"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
        "github.com/xitongsys/parquet-go-source/buffer"
@@ -31,14 +30,11 @@ import (
 )
 
 func init() {
-       register.Function3x1(expandFn)
        register.Emitter1[string]()
 
-       beam.RegisterType(reflect.TypeOf((*parquetReadFn)(nil)).Elem())
-       register.DoFn3x1[context.Context, string, func(beam.X), 
error](&parquetReadFn{})
+       register.DoFn3x1[context.Context, fileio.ReadableFile, func(beam.X), 
error](&parquetReadFn{})
        register.Emitter1[beam.X]()
 
-       beam.RegisterType(reflect.TypeOf((*parquetWriteFn)(nil)).Elem())
        register.DoFn3x1[context.Context, int, func(*beam.X) bool, 
error](&parquetWriteFn{})
        register.Iter1[beam.X]()
 }
@@ -63,7 +59,8 @@ func Read(s beam.Scope, glob string, t reflect.Type) 
beam.PCollection {
 }
 
 func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection 
{
-       files := beam.ParDo(s, expandFn, col)
+       matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow())
+       files := fileio.ReadMatches(s, matches, fileio.ReadUncompressed())
        return beam.ParDo(s,
                &parquetReadFn{Type: beam.EncodedType{T: t}},
                files,
@@ -71,45 +68,12 @@ func read(s beam.Scope, t reflect.Type, col 
beam.PCollection) beam.PCollection {
        )
 }
 
-func expandFn(ctx context.Context, glob string, emit func(string)) error {
-       if strings.TrimSpace(glob) == "" {
-               return nil // ignore empty string elements here
-       }
-
-       fs, err := filesystem.New(ctx, glob)
-       if err != nil {
-               return err
-       }
-       defer fs.Close()
-
-       files, err := fs.List(ctx, glob)
-       if err != nil {
-               return err
-       }
-       for _, filename := range files {
-               emit(filename)
-       }
-       return nil
-}
-
 type parquetReadFn struct {
        Type beam.EncodedType
 }
 
-func (a *parquetReadFn) ProcessElement(ctx context.Context, filename string, 
emit func(beam.X)) error {
-       fs, err := filesystem.New(ctx, filename)
-       if err != nil {
-               return err
-       }
-       defer fs.Close()
-
-       fd, err := fs.OpenRead(ctx, filename)
-       if err != nil {
-               return err
-       }
-       defer fd.Close()
-
-       data, err := io.ReadAll(fd)
+func (a *parquetReadFn) ProcessElement(ctx context.Context, file 
fileio.ReadableFile, emit func(beam.X)) error {
+       data, err := file.Read(ctx)
        if err != nil {
                return err
        }

Reply via email to