jrmccluskey commented on code in PR #27737:
URL: https://github.com/apache/beam/pull/27737#discussion_r1279335861


##########
sdks/go/pkg/beam/runners/prism/internal/handlerunner.go:
##########
@@ -63,13 +64,72 @@ func (*runner) ConfigCharacteristic() reflect.Type {
        return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem()
 }
 
+var _ transformPreparer = (*runner)(nil)
+
+func (*runner) PrepareUrns() []string {
+       return []string{urns.TransformReshuffle}
+}
+
+// PrepareTransform handles special processing with respect runner transforms, like reshuffle.
+func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+       // TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle.
+       // TODO: Implement a fusion break for reshuffles.
+
+       if h.config.SDKReshuffle {
+               panic("SDK side reshuffle not yet supported")
+       }
+
+       // A Reshuffle, in principle, is a no-op on the pipeline structure, WRT correctness.
+       // It could however affect performance, so it exists to tell the runner that this
+       // point in the pipeline needs a fusion break, to enable the pipeline to change it's
+       // degree of parallelism.
+       //
+       // The change of parallelism goes both ways. It could allow for larger batch sizes
+       // enable smaller batch sizes downstream if it is infact paralleizable.
+       //
+       // But for a single transform node per stage runner, we can elide it entirely,
+       // since the input collection and output collection types match.
+
+       // Get the input and output PCollections, there should only be 1 each.
+       if len(t.GetOutputs()) != 1 {
+               panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t))
+       }
+       if len(t.GetOutputs()) != 1 {
+               panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t))
+       }

Review Comment:
   Looks like you missed the input check here (both conditions test 
`t.GetOutputs()`) and didn't update the panic string for each; both 
still read "putput".
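
   For illustration, a minimal sketch of the two distinct checks, one on the inputs and one on the outputs, each with its own message. The `transform` type and `checkSingleIO` helper are hypothetical stand-ins (not part of the PR) so the snippet runs on its own; the real code would call these accessors on `*pipepb.PTransform`. The sketch also returns errors where the PR panics.

```go
package main

import "fmt"

// transform is a hypothetical stand-in for *pipepb.PTransform, exposing only
// the two accessors the checks need.
type transform struct {
	inputs  map[string]string
	outputs map[string]string
}

func (t *transform) GetInputs() map[string]string  { return t.inputs }
func (t *transform) GetOutputs() map[string]string { return t.outputs }

// checkSingleIO (hypothetical helper) reports an error unless t has exactly
// one input and exactly one output PCollection.
func checkSingleIO(t *transform) error {
	if len(t.GetInputs()) != 1 {
		return fmt.Errorf("expected single input PCollection in reshuffle, got %d", len(t.GetInputs()))
	}
	if len(t.GetOutputs()) != 1 {
		return fmt.Errorf("expected single output PCollection in reshuffle, got %d", len(t.GetOutputs()))
	}
	return nil
}

func main() {
	ok := &transform{
		inputs:  map[string]string{"i0": "pcol_in"},
		outputs: map[string]string{"o0": "pcol_out"},
	}
	// Missing input: this is the case the duplicated output check would not catch.
	noInput := &transform{outputs: map[string]string{"o0": "pcol_out"}}

	fmt.Println(checkSingleIO(ok))
	fmt.Println(checkSingleIO(noInput))
}
```

   With the duplicated output check from the diff, a transform with no inputs would pass validation; separate checks with separate messages make the failing side obvious.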


