lostluck commented on code in PR #37673:
URL: https://github.com/apache/beam/pull/37673#discussion_r2854001285


##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -21,10 +21,11 @@ import (
        "bytes"
        "context"
        "flag"
+       "fmt"
        "os"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
-       dotlib "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/dot"

Review Comment:
   +1. If nothing else within Beam is using it, please also mark the package as 
Deprecated and slated for removal in a future Beam version. We don't have a 
clear timeline for removal, but this at least warns away new adopters.
   
   See https://go.dev/wiki/Deprecated for an example.
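
A minimal sketch of the convention described on that wiki page: a paragraph beginning with `Deprecated:` in the doc comment. The wording of the notice and the `Render` function below are illustrative assumptions, not the actual contents of the package, and `package main` is used only so the sketch runs on its own.

```go
// Package main stands in for the package being retired so that this sketch
// runs on its own; in practice the comment would sit above the real package
// clause.
//
// Deprecated: This package is no longer used within Beam and is slated for
// removal in a future Beam version. No removal timeline has been set.
package main

import "fmt"

// Render is a hypothetical exported function kept only for existing callers.
//
// Deprecated: Render will be removed together with this package.
func Render(name string) string {
	return fmt.Sprintf("digraph %s {}", name)
}

func main() {
	fmt.Println(Render("G"))
}
```

Tools such as gopls and staticcheck recognize the `Deprecated:` paragraph and flag uses from other packages, which is what warns off new callers.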



##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
                return nil, errors.New("must supply dot_file argument")
        }
 
-       edges, nodes, err := p.Build()
+       edges, _, err := p.Build()
        if err != nil {
                return nil, errors.New("can't get data to render")
        }
 
-       var buf bytes.Buffer
-       if err := dotlib.Render(edges, nodes, &buf); err != nil {
+       pipeline, err := graphx.Marshal(edges, &graphx.Options{})
+       if err != nil {
                return nil, err
        }
+
+       var buf bytes.Buffer
+       buf.WriteString("digraph G {\n")
+
+       components := pipeline.GetComponents()
+       if components == nil {
+               return nil, errors.New("pipeline has no components")
+       }
+
+       transforms := components.GetTransforms()
+
+       // Build reverse input index: PCollectionID -> []TransformID
+       consumers := make(map[string][]string)
+       for tid, t := range transforms {
+               // Skip composite transforms
+               if len(t.GetSubtransforms()) != 0 {
+                       continue
+               }
+
+               for _, pcollID := range t.GetInputs() {
+                       consumers[pcollID] = append(consumers[pcollID], tid)
+               }
+       }
+
+       // Generate edges
+       for _, t := range transforms {

Review Comment:
   +1, I thought it would be good to use a topological sort as well.
   
   
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/pipelinex/util.go#L51
   
   That is the one used by prism and similar.



##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
                return nil, errors.New("must supply dot_file argument")
        }
 
-       edges, nodes, err := p.Build()
+       edges, _, err := p.Build()
        if err != nil {
                return nil, errors.New("can't get data to render")
        }
 
-       var buf bytes.Buffer
-       if err := dotlib.Render(edges, nodes, &buf); err != nil {
+       pipeline, err := graphx.Marshal(edges, &graphx.Options{})
+       if err != nil {
                return nil, err
        }
+
+       var buf bytes.Buffer
+       buf.WriteString("digraph G {\n")
+
+       components := pipeline.GetComponents()
+       if components == nil {
+               return nil, errors.New("pipeline has no components")

Review Comment:
   Typo: 'sentinel' error.


