lostluck commented on code in PR #29590:
URL: https://github.com/apache/beam/pull/29590#discussion_r1416404011
##########
sdks/go/pkg/beam/core/runtime/exec/pcollection.go:
##########
@@ -96,6 +102,13 @@ func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue, values
var w byteCounter
p.elementCoder.Encode(elm, &w)
p.addSize(int64(w.count))
+
+ if p.dataSampler != nil {
+ var buf bytes.Buffer
+ EncodeWindowedValueHeader(p.windowCoder, elm.Windows, elm.Timestamp, elm.Pane, &buf)
+ p.elementCoder.Encode(elm, &buf)
Review Comment:
With this approach, we encode the element twice: once into the byteCounter
to get the size estimate (line 103), and again into the buffer to get the
encoded bytes for the sample. I'd recommend branching instead, and using a
bit of arithmetic to derive the element size from the buffer, so that
sampling doesn't double the encoding cost.
```
if p.dataSampler == nil {
	// Not sampling: encode into a counter just to measure the size.
	var w byteCounter
	p.elementCoder.Encode(elm, &w)
	p.addSize(int64(w.count))
} else {
	// Sampling: encode once, then subtract the header length to get the element size.
	var buf bytes.Buffer
	EncodeWindowedValueHeader(p.windowCoder, elm.Windows, elm.Timestamp, elm.Pane, &buf)
	winSize := buf.Len()
	p.elementCoder.Encode(elm, &buf)
	p.addSize(int64(buf.Len() - winSize))
	p.dataSampler.SendSample(p.PColID, buf.Bytes(), time.Now())
}
```
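To illustrate the arithmetic, here is a minimal, self-contained sketch outside of Beam. The helpers `encodeHeader` and `encodeElement` are hypothetical stand-ins for `EncodeWindowedValueHeader` and `p.elementCoder.Encode` (a fixed 8-byte timestamp and a length-prefixed string, both assumptions made only for this example), and `byteCounter` is a simplified version of the counter used in the diff. It shows that subtracting the header length from the buffer length gives the same element size as the counting-only path.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// byteCounter discards written bytes and only records how many there were.
type byteCounter struct{ count int }

func (c *byteCounter) Write(p []byte) (int, error) {
	c.count += len(p)
	return len(p), nil
}

// encodeHeader is a hypothetical stand-in for the windowed value header:
// it writes the timestamp as 8 big-endian bytes.
func encodeHeader(ts int64, w io.Writer) error {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], uint64(ts))
	_, err := w.Write(b[:])
	return err
}

// encodeElement is a hypothetical stand-in for the element coder:
// it writes a varint length prefix followed by the string bytes.
func encodeElement(s string, w io.Writer) error {
	var prefix [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(prefix[:], uint64(len(s)))
	if _, err := w.Write(prefix[:n]); err != nil {
		return err
	}
	_, err := io.WriteString(w, s)
	return err
}

// sizeOnly mirrors the non-sampling branch: encode into a counter.
func sizeOnly(s string) (int64, error) {
	var c byteCounter
	if err := encodeElement(s, &c); err != nil {
		return 0, err
	}
	return int64(c.count), nil
}

// sizeAndSample mirrors the sampling branch: encode once into a buffer and
// recover the element size by subtracting the header length.
func sizeAndSample(ts int64, s string) (int64, []byte, error) {
	var buf bytes.Buffer
	if err := encodeHeader(ts, &buf); err != nil {
		return 0, nil, err
	}
	headerLen := buf.Len()
	if err := encodeElement(s, &buf); err != nil {
		return 0, nil, err
	}
	return int64(buf.Len() - headerLen), buf.Bytes(), nil
}

func main() {
	counted, _ := sizeOnly("hello")
	derived, sample, _ := sizeAndSample(42, "hello")
	fmt.Println(counted, derived, len(sample)) // prints "6 6 14"
}
```

In this sketch the element is encoded exactly once on each path, which is the point of the suggested branch.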