lostluck commented on code in PR #22057:
URL: https://github.com/apache/beam/pull/22057#discussion_r919519283
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 						// We can't re-use the original bcr, since we may get new iterables,
 						// or multiple of them at the same time, but we can re-use the count itself.
 						r = &byteCountReader{reader: r, count: bcr.count}
-						return &elementStream{r: r, ec: cv}, nil
+						return &elementStream{r: r, ec: decoder}, nil
 					},
-				},
-			}, nil
-		default:
-			return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
+				})
+				return nil
+			default:
+				return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
+			}
 		}
 	}
+	if err := createChunkReStreams(); err != nil {
+		return nil, nopCloser{}, err
+	}
+	closeChunkReStreamsEarly = false
+	return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
 	default:
-		return nil, errors.Errorf("received stream with marker size of %d", size)
+		return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
 	}
 }
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
-	for i := int64(0); i < size; i++ {
-		value, err := cv.Decode(r)
+var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream
+
+// ReStreamFactory is a function that constructs a ReStream from an io.Reader
+// and a coder for the type of elements that need to be decoded. A
+// ReStreamFactory is used by the SDK harness to transform a byte stream into
+// a stream of FullValues while executing a DoFn that takes an iterator as one
+// of its arguments (GBK and CoGBK DoFns).
+//
+// The factory should return a ReStream that decodes numElements elements from
+// the encodedStream reader. After the DoFn that uses the stream has finished,
+// the second return value is called to close the ReStream; this gives the
+// factory an opportunity to release any resources associated with the
+// returned ReStream.
+//
+// DefaultReadStreamToReStream is the default ReStreamFactory used by the exec
+// package.
+type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)
+
+// SetReStreamFactory overrides the default behavior for constructing a
+// ReStream for DoFns that iterate over values (GBK and CoGBK).
+//
+// The default implementation is DefaultReadStreamToReStream.
+func SetReStreamFactory(fn ReStreamFactory) {
Review Comment:
Per discussion, please label this as Experimental, and link to the issue
we're attempting to solve with this.
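
To make the new extension point concrete for readers of this thread, here is a rough sketch of how an external package might register a custom factory once this lands. Everything beyond the identifiers visible in the diff (`SetReStreamFactory`, the `ReStreamFactory` signature, `DefaultReadStreamToReStream`) is an illustrative assumption: the import paths, the init-time registration, the size threshold, and the logging are not part of this PR.

```go
package main

import (
	"context"
	"io"
	"log"

	// Import paths assume the Beam Go SDK v2 module layout.
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
)

func init() {
	// Install a custom factory before any bundles are processed.
	exec.SetReStreamFactory(func(ctx context.Context, encodedStream io.Reader, numElements int64, c *coder.Coder) (exec.ReStream, func() error, error) {
		if numElements > 1_000_000 {
			// Hypothetical hook: a real factory might spill very large
			// iterables to disk here and delete the file in the returned
			// cleanup function.
			log.Printf("large GBK/CoGBK iterable: %d elements", numElements)
		}
		// Delegate the actual decoding to the in-memory default.
		return exec.DefaultReadStreamToReStream(ctx, encodedStream, numElements, c)
	})
}

func main() {}
```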
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -164,70 +164,83 @@ func (n *DataSource) Process(ctx context.Context) error {
 			pe.Pane = pn
 			var valReStreams []ReStream
-			for _, cv := range cvs {
-				values, err := n.makeReStream(ctx, pe, cv, &bcr)
+			reStreamCloser := &multiOnceCloser{}
+			defer reStreamCloser.Close()
+			for _, cod := range valueCoders {
+				values, closer, err := n.makeReStream(ctx, pe, cod, &dataReaderCounted)
 				if err != nil {
 					return err
 				}
 				valReStreams = append(valReStreams, values)
+				reStreamCloser.children = append(reStreamCloser.children, closer)
 			}
 			if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
 				return err
 			}
 			// Collect the actual size of the element, and reset the bytecounter reader.
-			n.PCol.addSize(int64(bcr.reset()))
-			bcr.reader = r
+			n.PCol.addSize(int64(dataReaderCounted.reset()))
+			dataReaderCounted.reader = dataReader
+
+			if err := reStreamCloser.Close(); err != nil {
+				return fmt.Errorf("error closing ReStream after processing element: %w", err)
+			}
 		}
 	}
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, elemCoder *coder.Coder, bcr *byteCountReader) (ReStream, io.Closer, error) {
 	// TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
 	size, err := coder.DecodeInt32(bcr.reader)
 	if err != nil {
-		return nil, errors.Wrap(err, "stream size decoding failed")
+		return nil, nopCloser{}, errors.Wrap(err, "stream size decoding failed")
 	}
 	switch {
 	case size >= 0:
 		// Single chunk streams are fully read in and buffered in memory.
-		buf := make([]FullValue, 0, size)
-		buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
-		if err != nil {
-			return nil, err
-		}
-		return &FixedReStream{Buf: buf}, nil
+		stream, cleanupFn, err := readStreamToReStream(ctx, bcr, int64(size), elemCoder)
+		return stream, closeFunc(cleanupFn), err
 	case size == -1:
+		decoder := MakeElementDecoder(elemCoder)
 		// Multi-chunked stream.
-		var buf []FullValue
-		for {
-			chunk, err := coder.DecodeVarInt(bcr.reader)
-			if err != nil {
-				return nil, errors.Wrap(err, "stream chunk size decoding failed")
+		var chunkReStreams []ReStream
+		chunkReStreamsCloser := &multiOnceCloser{}
+		closeChunkReStreamsEarly := true
+		defer func() {
+			if !closeChunkReStreamsEarly {
+				return
 			}
-			// All done, escape out.
-			switch {
-			case chunk == 0: // End of stream, return buffer.
-				return &FixedReStream{Buf: buf}, nil
-			case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
-				chunkBuf := make([]FullValue, 0, chunk)
-				chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
-				if err != nil {
-					return nil, err
-				}
-				buf = append(buf, chunkBuf...)
-			case chunk == -1: // State backed iterable!
-				chunk, err := coder.DecodeVarInt(bcr.reader)
-				if err != nil {
-					return nil, err
-				}
-				token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
+			chunkReStreamsCloser.Close() // ignore error because makeReStream is already returning an error in this case.
+		}()
+		// createChunkReStreams appends to chunkReStreams and
+		// chunkReStreamsCloser.children.
+		createChunkReStreams := func() error {
Review Comment:
Indirecting the error through an anonymous closure doesn't help code readability here; the closure is too long for the indirection to pay off. I'd strongly prefer that we move this out to a named method and pass parameters in and out, instead of making hard-to-follow code harder to follow. And looking at this again, is the closure just there to avoid adding a `nopCloser{}` return value on the error cases? Returning that directly would be clearer than the indirection.
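
Separately, as a reading aid for the closer plumbing in this hunk: it relies on `nopCloser`, `closeFunc`, and `multiOnceCloser`, whose definitions fall outside the diff. Below is a minimal sketch of what they plausibly look like, written as if inside the exec package; the names come from the diff, but the bodies are assumptions (in particular, that "once" means repeated `Close` calls are harmless).

```go
package exec

import (
	"io"
	"sync"
)

// nopCloser is a no-op io.Closer, handy for error paths so callers can always
// defer Close without nil checks.
type nopCloser struct{}

func (nopCloser) Close() error { return nil }

// closeFunc adapts a factory's cleanup func() error to io.Closer.
type closeFunc func() error

func (f closeFunc) Close() error { return f() }

// multiOnceCloser closes all of its children exactly once, even if Close is
// invoked again (e.g. explicitly after ProcessElement and again via defer),
// and reports the first error it saw.
type multiOnceCloser struct {
	once     sync.Once
	children []io.Closer
	err      error
}

func (m *multiOnceCloser) Close() error {
	m.once.Do(func() {
		for _, c := range m.children {
			if err := c.Close(); err != nil && m.err == nil {
				m.err = err
			}
		}
	})
	return m.err
}
```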
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -619,6 +675,18 @@ type concatReStream struct {
first, next ReStream
}
+func newConcatReStream(streams ...ReStream) *concatReStream {
+ if len(streams) == 0 {
+ streams = []ReStream{&FixedReStream{}}
+ }
+ first := streams[0]
+ rest := streams[1:]
+ if len(rest) == 0 {
+ return &concatReStream{first: first, next: nil}
+ }
+ return &concatReStream{first: first, next: newConcatReStream(rest...)}
Review Comment:
I'm not a huge fan of the recursive linked-list construction here, but in practice I think this will end up being minimally used, if ever, and it's probably less fiddly than the alternative approaches.
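
For comparison, here is what an iterative construction of the same first/next chain could look like, building it back to front. This is only a sketch of the alternative (it assumes the surrounding exec package types `ReStream`, `FixedReStream`, and `concatReStream`), not a requested change.

```go
// newConcatReStreamIter builds the same chain as the recursive version:
// wrap the last stream first, then prepend the remaining streams in reverse.
func newConcatReStreamIter(streams ...ReStream) *concatReStream {
	if len(streams) == 0 {
		streams = []ReStream{&FixedReStream{}}
	}
	cur := &concatReStream{first: streams[len(streams)-1], next: nil}
	for i := len(streams) - 2; i >= 0; i-- {
		cur = &concatReStream{first: streams[i], next: cur}
	}
	return cur
}
```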
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 						// We can't re-use the original bcr, since we may get new iterables,
 						// or multiple of them at the same time, but we can re-use the count itself.
 						r = &byteCountReader{reader: r, count: bcr.count}
-						return &elementStream{r: r, ec: cv}, nil
+						return &elementStream{r: r, ec: decoder}, nil
 					},
-				},
-			}, nil
-		default:
-			return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
+				})
+				return nil
+			default:
+				return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
+			}
 		}
 	}
+	if err := createChunkReStreams(); err != nil {
+		return nil, nopCloser{}, err
+	}
+	closeChunkReStreamsEarly = false
+	return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
 	default:
-		return nil, errors.Errorf("received stream with marker size of %d", size)
+		return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
 	}
 }
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
-	for i := int64(0); i < size; i++ {
-		value, err := cv.Decode(r)
+var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream
+
+// ReStreamFactory is a function that constructs a ReStream from an io.Reader
+// and a coder for the type of elements that need to be decoded. A
+// ReStreamFactory is used by the SDK harness to transform a byte stream into
+// a stream of FullValues while executing a DoFn that takes an iterator as one
+// of its arguments (GBK and CoGBK DoFns).
+//
+// The factory should return a ReStream that decodes numElements elements from
+// the encodedStream reader. After the DoFn that uses the stream has finished,
+// the second return value is called to close the ReStream; this gives the
+// factory an opportunity to release any resources associated with the
+// returned ReStream.
+//
+// DefaultReadStreamToReStream is the default ReStreamFactory used by the exec
+// package.
+type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)
+
+// SetReStreamFactory overrides the default behavior for constructing a
+// ReStream for DoFns that iterate over values (GBK and CoGBK).
+//
+// The default implementation is DefaultReadStreamToReStream.
+func SetReStreamFactory(fn ReStreamFactory) {
+ readStreamToReStream = fn
+}
+
+// DefaultReadStreamToReStream reads numElements elements from encodedStream
+// using the given coder and returns an in-memory ReStream.
+func DefaultReadStreamToReStream(_ context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error) {
Review Comment:
Per discussion, please unexport the default implementation.
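
For reference, the unexported default might look like the sketch below. The body is an assumption that mirrors the removed readStreamToBuffer helper (decode numElements values into memory and serve them from a FixedReStream); the actual implementation isn't visible in this hunk, and the snippet assumes the exec package's existing imports of context, io, and the coder package.

```go
var readStreamToReStream ReStreamFactory = defaultReadStreamToReStream

// defaultReadStreamToReStream is the package-private default factory: it
// decodes numElements values from encodedStream into memory and returns a
// FixedReStream over them, along with a no-op cleanup function.
func defaultReadStreamToReStream(_ context.Context, encodedStream io.Reader, numElements int64, c *coder.Coder) (ReStream, func() error, error) {
	noop := func() error { return nil }
	dec := MakeElementDecoder(c)
	buf := make([]FullValue, 0, numElements)
	for i := int64(0); i < numElements; i++ {
		fv, err := dec.Decode(encodedStream)
		if err != nil {
			return nil, noop, err
		}
		buf = append(buf, *fv)
	}
	return &FixedReStream{Buf: buf}, noop, nil
}
```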