This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9c614557c51 [Go SDK]: Add fileio MatchFiles, MatchAll and ReadMatches
transforms (#25809)
9c614557c51 is described below
commit 9c614557c51ad55230211f864e70f48ad0914326
Author: Johanna Öjeling <[email protected]>
AuthorDate: Wed Mar 29 22:44:29 2023 +0200
[Go SDK]: Add fileio MatchFiles, MatchAll and ReadMatches transforms
(#25809)
* Create utility file structs
* Create MatchFiles and MatchAll transforms
* Create ReadMatches transform
* Add example doc for transforms
* Move FileMetadata and Compression to fileio
* Provide functional options without exporting enums
* Update CHANGES.md
* Rename read compression options
* Provide default options
---
CHANGES.md | 1 +
sdks/go/pkg/beam/io/fileio/example_test.go | 74 ++++++++
sdks/go/pkg/beam/io/fileio/file.go | 142 ++++++++++++++
sdks/go/pkg/beam/io/fileio/file_test.go | 254 +++++++++++++++++++++++++
sdks/go/pkg/beam/io/fileio/gzip.go | 60 ++++++
sdks/go/pkg/beam/io/fileio/helper_test.go | 88 +++++++++
sdks/go/pkg/beam/io/fileio/match.go | 189 +++++++++++++++++++
sdks/go/pkg/beam/io/fileio/match_test.go | 285 +++++++++++++++++++++++++++++
sdks/go/pkg/beam/io/fileio/read.go | 140 ++++++++++++++
sdks/go/pkg/beam/io/fileio/read_test.go | 181 ++++++++++++++++++
10 files changed, 1414 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index ab8a0b797a7..0f54d233ad6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -62,6 +62,7 @@
* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
* BigQuery Storage Write API is now available in Python SDK via cross-language
([#21961](https://github.com/apache/beam/issues/21961)).
* Added HbaseIO support for writing RowMutations (ordered by rowkey) to Hbase
(Java) ([#25830](https://github.com/apache/beam/issues/25830)).
+* Added fileio transforms MatchFiles, MatchAll and ReadMatches (Go)
([#25779](https://github.com/apache/beam/issues/25779)).
## New Features / Improvements
diff --git a/sdks/go/pkg/beam/io/fileio/example_test.go
b/sdks/go/pkg/beam/io/fileio/example_test.go
new file mode 100644
index 00000000000..ed27546f9dd
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/example_test.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio_test
+
+import (
+ "context"
+ "log"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+)
+
+// ExampleMatchFiles shows how to match the files of a single glob pattern and
+// hand the resulting PCollection to debug.Print.
+func ExampleMatchFiles() {
+	beam.Init()
+	pipeline, root := beam.NewPipelineWithRoot()
+
+	debug.Print(root, fileio.MatchFiles(root, "gs://path/to/*.gz"))
+
+	err := beamx.Run(context.Background(), pipeline)
+	if err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
+
+// ExampleMatchAll shows how to match the files of several glob patterns that
+// are supplied as a PCollection built with beam.Create.
+func ExampleMatchAll() {
+	beam.Init()
+	pipeline, root := beam.NewPipelineWithRoot()
+
+	patterns := beam.Create(
+		root,
+		"gs://path/to/sub1/*.gz",
+		"gs://path/to/sub2/*.gz",
+	)
+	debug.Print(root, fileio.MatchAll(root, patterns))
+
+	err := beamx.Run(context.Background(), pipeline)
+	if err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
+
+// ExampleReadMatches shows how to convert matched files into ReadableFiles
+// and read each of them into a (path, contents) pair.
+func ExampleReadMatches() {
+	beam.Init()
+	pipeline, root := beam.NewPipelineWithRoot()
+
+	// toPair emits the file's path together with its full contents, or
+	// returns the read error without emitting anything.
+	toPair := func(ctx context.Context, rf fileio.ReadableFile, emit func(string, string)) error {
+		text, err := rf.ReadString(ctx)
+		if err != nil {
+			return err
+		}
+
+		emit(rf.Metadata.Path, text)
+		return nil
+	}
+
+	matched := fileio.MatchFiles(root, "gs://path/to/*.gz")
+	readable := fileio.ReadMatches(root, matched)
+	debug.Print(root, beam.ParDo(root, toPair, readable))
+
+	err := beamx.Run(context.Background(), pipeline)
+	if err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/pkg/beam/io/fileio/file.go
b/sdks/go/pkg/beam/io/fileio/file.go
new file mode 100644
index 00000000000..4ae7b3d3d07
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/file.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+ "context"
+ "errors"
+ "io"
+ "path/filepath"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
+
+// init registers the FileMetadata and ReadableFile struct types with
+// beam.RegisterType, in that order.
+func init() {
+	for _, t := range []reflect.Type{
+		reflect.TypeOf(FileMetadata{}),
+		reflect.TypeOf(ReadableFile{}),
+	} {
+		beam.RegisterType(t)
+	}
+}
+
+// FileMetadata contains metadata about a file, namely its path and size in
+// bytes.
+type FileMetadata struct {
+ // Path is the path of the file.
+ Path string
+ // Size is the size of the file in bytes.
+ Size int64
+}
+
+// compressionType is the type of compression used to compress a file.
+type compressionType int
+
+const (
+ // compressionAuto indicates that the compression type should be
+ // auto-detected. It is the zero value of compressionType.
+ compressionAuto compressionType = iota
+ // compressionGzip indicates that the file is compressed using gzip.
+ compressionGzip
+ // compressionUncompressed indicates that the file is not compressed.
+ compressionUncompressed
+)
+
+// ReadableFile is a wrapper around a FileMetadata and compressionType that
+// can be used to obtain a file descriptor or read the file's contents.
+type ReadableFile struct {
+ // Metadata holds the path and size of the file.
+ Metadata FileMetadata
+ // Compression is the compression used when opening the file.
+ Compression compressionType
+}
+
+// Open opens the file for reading and returns a reader that yields the
+// decompressed contents. The compression type is determined by the
+// Compression field of the ReadableFile. If Compression is compressionAuto,
+// the compression type is auto-detected from the file extension.
+//
+// On success it is the caller's responsibility to close the returned reader.
+// On failure a nil reader is returned and any reader that was already opened
+// on the underlying file is closed before returning.
+func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) {
+	fs, err := filesystem.New(ctx, f.Metadata.Path)
+	if err != nil {
+		return nil, err
+	}
+	defer fs.Close()
+
+	rc, err := fs.OpenRead(ctx, f.Metadata.Path)
+	if err != nil {
+		return nil, err
+	}
+
+	comp := f.Compression
+	if comp == compressionAuto {
+		comp = compressionFromExt(f.Metadata.Path)
+	}
+
+	dr, err := newDecompressionReader(rc, comp)
+	if err != nil {
+		// The caller never receives rc on this path, so it has to be closed
+		// here; otherwise the underlying file handle would leak.
+		if closeErr := rc.Close(); closeErr != nil {
+			log.Errorf(ctx, "error closing reader: %v", closeErr)
+		}
+		return nil, err
+	}
+
+	return dr, nil
+}
+
+// compressionFromExt maps the extension of path to a compression type:
+// ".gz" yields compressionGzip and every other extension (or none) yields
+// compressionUncompressed.
+func compressionFromExt(path string) compressionType {
+	if filepath.Ext(path) == ".gz" {
+		return compressionGzip
+	}
+
+	return compressionUncompressed
+}
+
+// newDecompressionReader returns an io.ReadCloser that can be used to read
uncompressed data from
+// reader, based on the specified compression. If the compression is
compressionAuto, a non-nil
+// error is returned. It is the caller's responsibility to close the returned
reader.
+func newDecompressionReader(
+ reader io.ReadCloser,
+ compression compressionType,
+) (io.ReadCloser, error) {
+ switch compression {
+ case compressionAuto:
+ return nil, errors.New(
+ "compression must be resolved into a concrete type
before obtaining a reader",
+ )
+ case compressionGzip:
+ return newGzipReader(reader)
+ default:
+ return reader, nil
+ }
+}
+
+// Read opens the file, reads it fully into memory and returns the contents.
+// The reader is always closed before returning. If reading succeeded, the
+// error from closing is returned; if reading already failed, a close error is
+// only logged so that the read error is preserved.
+func (f ReadableFile) Read(ctx context.Context) (data []byte, err error) {
+	rc, err := f.Open(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	defer func() {
+		closeErr := rc.Close()
+		switch {
+		case err == nil:
+			err = closeErr
+		case closeErr != nil:
+			log.Errorf(ctx, "error closing reader: %v", closeErr)
+		}
+	}()
+
+	data, err = io.ReadAll(rc)
+	return data, err
+}
+
+// ReadString reads the entire file into memory via Read and returns the
+// contents converted to a string. On error the empty string is returned.
+func (f ReadableFile) ReadString(ctx context.Context) (string, error) {
+	contents, err := f.Read(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	return string(contents), nil
+}
diff --git a/sdks/go/pkg/beam/io/fileio/file_test.go
b/sdks/go/pkg/beam/io/fileio/file_test.go
new file mode 100644
index 00000000000..5f7d292387c
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/file_test.go
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+ "bytes"
+ "context"
+ "path/filepath"
+ "testing"
+ "testing/iotest"
+)
+
+// TestReadableFile_Open writes one plain and one gzip file into a temporary
+// directory and checks that Open returns a reader yielding the original
+// bytes, both for an explicit uncompressed file and for a ".gz" file opened
+// with compressionAuto.
+func TestReadableFile_Open(t *testing.T) {
+ dir := t.TempDir()
+ write(t, filepath.Join(dir, "file1.txt"), []byte("test1"))
+ writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2"))
+
+ tests := []struct {
+ name string
+ file ReadableFile
+ want []byte
+ }{
+ {
+ name: "Open uncompressed file",
+ file: ReadableFile{
+ Metadata: FileMetadata{
+ Path: filepath.Join(dir, "file1.txt"),
+ },
+ Compression: compressionUncompressed,
+ },
+ want: []byte("test1"),
+ },
+ {
+ name: "Open file with auto-detection of compression",
+ file: ReadableFile{
+ Metadata: FileMetadata{
+ Path: filepath.Join(dir, "file2.gz"),
+ },
+ Compression: compressionAuto,
+ },
+ want: []byte("test2"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := context.Background()
+
+ rc, err := tt.file.Open(ctx)
+ if err != nil {
+ t.Fatalf("Open() error = %v, want nil", err)
+ }
+
+ // Close the reader when the subtest finishes.
+ t.Cleanup(func() {
+ rc.Close()
+ })
+
+ if err := iotest.TestReader(rc, tt.want); err != nil {
+ t.Errorf("TestReader() error = %v, want nil",
err)
+ }
+ })
+ }
+}
+
+// Test_compressionFromExt checks that compressionFromExt returns
+// compressionGzip for a ".gz" path and compressionUncompressed for paths with
+// no extension or an unrecognized one.
+func Test_compressionFromExt(t *testing.T) {
+ tests := []struct {
+ name string
+ path string
+ want compressionType
+ }{
+ {
+ name: "compressionGzip for gz extension",
+ path: "file.gz",
+ want: compressionGzip,
+ },
+ {
+ name: "compressionUncompressed for no extension",
+ path: "file",
+ want: compressionUncompressed,
+ },
+ {
+ name: "compressionUncompressed for unrecognized
extension",
+ path: "file.unknown",
+ want: compressionUncompressed,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := compressionFromExt(tt.path); got != tt.want {
+ t.Errorf("compressionFromExt() = %v, want %v",
got, tt.want)
+ }
+ })
+ }
+}
+
+// Test_newDecompressionReader checks that newDecompressionReader yields the
+// original bytes for an uncompressed and a gzip compressed file, and that it
+// returns an error when asked for a reader with compressionAuto.
+func Test_newDecompressionReader(t *testing.T) {
+ dir := t.TempDir()
+ write(t, filepath.Join(dir, "file1.txt"), []byte("test1"))
+ writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2"))
+
+ tests := []struct {
+ name string
+ path string
+ comp compressionType
+ want []byte
+ wantErr bool
+ }{
+ {
+ name: "Reader for uncompressed file",
+ path: filepath.Join(dir, "file1.txt"),
+ comp: compressionUncompressed,
+ want: []byte("test1"),
+ },
+ {
+ name: "Reader for gzip compressed file",
+ path: filepath.Join(dir, "file2.gz"),
+ comp: compressionGzip,
+ want: []byte("test2"),
+ },
+ {
+ name: "Error - reader for auto compression not
supported",
+ path: filepath.Join(dir, "file2.gz"),
+ comp: compressionAuto,
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ rc := openFile(t, tt.path)
+
+ dr, err := newDecompressionReader(rc, tt.comp)
+ if (err != nil) != tt.wantErr {
+ t.Fatalf("newDecompressionReader() error = %v,
wantErr %v", err, tt.wantErr)
+ }
+ // NOTE(review): rc is not closed on this early return, so the
+ // file opened above stays open in the error case.
+ if tt.wantErr {
+ return
+ }
+
+ t.Cleanup(func() {
+ dr.Close()
+ })
+
+ if err := iotest.TestReader(dr, tt.want); err != nil {
+ t.Errorf("TestReader() error = %v, want nil",
err)
+ }
+ })
+ }
+}
+
+// TestReadableFile_Read checks that Read returns the original bytes of an
+// uncompressed file and of a gzip compressed file.
+func TestReadableFile_Read(t *testing.T) {
+ dir := t.TempDir()
+ write(t, filepath.Join(dir, "file1.txt"), []byte("test1"))
+ writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2"))
+
+ tests := []struct {
+ name string
+ file ReadableFile
+ want []byte
+ }{
+ {
+ name: "Read contents from uncompressed file",
+ file: ReadableFile{
+ Metadata: FileMetadata{
+ Path: filepath.Join(dir, "file1.txt"),
+ },
+ Compression: compressionUncompressed,
+ },
+ want: []byte("test1"),
+ },
+ {
+ name: "Read contents from gzip compressed file",
+ file: ReadableFile{
+ Metadata: FileMetadata{
+ Path: filepath.Join(dir, "file2.gz"),
+ },
+ Compression: compressionGzip,
+ },
+ want: []byte("test2"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := context.Background()
+
+ got, err := tt.file.Read(ctx)
+ if err != nil {
+ t.Fatalf("Read() error = %v, want nil", err)
+ }
+
+ if !bytes.Equal(got, tt.want) {
+ t.Errorf("Read() got = %v, want %v", got,
tt.want)
+ }
+ })
+ }
+}
+
+// TestReadableFile_ReadString checks that ReadString returns the original
+// contents, as a string, of an uncompressed file and of a gzip compressed
+// file.
+func TestReadableFile_ReadString(t *testing.T) {
+ dir := t.TempDir()
+ write(t, filepath.Join(dir, "file1.txt"), []byte("test1"))
+ writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2"))
+
+ tests := []struct {
+ name string
+ file ReadableFile
+ want string
+ }{
+ {
+ name: "Read contents from uncompressed file as string",
+ file: ReadableFile{
+ Metadata: FileMetadata{
+ Path: filepath.Join(dir, "file1.txt"),
+ },
+ Compression: compressionUncompressed,
+ },
+ want: "test1",
+ },
+ {
+ name: "Read contents from gzip compressed file as
string",
+ file: ReadableFile{
+ Metadata: FileMetadata{
+ Path: filepath.Join(dir, "file2.gz"),
+ },
+ Compression: compressionGzip,
+ },
+ want: "test2",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := context.Background()
+
+ got, err := tt.file.ReadString(ctx)
+ if err != nil {
+ t.Fatalf("ReadString() error = %v, want nil",
err)
+ }
+
+ if got != tt.want {
+ t.Errorf("ReadString() got = %v, want %v", got,
tt.want)
+ }
+ })
+ }
+}
diff --git a/sdks/go/pkg/beam/io/fileio/gzip.go
b/sdks/go/pkg/beam/io/fileio/gzip.go
new file mode 100644
index 00000000000..5a63d9be7d3
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/gzip.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+ "compress/gzip"
+ "context"
+ "io"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
+
+// gzipReader is a wrapper around a gzip.Reader that also closes the
+// underlying io.ReadCloser.
+type gzipReader struct {
+ // rc is the underlying reader that the gzip.Reader was created from.
+ rc io.ReadCloser
+ // zr is the gzip reader that Read delegates to.
+ zr *gzip.Reader
+}
+
+// newGzipReader wraps rc in a gzip.Reader and returns a gzipReader holding
+// both. If gzip.NewReader fails, its error is returned and rc is left
+// untouched.
+func newGzipReader(rc io.ReadCloser) (*gzipReader, error) {
+	gz, err := gzip.NewReader(rc)
+	if err != nil {
+		return nil, err
+	}
+
+	wrapped := new(gzipReader)
+	wrapped.rc, wrapped.zr = rc, gz
+	return wrapped, nil
+}
+
+// Read delegates to the wrapped gzip.Reader and returns its result unchanged.
+func (r *gzipReader) Read(p []byte) (int, error) {
+	n, err := r.zr.Read(p)
+	return n, err
+}
+
+// Close closes the gzip reader and the underlying io.ReadCloser.
+func (r *gzipReader) Close() (err error) {
+ defer func() {
+ rcErr := r.rc.Close()
+ if err != nil {
+ if rcErr != nil {
+ log.Errorf(context.Background(), "error closing
reader: %v", rcErr)
+ }
+ return
+ }
+ err = rcErr
+ }()
+
+ return r.zr.Close()
+}
diff --git a/sdks/go/pkg/beam/io/fileio/helper_test.go
b/sdks/go/pkg/beam/io/fileio/helper_test.go
new file mode 100644
index 00000000000..e8b61ef067f
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/helper_test.go
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+ "bufio"
+ "compress/gzip"
+ "io"
+ "os"
+ "path/filepath"
+ "testing"
+)
+
+// openFile opens the file at path for reading and fails the test immediately
+// if it cannot be opened. The caller is responsible for closing the result.
+func openFile(t *testing.T, path string) io.ReadCloser {
+	t.Helper()
+
+	file, openErr := os.Open(path)
+	if openErr != nil {
+		t.Fatal(openErr)
+	}
+
+	return file
+}
+
+// createFile creates the file at path, first creating its parent directories
+// with mode 0750. Any failure aborts the test. The caller is responsible for
+// closing the returned file.
+func createFile(t *testing.T, path string) *os.File {
+	t.Helper()
+
+	parent := filepath.Dir(path)
+	mkdirErr := os.MkdirAll(parent, 0750)
+	if mkdirErr != nil && !os.IsExist(mkdirErr) {
+		t.Fatal(mkdirErr)
+	}
+
+	created, createErr := os.Create(path)
+	if createErr != nil {
+		t.Fatal(createErr)
+	}
+
+	return created
+}
+
+// write creates the file at path and writes data to it through a buffered
+// writer, flushing before returning. Any failure aborts the test.
+func write(t *testing.T, path string, data []byte) {
+	t.Helper()
+
+	out := createFile(t, path)
+	defer out.Close()
+
+	buffered := bufio.NewWriter(out)
+	_, err := buffered.Write(data)
+	if err == nil {
+		// Only flush when the write itself succeeded.
+		err = buffered.Flush()
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// writeGzip creates the file at path and writes data to it through a gzip
+// writer, closing the gzip writer before returning. Any failure aborts the
+// test.
+func writeGzip(t *testing.T, path string, data []byte) {
+	t.Helper()
+
+	out := createFile(t, path)
+	defer out.Close()
+
+	compressor := gzip.NewWriter(out)
+	_, err := compressor.Write(data)
+	if err == nil {
+		// Only close the gzip writer when the write itself succeeded.
+		err = compressor.Close()
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/sdks/go/pkg/beam/io/fileio/match.go
b/sdks/go/pkg/beam/io/fileio/match.go
new file mode 100644
index 00000000000..0c8470c61b6
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/match.go
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fileio provides transforms for matching and reading files.
+package fileio
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers matchFn as a DoFn and the FileMetadata emitter with the
+// register package.
+func init() {
+ register.DoFn3x1[context.Context, string, func(FileMetadata),
error](&matchFn{})
+ register.Emitter1[FileMetadata]()
+}
+
+// emptyTreatment controls how empty matches of a pattern are treated.
+type emptyTreatment int
+
+const (
+ // emptyAllow allows empty matches. It is the zero value of emptyTreatment.
+ emptyAllow emptyTreatment = iota
+ // emptyDisallow disallows empty matches.
+ emptyDisallow
+ // emptyAllowIfWildcard allows empty matches if the pattern contains a
+ // wildcard.
+ emptyAllowIfWildcard
+)
+
+// matchOption holds the settings that MatchOptionFn functions modify.
+type matchOption struct {
+ // EmptyTreatment is how a pattern that matches no files is treated.
+ EmptyTreatment emptyTreatment
+}
+
+// MatchOptionFn is a function that can be passed to MatchFiles or MatchAll to
+// configure options for matching files.
+type MatchOptionFn func(*matchOption)
+
+// MatchEmptyAllowIfWildcard returns an option that allows a pattern to match
+// no files only if the pattern contains a wildcard.
+func MatchEmptyAllowIfWildcard() MatchOptionFn {
+	return func(opt *matchOption) { opt.EmptyTreatment = emptyAllowIfWildcard }
+}
+
+// MatchEmptyAllow returns an option that allows a pattern to match no files.
+func MatchEmptyAllow() MatchOptionFn {
+	return func(opt *matchOption) { opt.EmptyTreatment = emptyAllow }
+}
+
+// MatchEmptyDisallow returns an option that does not allow a pattern to match
+// no files.
+func MatchEmptyDisallow() MatchOptionFn {
+	return func(opt *matchOption) { opt.EmptyTreatment = emptyDisallow }
+}
+
+// MatchFiles finds all files matching the glob pattern and returns a
+// PCollection<FileMetadata> of the matching files. The glob is passed to
+// filesystem.ValidateScheme and then matched by MatchAll as a single-element
+// PCollection. MatchFiles accepts a variadic number of MatchOptionFn that can
+// be used to configure the treatment of empty matches. By default, empty
+// matches are allowed if the pattern contains a wildcard.
+func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection {
+	s = s.Scope("fileio.MatchFiles")
+
+	filesystem.ValidateScheme(glob)
+
+	globs := beam.Create(s, glob)
+	return MatchAll(s, globs, opts...)
+}
+
+// MatchAll finds all files matching the glob patterns given by the incoming
+// PCollection<string> and returns a PCollection<FileMetadata> of the matching
+// files. MatchAll accepts a variadic number of MatchOptionFn that can be used
+// to configure the treatment of empty matches; they are applied in order on
+// top of the default. By default, empty matches are allowed if the pattern
+// contains a wildcard.
+func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection {
+	s = s.Scope("fileio.MatchAll")
+
+	settings := matchOption{EmptyTreatment: emptyAllowIfWildcard}
+	for _, apply := range opts {
+		apply(&settings)
+	}
+
+	return beam.ParDo(s, newMatchFn(&settings), col)
+}
+
+// matchFn is the DoFn used by MatchAll to expand glob patterns into
+// FileMetadata elements.
+type matchFn struct {
+ // EmptyTreatment is how a pattern that matches no files is treated.
+ EmptyTreatment emptyTreatment
+}
+
+// newMatchFn returns a matchFn that uses the empty match treatment of option.
+func newMatchFn(option *matchOption) *matchFn {
+	fn := new(matchFn)
+	fn.EmptyTreatment = option.EmptyTreatment
+	return fn
+}
+
+// ProcessElement lists the files matching glob and emits one FileMetadata per
+// file. A glob consisting only of whitespace is skipped. If no files match,
+// nothing is emitted and an error is returned unless the configured
+// EmptyTreatment allows the empty match. Errors from creating the filesystem,
+// listing the files or looking up their sizes are returned as is.
+func (fn *matchFn) ProcessElement(
+	ctx context.Context,
+	glob string,
+	emit func(FileMetadata),
+) error {
+	if strings.TrimSpace(glob) == "" {
+		return nil
+	}
+
+	fs, err := filesystem.New(ctx, glob)
+	if err != nil {
+		return err
+	}
+	defer fs.Close()
+
+	paths, err := fs.List(ctx, glob)
+	if err != nil {
+		return err
+	}
+
+	if len(paths) == 0 {
+		if allowEmptyMatch(glob, fn.EmptyTreatment) {
+			return nil
+		}
+		return fmt.Errorf("no files matching pattern %q", glob)
+	}
+
+	matched, err := metadataFromFiles(ctx, fs, paths)
+	if err != nil {
+		return err
+	}
+
+	for i := range matched {
+		emit(matched[i])
+	}
+
+	return nil
+}
+
+// allowEmptyMatch reports whether a glob that matched no files is acceptable
+// under the given treatment:
+//   - emptyDisallow: never.
+//   - emptyAllowIfWildcard: only if glob contains a wildcard meta character
+//     ('*', '?' or '['), so that patterns such as "file?.txt" and
+//     "file[0-9].txt" are treated the same way as "file*.txt".
+//   - any other value, including emptyAllow: always.
+func allowEmptyMatch(glob string, treatment emptyTreatment) bool {
+	switch treatment {
+	case emptyDisallow:
+		return false
+	case emptyAllowIfWildcard:
+		return strings.ContainsAny(glob, "*?[")
+	default:
+		return true
+	}
+}
+
+// metadataFromFiles builds a FileMetadata for every path in files, in the
+// same order, using fs.Size to look up each file's size. It returns nil when
+// files is empty, and nil together with the error of the first size lookup
+// that fails.
+func metadataFromFiles(
+	ctx context.Context,
+	fs filesystem.Interface,
+	files []string,
+) ([]FileMetadata, error) {
+	if len(files) == 0 {
+		return nil, nil
+	}
+
+	result := make([]FileMetadata, 0, len(files))
+	for _, path := range files {
+		size, err := fs.Size(ctx, path)
+		if err != nil {
+			return nil, err
+		}
+
+		result = append(result, FileMetadata{Path: path, Size: size})
+	}
+
+	return result, nil
+}
diff --git a/sdks/go/pkg/beam/io/fileio/match_test.go
b/sdks/go/pkg/beam/io/fileio/match_test.go
new file mode 100644
index 00000000000..57b2d8cfe1c
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/match_test.go
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+ "context"
+ "path/filepath"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+ "github.com/google/go-cmp/cmp"
+)
+
+// testFile describes a file that tests write to disk before matching.
+type testFile struct {
+ // filename is the name of the file, relative to the test directory.
+ filename string
+ // data is the content written to the file.
+ data []byte
+}
+
+// testFiles is the shared fixture of the match tests: two .txt files, one of
+// them empty, and one .csv file.
+var testFiles = []testFile{
+ {
+ filename: "file1.txt",
+ data: []byte("test1"),
+ },
+ {
+ filename: "file2.txt",
+ data: []byte(""),
+ },
+ {
+ filename: "file3.csv",
+ data: []byte("test3"),
+ },
+}
+
+// TestMatchFiles writes testFiles into a temporary "testdata" directory and
+// checks that MatchFiles returns the expected FileMetadata for a wildcard
+// glob, and nothing for a non-existent path when empty matches are allowed.
+func TestMatchFiles(t *testing.T) {
+ dir := t.TempDir()
+ testDir := filepath.Join(dir, "testdata")
+
+ for _, tf := range testFiles {
+ write(t, filepath.Join(testDir, tf.filename), tf.data)
+ }
+
+ tests := []struct {
+ name string
+ glob string
+ opts []MatchOptionFn
+ want []any
+ }{
+ {
+ name: "Match files",
+ glob: filepath.Join(dir, "*", "file*.txt"),
+ want: []any{
+ FileMetadata{
+ Path: filepath.Join(testDir,
"file1.txt"),
+ Size: 5,
+ },
+ FileMetadata{
+ Path: filepath.Join(testDir,
"file2.txt"),
+ Size: 0,
+ },
+ },
+ },
+ {
+ name: "Read matches with specified empty match
treatment",
+ opts: []MatchOptionFn{
+ MatchEmptyAllow(),
+ },
+ glob: filepath.Join(dir, "non-existent.txt"),
+ want: nil,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ p, s := beam.NewPipelineWithRoot()
+
+ got := MatchFiles(s, tt.glob, tt.opts...)
+
+ passert.Equals(s, got, tt.want...)
+ ptest.RunAndValidate(t, p)
+ })
+ }
+}
+
+// TestMatchAll writes testFiles into a temporary "testdata" directory and
+// checks MatchAll for several glob inputs: matching globs, globs without
+// matches, an empty glob, and the cases where an empty match is expected to
+// make the pipeline fail.
+func TestMatchAll(t *testing.T) {
+ dir := t.TempDir()
+ testDir := filepath.Join(dir, "testdata")
+
+ for _, tf := range testFiles {
+ write(t, filepath.Join(testDir, tf.filename), tf.data)
+ }
+
+ tests := []struct {
+ name string
+ opts []MatchOptionFn
+ input []any
+ want []any
+ wantErr bool
+ }{
+ {
+ name: "Match all",
+ input: []any{
+ filepath.Join(dir, "*", "file*.txt"),
+ filepath.Join(dir, "*", "file*.csv"),
+ },
+ want: []any{
+ FileMetadata{
+ Path: filepath.Join(testDir,
"file1.txt"),
+ Size: 5,
+ },
+ FileMetadata{
+ Path: filepath.Join(testDir,
"file2.txt"),
+ Size: 0,
+ },
+ FileMetadata{
+ Path: filepath.Join(testDir,
"file3.csv"),
+ Size: 5,
+ },
+ },
+ },
+ {
+ name: "No matches",
+ input: []any{
+ filepath.Join(dir, "*", "non-existent.txt"),
+ },
+ want: nil,
+ },
+ {
+ name: "No matches for empty glob",
+ input: []any{""},
+ want: nil,
+ },
+ {
+ name: "No matches for glob without wildcard and empty
matches allowed",
+ opts: []MatchOptionFn{
+ MatchEmptyAllow(),
+ },
+ input: []any{
+ filepath.Join(dir, "non-existent.txt"),
+ },
+ want: nil,
+ },
+ {
+ name: "Error - no matches for glob without wildcard",
+ input: []any{
+ filepath.Join(dir, "non-existent.txt"),
+ },
+ wantErr: true,
+ },
+ {
+ name: "Error - no matches and empty matches disallowed",
+ opts: []MatchOptionFn{
+ MatchEmptyDisallow(),
+ },
+ input: []any{
+ filepath.Join(dir, "*", "non-existent.txt"),
+ },
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ p, s := beam.NewPipelineWithRoot()
+
+ col := beam.Create(s, tt.input...)
+ got := MatchAll(s, col, tt.opts...)
+
+ passert.Equals(s, got, tt.want...)
+ // The pipeline must fail exactly when the case expects an error.
+ if err := ptest.Run(p); (err != nil) != tt.wantErr {
+ t.Errorf("MatchAll() error = %v, wantErr %v",
err, tt.wantErr)
+ }
+ })
+ }
+}
+
+// Test_allowEmptyMatch checks the result of allowEmptyMatch for each
+// emptyTreatment, including emptyAllowIfWildcard with and without a "*" in
+// the glob.
+func Test_allowEmptyMatch(t *testing.T) {
+ tests := []struct {
+ name string
+ glob string
+ treatment emptyTreatment
+ want bool
+ }{
+ {
+ name: "Allow for emptyAllow",
+ glob: "path/to/file.txt",
+ treatment: emptyAllow,
+ want: true,
+ },
+ {
+ name: "Disallow for emptyDisallow",
+ glob: "path/to/file.txt",
+ treatment: emptyDisallow,
+ want: false,
+ },
+ {
+ name: "Allow for glob with wildcard and
emptyAllowIfWildcard",
+ glob: "path/to/*.txt",
+ treatment: emptyAllowIfWildcard,
+ want: true,
+ },
+ {
+ name: "Disallow for glob without wildcard and
emptyAllowIfWildcard",
+ glob: "path/to/file.txt",
+ treatment: emptyAllowIfWildcard,
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := allowEmptyMatch(tt.glob, tt.treatment); got
!= tt.want {
+ t.Errorf("allowEmptyMatch() = %v, want %v",
got, tt.want)
+ }
+ })
+ }
+}
+
+// Test_metadataFromFiles writes testFiles into a temporary directory and
+// checks that metadataFromFiles, using the local filesystem, returns their
+// paths and sizes in order, and nil for a nil slice of files.
+func Test_metadataFromFiles(t *testing.T) {
+ dir := t.TempDir()
+ files := make([]string, len(testFiles))
+
+ for i, tf := range testFiles {
+ file := filepath.Join(dir, tf.filename)
+ write(t, file, tf.data)
+ files[i] = file
+ }
+
+ tests := []struct {
+ name string
+ files []string
+ want []FileMetadata
+ }{
+ {
+ name: "Slice of FileMetadata from file paths",
+ files: files,
+ want: []FileMetadata{
+ {
+ Path: filepath.Join(dir, "file1.txt"),
+ Size: 5,
+ },
+ {
+ Path: filepath.Join(dir, "file2.txt"),
+ Size: 0,
+ },
+ {
+ Path: filepath.Join(dir, "file3.csv"),
+ Size: 5,
+ },
+ },
+ },
+ {
+ name: "Nil when files is empty",
+ files: nil,
+ want: nil,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := context.Background()
+ fs := local.New(ctx)
+
+ got, err := metadataFromFiles(ctx, fs, tt.files)
+ if err != nil {
+ t.Fatalf("metadataFromFiles() error = %v, want
nil", err)
+ }
+
+ if !cmp.Equal(got, tt.want) {
+ t.Errorf("metadataFromFiles() got = %v, want
%v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/sdks/go/pkg/beam/io/fileio/read.go
b/sdks/go/pkg/beam/io/fileio/read.go
new file mode 100644
index 00000000000..b06f6a2a6f9
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/read.go
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers readFn and the ReadableFile emitter with the beam register
+// package. The DoFn2x1 type arguments mirror readFn.ProcessElement: two inputs
+// (a FileMetadata and an emitter of ReadableFile) and one error output.
+func init() {
+ register.DoFn2x1[FileMetadata, func(ReadableFile), error](&readFn{})
+ register.Emitter1[ReadableFile]()
+}
+
+// directoryTreatment controls how paths to directories are treated when
+// reading matches.
+type directoryTreatment int
+
+const (
+ // directorySkip skips directories. It is the zero value of
+ // directoryTreatment.
+ directorySkip directoryTreatment = iota
+ // directoryDisallow disallows directories: readFn.ProcessElement returns an
+ // error when it encounters a directory path.
+ directoryDisallow
+)
+
+// readOption holds the settings that ReadOptionFn values modify and that
+// ReadMatches hands to newReadFn.
+type readOption struct {
+ // Compression is the compression type recorded on each emitted ReadableFile.
+ Compression compressionType
+ // DirectoryTreatment selects whether directory paths are skipped or rejected.
+ DirectoryTreatment directoryTreatment
+}
+
+// ReadOptionFn is a function that can be passed to ReadMatches to configure
+// options for reading files.
+type ReadOptionFn func(*readOption)
+
+// ReadAutoCompression returns a ReadOptionFn that requests auto-detection of
+// the compression type of files.
+func ReadAutoCompression() ReadOptionFn {
+	return func(opt *readOption) { opt.Compression = compressionAuto }
+}
+
+// ReadGzip returns a ReadOptionFn that marks files as compressed using gzip.
+func ReadGzip() ReadOptionFn {
+	return func(opt *readOption) { opt.Compression = compressionGzip }
+}
+
+// ReadUncompressed returns a ReadOptionFn that marks files as not compressed.
+func ReadUncompressed() ReadOptionFn {
+	return func(opt *readOption) { opt.Compression = compressionUncompressed }
+}
+
+// ReadDirectorySkip returns a ReadOptionFn that causes directories to be
+// skipped.
+func ReadDirectorySkip() ReadOptionFn {
+	return func(opt *readOption) { opt.DirectoryTreatment = directorySkip }
+}
+
+// ReadDirectoryDisallow returns a ReadOptionFn that causes directories to be
+// rejected rather than skipped.
+func ReadDirectoryDisallow() ReadOptionFn {
+	return func(opt *readOption) { opt.DirectoryTreatment = directoryDisallow }
+}
+
+// ReadMatches accepts the result of MatchFiles or MatchAll as a
PCollection<FileMetadata> and
+// converts it to a PCollection<ReadableFile>. The ReadableFile can be used to
retrieve file
+// metadata, open the file for reading or read the entire file into memory.
ReadMatches accepts a
+// variadic number of ReadOptionFn that can be used to configure the
compression type of the files
+// and treatment of directories. By default, the compression type is
determined by the file
+// extension and directories are skipped.
+func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn)
beam.PCollection {
+ s = s.Scope("fileio.ReadMatches")
+
+ option := &readOption{
+ Compression: compressionAuto,
+ DirectoryTreatment: directorySkip,
+ }
+
+ for _, opt := range opts {
+ opt(option)
+ }
+
+ return beam.ParDo(s, newReadFn(option), col)
+}
+
+// readFn is the DoFn applied by ReadMatches. Its fields are copied from a
+// readOption by newReadFn.
+type readFn struct {
+ // Compression is the compression type set on every emitted ReadableFile.
+ Compression compressionType
+ // DirectoryTreatment decides whether a directory path is skipped or causes
+ // ProcessElement to return an error.
+ DirectoryTreatment directoryTreatment
+}
+
+// newReadFn builds a readFn that carries the compression type and directory
+// treatment found in option.
+func newReadFn(option *readOption) *readFn {
+	fn := new(readFn)
+	fn.Compression = option.Compression
+	fn.DirectoryTreatment = option.DirectoryTreatment
+	return fn
+}
+
+// ProcessElement emits a ReadableFile for metadata unless its path denotes a
+// directory. A directory path is silently dropped, except when the directory
+// treatment is directoryDisallow, in which case an error is returned.
+func (fn *readFn) ProcessElement(metadata FileMetadata, emit func(ReadableFile)) error {
+	// Common case first: a regular file is wrapped and passed downstream.
+	if !isDirectory(metadata.Path) {
+		emit(ReadableFile{
+			Metadata:    metadata,
+			Compression: fn.Compression,
+		})
+		return nil
+	}
+
+	switch fn.DirectoryTreatment {
+	case directoryDisallow:
+		return fmt.Errorf("path to directory not allowed: %q", metadata.Path)
+	default:
+		return nil
+	}
+}
+
+// isDirectory reports whether path denotes a directory. The decision is made
+// purely on the text of path: it is a directory if it ends with a forward
+// slash or a backslash. The file system is not consulted, and an empty path is
+// not a directory.
+func isDirectory(path string) bool {
+	return strings.HasSuffix(path, "/") || strings.HasSuffix(path, "\\")
+}
diff --git a/sdks/go/pkg/beam/io/fileio/read_test.go b/sdks/go/pkg/beam/io/fileio/read_test.go
new file mode 100644
index 00000000000..b961d324f9a
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/read_test.go
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestReadMatches(t *testing.T) {
+ tests := []struct {
+ name string
+ opts []ReadOptionFn
+ input []any
+ want []any
+ wantErr bool
+ }{
+ {
+ name: "Read matches",
+ input: []any{
+ FileMetadata{
+ Path: "file1.txt",
+ Size: 5,
+ },
+ FileMetadata{
+ Path: "file2.txt",
+ Size: 0,
+ },
+ },
+ want: []any{
+ ReadableFile{
+ Metadata: FileMetadata{
+ Path: "file1.txt",
+ Size: 5,
+ },
+ Compression: compressionAuto,
+ },
+ ReadableFile{
+ Metadata: FileMetadata{
+ Path: "file2.txt",
+ Size: 0,
+ },
+ Compression: compressionAuto,
+ },
+ },
+ },
+ {
+ name: "Read matches with specified compression",
+ opts: []ReadOptionFn{
+ ReadGzip(),
+ },
+ input: []any{
+ FileMetadata{
+ Path: "file1",
+ Size: 5,
+ },
+ FileMetadata{
+ Path: "file2",
+ Size: 0,
+ },
+ },
+ want: []any{
+ ReadableFile{
+ Metadata: FileMetadata{
+ Path: "file1",
+ Size: 5,
+ },
+ Compression: compressionGzip,
+ },
+ ReadableFile{
+ Metadata: FileMetadata{
+ Path: "file2",
+ Size: 0,
+ },
+ Compression: compressionGzip,
+ },
+ },
+ },
+ {
+ name: "Read matches and skip directories",
+ input: []any{
+ FileMetadata{
+ Path: "dir/",
+ Size: 0,
+ },
+ FileMetadata{
+ Path: "file1.txt",
+ Size: 5,
+ },
+ },
+ want: []any{
+ ReadableFile{
+ Metadata: FileMetadata{
+ Path: "file1.txt",
+ Size: 5,
+ },
+ Compression: compressionAuto,
+ },
+ },
+ },
+ {
+ name: "Error - directories disallowed",
+ opts: []ReadOptionFn{
+ ReadDirectoryDisallow(),
+ },
+ input: []any{
+ FileMetadata{
+ Path: "dir/",
+ Size: 0,
+ },
+ FileMetadata{
+ Path: "file1.txt",
+ Size: 5,
+ },
+ },
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Run(tt.name, func(t *testing.T) {
+ p, s := beam.NewPipelineWithRoot()
+
+ col := beam.Create(s, tt.input...)
+ got := ReadMatches(s, col, tt.opts...)
+
+ passert.Equals(s, got, tt.want...)
+ if err := ptest.Run(p); (err != nil) !=
tt.wantErr {
+ t.Errorf("ReadMatches() error = %v,
wantErr %v", err, tt.wantErr)
+ }
+ })
+ })
+ }
+}
+
+func Test_isDirectory(t *testing.T) {
+ tests := []struct {
+ name string
+ path string
+ want bool
+ }{
+ {
+ name: "Path to directory with forward slash directory
separator",
+ path: "path/to/",
+ want: true,
+ },
+ {
+ name: "Path to directory with backslash directory
separator",
+ path: "path\\to\\",
+ want: true,
+ },
+ {
+ name: "Path to file",
+ path: "path/to/file",
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := isDirectory(tt.path); got != tt.want {
+ t.Errorf("isDirectory() = %v, want %v", got,
tt.want)
+ }
+ })
+ }
+}