[
https://issues.apache.org/jira/browse/BEAM-7726?focusedWorklogId=278619&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-278619
]
ASF GitHub Bot logged work on BEAM-7726:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jul/19 00:13
Start Date: 18/Jul/19 00:13
Worklog Time Spent: 10m
Work Description: youngoli commented on pull request #9080: [BEAM-7726]
Implement State Backed Iterables in Go SDK
URL: https://github.com/apache/beam/pull/9080#discussion_r304680191
##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -72,117 +79,129 @@ func (n *DataSource) Process(ctx context.Context) error {
c := coder.SkipW(n.Coder)
wc := MakeWindowDecoder(n.Coder.Window)
+ var cp ElementDecoder // Decoder for the primary element or the key
in CoGBKs.
+ var cvs []ElementDecoder // Decoders for each value stream in CoGBKs.
+
switch {
case coder.IsCoGBK(c):
- ck := MakeElementDecoder(c.Components[0])
- cv := MakeElementDecoder(c.Components[1])
+ cp = MakeElementDecoder(c.Components[0])
- for {
- if n.IncrementCountAndCheckSplit(ctx) {
+ // TODO(BEAM-490): Support multiple value streams (coder
components) with
+ // with CoGBK.
+ cvs = []ElementDecoder{MakeElementDecoder(c.Components[1])}
+ default:
+ cp = MakeElementDecoder(c)
+ }
+
+ for {
+ if n.IncrementCountAndCheckSplit(ctx) {
+ return nil
+ }
+ ws, t, err := DecodeWindowedValueHeader(wc, r)
+ if err != nil {
+ if err == io.EOF {
return nil
}
- ws, t, err := DecodeWindowedValueHeader(wc, r)
- if err != nil {
- if err == io.EOF {
- return nil
- }
- return errors.Wrap(err, "source failed")
- }
+ return errors.Wrap(err, "source failed")
+ }
- // Decode key
+ // Decode key or parallel element.
+ pe, err := cp.Decode(r)
+ if err != nil {
+ return errors.Wrap(err, "source decode failed")
+ }
+ pe.Timestamp = t
+ pe.Windows = ws
- key, err := ck.Decode(r)
+ var valReStreams []ReStream
+ for _, cv := range cvs {
+ values, err := n.makeReStream(ctx, pe, cv, r)
if err != nil {
- return errors.Wrap(err, "source decode failed")
+ return err
}
- key.Timestamp = t
- key.Windows = ws
+ valReStreams = append(valReStreams, values)
+ }
- // TODO(herohde) 4/30/2017: the State API will be
handle re-iterations
- // and only "small" value streams would be inline.
Presumably, that
- // would entail buffering the whole stream. We do that
for now.
+ if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err
!= nil {
+ return err
+ }
+ }
+}
- var buf []FullValue
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv
ElementDecoder, r io.ReadCloser) (ReStream, error) {
+ size, err := coder.DecodeInt32(r)
+ if err != nil {
+ return nil, errors.Wrap(err, "stream size decoding failed")
+ }
- size, err := coder.DecodeInt32(r)
+ switch {
+ case size >= 0:
+ // Single chunk streams are fully read in and buffered in
memory.
+ var buf []FullValue
+ buf, err = readStreamToBuffer(cv, r, int64(size), buf)
+ if err != nil {
+ return nil, err
+ }
+ return &FixedReStream{Buf: buf}, nil
+ case size == -1: // Shouldn't this be 0?
+ // Multi-chunked stream.
+ var buf []FullValue
+ for {
+ chunk, err := coder.DecodeVarInt(r)
if err != nil {
- return errors.Wrap(err, "stream size decoding
failed")
+ return nil, errors.Wrap(err, "stream chunk size
decoding failed")
}
-
- if size > -1 {
- // Single chunk stream.
-
- // log.Printf("Fixed size=%v", size)
- for i := int32(0); i < size; i++ {
- value, err := cv.Decode(r)
- if err != nil {
- return errors.Wrap(err, "stream
value decode failed")
- }
- buf = append(buf, *value)
+ // All done, escape out.
+ switch {
+ case chunk == 0: // End of stream, return buffer.
+ return &FixedReStream{Buf: buf}, nil
+ case chunk > 0: // Non-zero chunk, read that many
elements from the stream, and buffer them.
+ buf, err = readStreamToBuffer(cv, r, chunk, buf)
+ if err != nil {
+ return nil, err
}
- } else {
- // Multi-chunked stream.
-
- for {
- chunk, err := coder.DecodeVarUint64(r)
- if err != nil {
- return errors.Wrap(err, "stream
chunk size decoding failed")
- }
-
- // log.Printf("Chunk size=%v", chunk)
-
- if chunk == 0 {
- break
- }
-
- for i := uint64(0); i < chunk; i++ {
- value, err := cv.Decode(r)
- if err != nil {
- return errors.Wrap(err,
"stream value decode failed")
- }
- buf = append(buf, *value)
- }
+ case chunk == -1: // State backed iterable!
+ chunk, err := coder.DecodeVarInt(r)
Review comment:
Question: Why are we re-reading the chunk here? Is it just to get the next
element (which, based on the rest of the code, is the token's size)?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 278619)
Time Spent: 1.5h (was: 1h 20m)
> [Go SDK] State Backed Iterables
> -------------------------------
>
> Key: BEAM-7726
> URL: https://issues.apache.org/jira/browse/BEAM-7726
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Affects Versions: Not applicable
> Reporter: Robert Burke
> Assignee: Robert Burke
> Priority: Major
> Fix For: Not applicable
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> The Go SDK should support the state-backed iterables protocol as defined in the proto.
> [https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L644]
>
> The primary use case is iterables produced by CoGBKs.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)