YousufFFFF commented on code in PR #37673:
URL: https://github.com/apache/beam/pull/37673#discussion_r2854894643
##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
return nil, errors.New("must supply dot_file argument")
}
- edges, nodes, err := p.Build()
+ edges, _, err := p.Build()
if err != nil {
return nil, errors.New("can't get data to render")
}
- var buf bytes.Buffer
- if err := dotlib.Render(edges, nodes, &buf); err != nil {
+ pipeline, err := graphx.Marshal(edges, &graphx.Options{})
+ if err != nil {
return nil, err
}
+
+ var buf bytes.Buffer
+ buf.WriteString("digraph G {\n")
+
+ components := pipeline.GetComponents()
+ if components == nil {
+ return nil, errors.New("pipeline has no components")
+ }
+
+ transforms := components.GetTransforms()
+
+ // Build reverse input index: PCollectionID -> []TransformID
+ consumers := make(map[string][]string)
+ for tid, t := range transforms {
+ // Skip composite transforms
+ if len(t.GetSubtransforms()) != 0 {
+ continue
+ }
+
+ for _, pcollID := range t.GetInputs() {
+ consumers[pcollID] = append(consumers[pcollID], tid)
+ }
+ }
+
+ // Generate edges
+ for _, t := range transforms {
+ // Skip composite transforms
+ if len(t.GetSubtransforms()) != 0 {
+ continue
+ }
+
+ from := t.GetUniqueName()
+
+ for _, pcollID := range t.GetOutputs() {
+ for _, consumerID := range consumers[pcollID] {
+
+ consumer := transforms[consumerID]
+
+ // Skip composite consumers
+ if len(consumer.GetSubtransforms()) != 0 {
Review Comment:
Nice Catch!
Removed the block!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]