damccorm commented on code in PR #22904:
URL: https://github.com/apache/beam/pull/22904#discussion_r956081165


##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
                ret = append(ret, *elm)
        }
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {
+       r    io.Reader
+       d    ElementDecoder
+       size int // The number of elements in this stream.
+}
+
+// Open returns the Stream from the start of the in-memory reader. Returns error if called twice.
+func (n *decodeReStream) Open() (Stream, error) {
+       if n.r == nil {
+               return nil, errors.New("decodeReStream opened twice!")
+       }
+       ret := &decodeStream{r: n.r, d: n.d, size: n.size}
+       n.r = nil
+       n.d = nil
+       return ret, nil
+}
+
+// decodeStream is a decode on demand Stream, that decodes size elements from the provided
+// io.Reader.
+type decodeStream struct {
+       r          io.Reader
+       d          ElementDecoder
+       next, size int
+       ret        FullValue
+}
+
+// Close causes subsequent calls to Read to return io.EOF, and drains the remaining element count
+// from the reader.
+func (s *decodeStream) Close() error {
+       // On close, if next != size, we must iterate through the rest of the decoding
+       // until the reader is drained. Otherwise we corrupt the read for the next element.
+       // TODO: Optimize the case where we have length prefixed values

Review Comment:
   Nit - should we have an issue for this TODO?
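
   For context on why the drain in Close matters, here is a minimal sketch. It
   is not the code in this PR: `byteStream` and `firstOfEach` are hypothetical
   names, and each element is assumed to be a single byte instead of going
   through an `ElementDecoder`. It shows two streams laid out back to back on
   one shared reader, and what the second stream sees when the first is
   abandoned early with and without draining.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// byteStream is a hypothetical, simplified stand-in for decodeStream: it
// decodes size one-byte elements from a reader shared with later streams.
type byteStream struct {
	r          io.Reader
	next, size int
}

// Read decodes the next element, or returns io.EOF after size elements.
func (s *byteStream) Read() (byte, error) {
	if s.next >= s.size {
		return 0, io.EOF
	}
	var buf [1]byte
	if _, err := io.ReadFull(s.r, buf[:]); err != nil {
		return 0, err
	}
	s.next++
	return buf[0], nil
}

// Close decodes and discards any unread elements so that the shared reader
// is positioned at the start of whatever follows this stream.
func (s *byteStream) Close() error {
	for s.next < s.size {
		if _, err := s.Read(); err != nil {
			return err
		}
	}
	return nil
}

// firstOfEach reads only the first element of two back-to-back streams of
// the given size, optionally draining the first stream before the second.
func firstOfEach(data string, size int, drain bool) (byte, byte, error) {
	r := strings.NewReader(data)
	a := &byteStream{r: r, size: size}
	x, err := a.Read()
	if err != nil {
		return 0, 0, err
	}
	if drain {
		if err := a.Close(); err != nil {
			return 0, 0, err
		}
	}
	b := &byteStream{r: r, size: size}
	y, err := b.Read()
	return x, y, err
}

func main() {
	x, y, _ := firstOfEach("abcxyz", 3, true)
	fmt.Printf("drained: %c %c\n", x, y)
	x, y, _ = firstOfEach("abcxyz", 3, false)
	fmt.Printf("not drained: %c %c\n", x, y)
}
```

   Without the drain, the second stream starts in the middle of the first
   stream's data. With length prefixed values, the remaining bytes could in
   principle be skipped without decoding, which appears to be what the TODO is
   pointing at.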



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
                ret = append(ret, *elm)
        }
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   One (unfortunately verbose) option is `singleUseDecodeReStream`?



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
                ret = append(ret, *elm)
        }
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   Naming nit - does it still make sense to call this a ReStream? To me, the
   name ReStream implies that it can be iterated multiple times, and I think
   the interface itself implies the same:
https://github.com/apache/beam/blob/48bad7d966a583055669850eb9fb558782f636a8/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L60



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
