lostluck commented on code in PR #22904:
URL: https://github.com/apache/beam/pull/22904#discussion_r956214350


##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
                ret = append(ret, *elm)
        }
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   I'm going to drop the "decode" from the restream levels, so 
singleUseRestream. Simpler to understand that way.
   
   The Stream levels are pretty standard decode streams at least.
   
   



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
                ret = append(ret, *elm)
        }
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {
+       r    io.Reader
+       d    ElementDecoder
+       size int // The number of elements in this stream.
+}
+
+// Open returns the Stream from the start of the in-memory reader. Returns 
error if called twice.
+func (n *decodeReStream) Open() (Stream, error) {
+       if n.r == nil {
+               return nil, errors.New("decodeReStream opened twice!")
+       }
+       ret := &decodeStream{r: n.r, d: n.d, size: n.size}
+       n.r = nil
+       n.d = nil
+       return ret, nil
+}
+
+// decodeStream is a decode on demand Stream, that decodes size elements from 
the provided
+// io.Reader.
+type decodeStream struct {
+       r          io.Reader
+       d          ElementDecoder
+       next, size int
+       ret        FullValue
+}
+
+// Close causes subsequent calls to Read to return io.EOF, and drains the 
remaining element count
+// from the reader.
+func (s *decodeStream) Close() error {
+       // On close, if next != size, we must iterate through the rest of the 
decoding
+       // until the reader is drained. Otherwise we corrupt the read for the 
next element.
+       // TODO: Optimize the case where we have length prefixed values

Review Comment:
   We do, I missed it here. Same as the other one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to