rohdesamuel commented on code in PR #29590:
URL: https://github.com/apache/beam/pull/29590#discussion_r1419740273
##########
sdks/go/pkg/beam/core/runtime/exec/pcollection.go:
##########
@@ -93,9 +99,19 @@ func (p *PCollection) ProcessElement(ctx context.Context,
elm *FullValue, values
} else {
p.nextSampleIdx = cur + p.r.Int63n(cur/10+2) + 1
}
- var w byteCounter
- p.elementCoder.Encode(elm, &w)
- p.addSize(int64(w.count))
+
+ if p.dataSampler == nil {
+ var w byteCounter
+ p.elementCoder.Encode(elm, &w)
+ p.addSize(int64(w.count))
+ } else {
+ var buf bytes.Buffer
+ EncodeWindowedValueHeader(p.windowCoder, elm.Windows,
elm.Timestamp, elm.Pane, &buf)
+ winSize := buf.Len()
+ p.elementCoder.Encode(elm, &buf)
Review Comment:
@lostluck, is there ever a case where the windowCoder is nil? Will the
encoding always be a windowed value?
##########
sdks/go/pkg/beam/core/runtime/harness/init/init.go:
##########
@@ -91,6 +100,13 @@ func hook() {
os.Exit(1)
}
runtime.GlobalOptions.Import(opt.Options)
+ var experiments []string
+ if e, ok := opt.Options.Options["experiments"]; ok {
+ experiments = strings.Split(e, ",")
+ }
+ if slices.Contains(experiments, "enable_data_sampling") {
+ runnerCapabilities = append(runnerCapabilities,
graphx.URNDataSampling)
+ }
Review Comment:
Can you please add a TODO here to remove this once the data sampling URN is
properly sent in via the capabilities?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]