This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d652d054ecd [Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Read (#25535)
d652d054ecd is described below

commit d652d054ecd80e9406430b878fb1c7a105ae17ed
Author: Johanna Öjeling <[email protected]>
AuthorDate: Sun Feb 19 23:31:25 2023 +0100

    [Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Read (#25535)
    
    * Retrieve file size in CreateInitialRestriction in textio.Read
    
    * Update readFn doc comment
---
 sdks/go/pkg/beam/io/textio/textio.go | 39 ++++++++++++++----------------------
 1 file changed, 15 insertions(+), 24 deletions(-)

diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go
index a6f909aea1a..ed8be0a42b2 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -34,7 +34,6 @@ import (
 
 func init() {
        beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem())
-       beam.RegisterFunction(sizeFn)
        beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem())
        beam.RegisterFunction(expandFn)
 }
@@ -82,8 +81,7 @@ func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
 // into separate bundles.
 func read(s beam.Scope, col beam.PCollection) beam.PCollection {
        files := beam.ParDo(s, expandFn, col)
-       sized := beam.ParDo(s, sizeFn, files)
-       return beam.ParDo(s, &readFn{}, sized)
+       return beam.ParDo(s, &readFn{}, files)
 }
 
 // expandFn expands a glob pattern into all matching file names.
@@ -108,36 +106,29 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error {
        return nil
 }
 
-// sizeFn pairs a filename with the size of that file in bytes.
-// TODO(https://github.com/apache/beam/issues/20607): Once 
CreateInitialRestriction supports Context params and
-// error return values, this can be done in readSdfFn.CreateInitialRestriction.
-func sizeFn(ctx context.Context, filename string) (string, int64, error) {
+// readFn reads individual lines from a text file. Implemented as an SDF
+// to allow splitting within a file.
+type readFn struct {
+}
+
+// CreateInitialRestriction creates an offset range restriction representing
+// the file's size in bytes.
+func (fn *readFn) CreateInitialRestriction(ctx context.Context, filename 
string) (offsetrange.Restriction, error) {
        fs, err := filesystem.New(ctx, filename)
        if err != nil {
-               return "", -1, err
+               return offsetrange.Restriction{}, err
        }
        defer fs.Close()
 
        size, err := fs.Size(ctx, filename)
        if err != nil {
-               return "", -1, err
+               return offsetrange.Restriction{}, err
        }
-       return filename, size, nil
-}
 
-// readFn reads individual lines from a text file, given a filename and a
-// size in bytes for that file. Implemented as an SDF to allow splitting
-// within a file.
-type readFn struct {
-}
-
-// CreateInitialRestriction creates an offset range restriction representing
-// the file, using the paired size rather than fetching the file's size.
-func (fn *readFn) CreateInitialRestriction(_ string, size int64) 
offsetrange.Restriction {
        return offsetrange.Restriction{
                Start: 0,
                End:   size,
-       }
+       }, nil
 }
 
 const (
@@ -150,7 +141,7 @@ const (
 
 // SplitRestriction splits each file restriction into blocks of a predeterined
 // size, with some checks to avoid having small remainders.
-func (fn *readFn) SplitRestriction(_ string, _ int64, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction) 
[]offsetrange.Restriction {
        splits := rest.SizedSplits(blockSize)
        numSplits := len(splits)
        if numSplits > 1 {
@@ -165,7 +156,7 @@ func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restricti
 }
 
 // Size returns the size of each restriction as its range.
-func (fn *readFn) RestrictionSize(_ string, _ int64, rest 
offsetrange.Restriction) float64 {
+func (fn *readFn) RestrictionSize(_ string, rest offsetrange.Restriction) 
float64 {
        return rest.Size()
 }
 
@@ -183,7 +174,7 @@ func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker
 // begin within the restriction and past the restriction (those are entirely
 // output, including the portion outside the restriction). In some cases a
 // valid restriction might not output any lines.
-func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, 
filename string, _ int64, emit func(string)) error {
+func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, 
filename string, emit func(string)) error {
        log.Infof(ctx, "Reading from %v", filename)
 
        fs, err := filesystem.New(ctx, filename)

Reply via email to