lostluck commented on code in PR #22057:
URL: https://github.com/apache/beam/pull/22057#discussion_r919519283


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
                                                        // We can't re-use the original bcr, since we may get new iterables,
                                                        // or multiple of them at the same time, but we can re-use the count itself.
                                                        r = &byteCountReader{reader: r, count: bcr.count}
-                                                       return &elementStream{r: r, ec: cv}, nil
+                                                       return &elementStream{r: r, ec: decoder}, nil
                                                },
-                                       },
-                               }, nil
-                       default:
-                               return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
+                                       })
+                                       return nil
+                               default:
+                                       return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
+                               }
                        }
                }
+               if err := createChunkReStreams(); err != nil {
+                       return nil, nopCloser{}, err
+               }
+               closeChunkReStreamsEarly = false
+               return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
        default:
-               return nil, errors.Errorf("received stream with marker size of %d", size)
+               return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
        }
 }
 
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
-       for i := int64(0); i < size; i++ {
-               value, err := cv.Decode(r)
+var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream
+
+// ReStreamFactory is a function that constructs a ReStream from an io.Reader
+// and a coder for the type of elements that need to be decoded. A ReStreamFactory
+// is used by the SDK harness to transform a byte stream into a stream of
+// FullValues while executing a DoFn that takes an iterator as one of its
+// arguments (GBK and CoGBK DoFns).
+//
+// The factory should return a ReStream that decodes numElements elements from
+// the encodedStream reader. After the DoFn that uses the stream has finished,
+// the second return value will be called to close the ReStream; this provides
+// the factory an opportunity to release any resources associated with the
+// returned ReStream.
+//
+// DefaultReadStreamToReStream is the default ReStreamFactory that is used by
+// the exec package.
+type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)
+
+// SetReStreamFactory overrides the default behavior for constructing a ReStream
+// for DoFns that iterate over values (GBK and CoGBK).
+//
+// The default ReStreamFactory is DefaultReadStreamToReStream.
+func SetReStreamFactory(fn ReStreamFactory) {

Review Comment:
   Per discussion, please label this as Experimental, and link to the issue we're attempting to solve with this.
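   
   For illustration, the requested labeling might look roughly like this (the issue link is a placeholder, since the tracking issue isn't named in this thread; the doc wording is a sketch, not required text):
   
   ```go
   // SetReStreamFactory overrides the default behavior for constructing a ReStream
   // for DoFns that iterate over values (GBK and CoGBK).
   //
   // Experimental: this hook is subject to change or removal.
   // See <tracking issue URL> for the problem it is intended to address.
   func SetReStreamFactory(fn ReStreamFactory) {
   	readStreamToReStream = fn
   }
   ```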



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -164,70 +164,83 @@ func (n *DataSource) Process(ctx context.Context) error {
                pe.Pane = pn
 
                var valReStreams []ReStream
-               for _, cv := range cvs {
-                       values, err := n.makeReStream(ctx, pe, cv, &bcr)
+               reStreamCloser := &multiOnceCloser{}
+               defer reStreamCloser.Close()
+               for _, cod := range valueCoders {
+                       values, closer, err := n.makeReStream(ctx, pe, cod, &dataReaderCounted)
                        if err != nil {
                                return err
                        }
                        valReStreams = append(valReStreams, values)
+                       reStreamCloser.children = append(reStreamCloser.children, closer)
                }
 
                if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
                        return err
                }
                // Collect the actual size of the element, and reset the bytecounter reader.
-               n.PCol.addSize(int64(bcr.reset()))
-               bcr.reader = r
+               n.PCol.addSize(int64(dataReaderCounted.reset()))
+               dataReaderCounted.reader = dataReader
+
+               if err := reStreamCloser.Close(); err != nil {
+                       return fmt.Errorf("error closing ReStream after processing element: %w", err)
+               }
        }
 }
 
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, elemCoder *coder.Coder, bcr *byteCountReader) (ReStream, io.Closer, error) {
        // TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
        size, err := coder.DecodeInt32(bcr.reader)
        if err != nil {
-               return nil, errors.Wrap(err, "stream size decoding failed")
+               return nil, nopCloser{}, errors.Wrap(err, "stream size decoding failed")
        }
 
        switch {
        case size >= 0:
                // Single chunk streams are fully read in and buffered in memory.
-               buf := make([]FullValue, 0, size)
-               buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
-               if err != nil {
-                       return nil, err
-               }
-               return &FixedReStream{Buf: buf}, nil
+               stream, cleanupFn, err := readStreamToReStream(ctx, bcr, int64(size), elemCoder)
+               return stream, closeFunc(cleanupFn), err
        case size == -1:
+               decoder := MakeElementDecoder(elemCoder)
                // Multi-chunked stream.
-               var buf []FullValue
-               for {
-                       chunk, err := coder.DecodeVarInt(bcr.reader)
-                       if err != nil {
-                               return nil, errors.Wrap(err, "stream chunk size decoding failed")
+               var chunkReStreams []ReStream
+               chunkReStreamsCloser := &multiOnceCloser{}
+               closeChunkReStreamsEarly := true
+               defer func() {
+                       if !closeChunkReStreamsEarly {
+                               return
                        }
-                       // All done, escape out.
-                       switch {
-                       case chunk == 0: // End of stream, return buffer.
-                               return &FixedReStream{Buf: buf}, nil
-                       case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
-                               chunkBuf := make([]FullValue, 0, chunk)
-                               chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
-                               if err != nil {
-                                       return nil, err
-                               }
-                               buf = append(buf, chunkBuf...)
-                       case chunk == -1: // State backed iterable!
-                               chunk, err := coder.DecodeVarInt(bcr.reader)
-                               if err != nil {
-                                       return nil, err
-                               }
-                               token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
+                       chunkReStreamsCloser.Close() // ignore error because makeReStream is already returning an error in this case.
+               }()
+               // createChunkReStreams appends to chunkReStreams and
+               // chunkReStreamsCloser.children.
+               createChunkReStreams := func() error {

Review Comment:
   Indirecting the error through an anon closure doesn't help with code readability here. The closure is too long for a meaningful readability benefit.

   I'd strongly prefer that we move this out to a named method and pass parameters in and out, instead of making hard-to-follow code harder to follow.

   And looking at this again, is this just to avoid adding a `nopCloser{}` return parameter on error cases? That would be clearer than the indirection.
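   
   A rough, self-contained sketch of the shape that suggests, with simplified stand-in types rather than the real exec ones (the names `readChunkReStreams`, `fixedReStream`, and the chunk-size slice are all illustrative): the chunk loop becomes a named helper that takes its inputs as parameters and returns `nopCloser{}` directly on error paths instead of routing cleanup through a captured flag.
   
   ```go
   package main
   
   import (
   	"errors"
   	"fmt"
   	"io"
   )
   
   // Stand-ins for the exec package types referenced in the diff, simplified
   // so the sketch compiles on its own.
   type ReStream interface{}
   
   type fixedReStream struct{}
   
   // nopCloser is an io.Closer that does nothing, so error paths can return a
   // usable closer instead of deferring cleanup through a captured flag.
   type nopCloser struct{}
   
   func (nopCloser) Close() error { return nil }
   
   // readChunkReStreams is the named-helper version of the anonymous closure:
   // inputs in as parameters, (streams, closer, error) out as results.
   func readChunkReStreams(chunkSizes []int64) ([]ReStream, io.Closer, error) {
   	var streams []ReStream
   	for _, size := range chunkSizes {
   		switch {
   		case size == 0: // End of stream.
   			return streams, nopCloser{}, nil
   		case size > 0: // Decode a chunk (decoding elided in this sketch).
   			streams = append(streams, fixedReStream{})
   		default:
   			return nil, nopCloser{}, errors.New("invalid chunk size")
   		}
   	}
   	return streams, nopCloser{}, nil
   }
   
   func main() {
   	streams, closer, err := readChunkReStreams([]int64{2, 3, 0})
   	defer closer.Close()
   	fmt.Println(len(streams), err)
   }
   ```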



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -619,6 +675,18 @@ type concatReStream struct {
        first, next ReStream
 }
 
+func newConcatReStream(streams ...ReStream) *concatReStream {
+       if len(streams) == 0 {
+               streams = []ReStream{&FixedReStream{}}
+       }
+       first := streams[0]
+       rest := streams[1:]
+       if len(rest) == 0 {
+               return &concatReStream{first: first, next: nil}
+       }
+       return &concatReStream{first: first, next: newConcatReStream(rest...)}

Review Comment:
   I'm not a huge fan of the recursive linked-list construction here, but I think in practice this will end up being minimally used, if ever. But this is probably less fiddly than alternative approaches.
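   
   For comparison only, an iterative construction of the same chain would look roughly like the sketch below, using the unexported `concatReStream`, `FixedReStream`, and `ReStream` types as they appear in this file (so it only compiles inside the exec package); arguably no less fiddly, which is the point above.
   
   ```go
   // newConcatReStreamIter builds the same chain front to back without recursion;
   // the last node's next field is simply left as its zero value (nil), matching
   // the recursive version.
   func newConcatReStreamIter(streams ...ReStream) *concatReStream {
   	if len(streams) == 0 {
   		return &concatReStream{first: &FixedReStream{}}
   	}
   	head := &concatReStream{first: streams[0]}
   	tail := head
   	for _, s := range streams[1:] {
   		node := &concatReStream{first: s}
   		tail.next = node
   		tail = node
   	}
   	return head
   }
   ```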



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
                                                        // We can't re-use the original bcr, since we may get new iterables,
                                                        // or multiple of them at the same time, but we can re-use the count itself.
                                                        r = &byteCountReader{reader: r, count: bcr.count}
-                                                       return &elementStream{r: r, ec: cv}, nil
+                                                       return &elementStream{r: r, ec: decoder}, nil
                                                },
-                                       },
-                               }, nil
-                       default:
-                               return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
+                                       })
+                                       return nil
+                               default:
+                                       return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
+                               }
                        }
                }
+               if err := createChunkReStreams(); err != nil {
+                       return nil, nopCloser{}, err
+               }
+               closeChunkReStreamsEarly = false
+               return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
        default:
-               return nil, errors.Errorf("received stream with marker size of %d", size)
+               return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
        }
 }
 
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
-       for i := int64(0); i < size; i++ {
-               value, err := cv.Decode(r)
+var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream
+
+// ReStreamFactory is a function that constructs a ReStream from an io.Reader
+// and a coder for the type of elements that need to be decoded. A ReStreamFactory
+// is used by the SDK harness to transform a byte stream into a stream of
+// FullValues while executing a DoFn that takes an iterator as one of its
+// arguments (GBK and CoGBK DoFns).
+//
+// The factory should return a ReStream that decodes numElements elements from
+// the encodedStream reader. After the DoFn that uses the stream has finished,
+// the second return value will be called to close the ReStream; this provides
+// the factory an opportunity to release any resources associated with the
+// returned ReStream.
+//
+// DefaultReadStreamToReStream is the default ReStreamFactory that is used by
+// the exec package.
+type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)
+
+// SetReStreamFactory overrides the default behavior for constructing a ReStream
+// for DoFns that iterate over values (GBK and CoGBK).
+//
+// The default ReStreamFactory is DefaultReadStreamToReStream.
+func SetReStreamFactory(fn ReStreamFactory) {
+       readStreamToReStream = fn
+}
+
+// DefaultReadStreamToReStream reads numElements from encodedStream using the
+// given coder and returns an in-memory ReStream.
+func DefaultReadStreamToReStream(_ context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error) {

Review Comment:
   Per discussion, please unexport the default implementation.
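   
   Separate from the rename, the hunk above spells out the factory contract; purely to illustrate that contract, a custom factory registered through `SetReStreamFactory` might have roughly the shape below. This is hypothetical code written as if inside the exec package: it just re-creates the in-memory buffering to show the signature, reusing `MakeElementDecoder`, `FullValue`, and `FixedReStream` as they appear in this file, and a real replacement would presumably do something different (e.g. spill large iterables to disk).
   
   ```go
   // customReStreamFactory is a hypothetical ReStreamFactory: it decodes all
   // numElements values up front into memory and returns a FixedReStream over
   // them, with a no-op cleanup function.
   func customReStreamFactory(ctx context.Context, encodedStream io.Reader, numElements int64, c *coder.Coder) (ReStream, func() error, error) {
   	dec := MakeElementDecoder(c)
   	buf := make([]FullValue, 0, numElements)
   	for i := int64(0); i < numElements; i++ {
   		v, err := dec.Decode(encodedStream)
   		if err != nil {
   			return nil, func() error { return nil }, err
   		}
   		buf = append(buf, *v)
   	}
   	return &FixedReStream{Buf: buf}, func() error { return nil }, nil
   }
   
   // Wired up once, before pipeline execution:
   //   SetReStreamFactory(customReStreamFactory)
   ```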



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
