lostluck commented on code in PR #27737:
URL: https://github.com/apache/beam/pull/27737#discussion_r1279435001
##########
sdks/go/pkg/beam/runners/prism/internal/handlerunner.go:
##########
@@ -63,13 +64,72 @@ func (*runner) ConfigCharacteristic() reflect.Type {
return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem()
}
+var _ transformPreparer = (*runner)(nil)
+
+func (*runner) PrepareUrns() []string {
+ return []string{urns.TransformReshuffle}
+}
+
+// PrepareTransform handles special processing with respect to runner transforms,
like reshuffle.
+func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps
*pipepb.Components) (*pipepb.Components, []string) {
+ // TODO: Implement the windowing strategy the "backup" transforms used
for Reshuffle.
+ // TODO: Implement a fusion break for reshuffles.
+
+ if h.config.SDKReshuffle {
+ panic("SDK side reshuffle not yet supported")
+ }
+
+ // A Reshuffle, in principle, is a no-op on the pipeline structure, WRT
correctness.
+ // It could however affect performance, so it exists to tell the runner
that this
+ // point in the pipeline needs a fusion break, to enable the pipeline
to change its
+ // degree of parallelism.
+ //
+ // The change of parallelism goes both ways. It could allow for larger
batch sizes, or
+ // enable smaller batch sizes downstream if it is in fact parallelizable.
+ //
+ // But for a single transform node per stage runner, we can elide it
entirely,
+ // since the input collection and output collection types match.
+
+ // Get the input and output PCollections, there should only be 1 each.
+ if len(t.GetOutputs()) != 1 {
+ panic("Expected single putput PCollection in reshuffle: " +
prototext.Format(t))
+ }
+ if len(t.GetOutputs()) != 1 {
+ panic("Expected single putput PCollection in reshuffle: " +
prototext.Format(t))
+ }
Review Comment:
lol. putput. Good catch!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]