This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d652d054ecd [Go SDK]: Retrieve file size in CreateInitialRestriction
in textio.Read (#25535)
d652d054ecd is described below
commit d652d054ecd80e9406430b878fb1c7a105ae17ed
Author: Johanna Öjeling <[email protected]>
AuthorDate: Sun Feb 19 23:31:25 2023 +0100
[Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Read
(#25535)
* Retrieve file size in CreateInitialRestriction in textio.Read
* Update readFn doc comment
---
sdks/go/pkg/beam/io/textio/textio.go | 39 ++++++++++++++----------------------
1 file changed, 15 insertions(+), 24 deletions(-)
diff --git a/sdks/go/pkg/beam/io/textio/textio.go
b/sdks/go/pkg/beam/io/textio/textio.go
index a6f909aea1a..ed8be0a42b2 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -34,7 +34,6 @@ import (
func init() {
beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem())
- beam.RegisterFunction(sizeFn)
beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem())
beam.RegisterFunction(expandFn)
}
@@ -82,8 +81,7 @@ func ReadAllSdf(s beam.Scope, col beam.PCollection)
beam.PCollection {
// into separate bundles.
func read(s beam.Scope, col beam.PCollection) beam.PCollection {
files := beam.ParDo(s, expandFn, col)
- sized := beam.ParDo(s, sizeFn, files)
- return beam.ParDo(s, &readFn{}, sized)
+ return beam.ParDo(s, &readFn{}, files)
}
// expandFn expands a glob pattern into all matching file names.
@@ -108,36 +106,29 @@ func expandFn(ctx context.Context, glob string, emit
func(string)) error {
return nil
}
-// sizeFn pairs a filename with the size of that file in bytes.
-// TODO(https://github.com/apache/beam/issues/20607): Once
CreateInitialRestriction supports Context params and
-// error return values, this can be done in readSdfFn.CreateInitialRestriction.
-func sizeFn(ctx context.Context, filename string) (string, int64, error) {
+// readFn reads individual lines from a text file. Implemented as an SDF
+// to allow splitting within a file.
+type readFn struct {
+}
+
+// CreateInitialRestriction creates an offset range restriction representing
+// the file's size in bytes.
+func (fn *readFn) CreateInitialRestriction(ctx context.Context, filename
string) (offsetrange.Restriction, error) {
fs, err := filesystem.New(ctx, filename)
if err != nil {
- return "", -1, err
+ return offsetrange.Restriction{}, err
}
defer fs.Close()
size, err := fs.Size(ctx, filename)
if err != nil {
- return "", -1, err
+ return offsetrange.Restriction{}, err
}
- return filename, size, nil
-}
-// readFn reads individual lines from a text file, given a filename and a
-// size in bytes for that file. Implemented as an SDF to allow splitting
-// within a file.
-type readFn struct {
-}
-
-// CreateInitialRestriction creates an offset range restriction representing
-// the file, using the paired size rather than fetching the file's size.
-func (fn *readFn) CreateInitialRestriction(_ string, size int64)
offsetrange.Restriction {
return offsetrange.Restriction{
Start: 0,
End: size,
- }
+ }, nil
}
const (
@@ -150,7 +141,7 @@ const (
// SplitRestriction splits each file restriction into blocks of a predetermined
// size, with some checks to avoid having small remainders.
-func (fn *readFn) SplitRestriction(_ string, _ int64, rest
offsetrange.Restriction) []offsetrange.Restriction {
+func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction)
[]offsetrange.Restriction {
splits := rest.SizedSplits(blockSize)
numSplits := len(splits)
if numSplits > 1 {
@@ -165,7 +156,7 @@ func (fn *readFn) SplitRestriction(_ string, _ int64, rest
offsetrange.Restricti
}
// RestrictionSize returns the size of each restriction as its range.
-func (fn *readFn) RestrictionSize(_ string, _ int64, rest
offsetrange.Restriction) float64 {
+func (fn *readFn) RestrictionSize(_ string, rest offsetrange.Restriction)
float64 {
return rest.Size()
}
@@ -183,7 +174,7 @@ func (fn *readFn) CreateTracker(rest
offsetrange.Restriction) *sdf.LockRTracker
// begin within the restriction and past the restriction (those are entirely
// output, including the portion outside the restriction). In some cases a
// valid restriction might not output any lines.
-func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker,
filename string, _ int64, emit func(string)) error {
+func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker,
filename string, emit func(string)) error {
log.Infof(ctx, "Reading from %v", filename)
fs, err := filesystem.New(ctx, filename)