mohamedawnallah commented on code in PR #37673:
URL: https://github.com/apache/beam/pull/37673#discussion_r2852101471
##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
return nil, errors.New("must supply dot_file argument")
}
- edges, nodes, err := p.Build()
+ edges, _, err := p.Build()
if err != nil {
return nil, errors.New("can't get data to render")
}
- var buf bytes.Buffer
- if err := dotlib.Render(edges, nodes, &buf); err != nil {
+ pipeline, err := graphx.Marshal(edges, &graphx.Options{})
+ if err != nil {
return nil, err
}
+
+ var buf bytes.Buffer
+ buf.WriteString("digraph G {\n")
+
+ components := pipeline.GetComponents()
+ if components == nil {
+ return nil, errors.New("pipeline has no components")
+ }
+
+ transforms := components.GetTransforms()
+
+ // Build reverse input index: PCollectionID -> []TransformID
+ consumers := make(map[string][]string)
+ for tid, t := range transforms {
+ // Skip composite transforms
+ if len(t.GetSubtransforms()) != 0 {
+ continue
+ }
+
+ for _, pcollID := range t.GetInputs() {
+ consumers[pcollID] = append(consumers[pcollID], tid)
+ }
+ }
+
+ // Generate edges
+ for _, t := range transforms {
Review Comment:
Same here: before iterating over the sorted transforms, each consumer list
in the `consumers` map also needs to be sorted, for example:
```go
// Sort each consumer list for deterministic edge order.
for pcollID := range consumers {
	sort.Strings(consumers[pcollID])
}
```
##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
return nil, errors.New("must supply dot_file argument")
}
- edges, nodes, err := p.Build()
+ edges, _, err := p.Build()
if err != nil {
return nil, errors.New("can't get data to render")
}
- var buf bytes.Buffer
- if err := dotlib.Render(edges, nodes, &buf); err != nil {
+ pipeline, err := graphx.Marshal(edges, &graphx.Options{})
+ if err != nil {
return nil, err
}
+
+ var buf bytes.Buffer
+ buf.WriteString("digraph G {\n")
+
+ components := pipeline.GetComponents()
+ if components == nil {
+ return nil, errors.New("pipeline has no components")
Review Comment:
A private sentinel error could be a good option here, something along these
lines at the top of the file:
```go
var errNoComponents = errors.New("pipeline has no components")
```
That would also help in tests, which could then assert on that specific
error.
##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -21,10 +21,11 @@ import (
"bytes"
"context"
"flag"
+ "fmt"
"os"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
- dotlib "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/dot"
Review Comment:
After this import is removed, where else in the codebase is this Go package
still used?
##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
return nil, errors.New("must supply dot_file argument")
}
- edges, nodes, err := p.Build()
+ edges, _, err := p.Build()
if err != nil {
return nil, errors.New("can't get data to render")
}
- var buf bytes.Buffer
- if err := dotlib.Render(edges, nodes, &buf); err != nil {
+ pipeline, err := graphx.Marshal(edges, &graphx.Options{})
+ if err != nil {
return nil, err
}
+
+ var buf bytes.Buffer
+ buf.WriteString("digraph G {\n")
+
+ components := pipeline.GetComponents()
+ if components == nil {
+ return nil, errors.New("pipeline has no components")
+ }
+
+ transforms := components.GetTransforms()
+
+ // Build reverse input index: PCollectionID -> []TransformID
+ consumers := make(map[string][]string)
+ for tid, t := range transforms {
+ // Skip composite transforms
+ if len(t.GetSubtransforms()) != 0 {
+ continue
+ }
+
+ for _, pcollID := range t.GetInputs() {
+ consumers[pcollID] = append(consumers[pcollID], tid)
+ }
+ }
+
+ // Generate edges
+ for _, t := range transforms {
+ // Skip composite transforms
+ if len(t.GetSubtransforms()) != 0 {
+ continue
+ }
+
+ from := t.GetUniqueName()
+
+ for _, pcollID := range t.GetOutputs() {
Review Comment:
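Same here: `t.GetOutputs()` is also a map, so its entries should be sorted
before iterating to keep the edge order deterministic.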
##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
return nil, errors.New("must supply dot_file argument")
}
- edges, nodes, err := p.Build()
+ edges, _, err := p.Build()
if err != nil {
return nil, errors.New("can't get data to render")
}
- var buf bytes.Buffer
- if err := dotlib.Render(edges, nodes, &buf); err != nil {
+ pipeline, err := graphx.Marshal(edges, &graphx.Options{})
+ if err != nil {
return nil, err
}
+
+ var buf bytes.Buffer
+ buf.WriteString("digraph G {\n")
+
+ components := pipeline.GetComponents()
+ if components == nil {
+ return nil, errors.New("pipeline has no components")
+ }
+
+ transforms := components.GetTransforms()
+
+ // Build reverse input index: PCollectionID -> []TransformID
+ consumers := make(map[string][]string)
+ for tid, t := range transforms {
Review Comment:
`transforms` is a map, and map iteration order in Go is not guaranteed
(see https://pkg.go.dev/maps#Keys). That means the output could differ from
run to run, which can make the tests flaky.
For example, we can sort the transform IDs here for deterministic output,
something along these lines:
```go
// Sort transform IDs for deterministic output.
tids := make([]string, 0, len(transforms))
for tid := range transforms {
	tids = append(tids, tid)
}
sort.Strings(tids)
```
##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
@@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
return nil, errors.New("must supply dot_file argument")
}
- edges, nodes, err := p.Build()
+ edges, _, err := p.Build()
if err != nil {
return nil, errors.New("can't get data to render")
}
- var buf bytes.Buffer
- if err := dotlib.Render(edges, nodes, &buf); err != nil {
+ pipeline, err := graphx.Marshal(edges, &graphx.Options{})
+ if err != nil {
return nil, err
}
+
+ var buf bytes.Buffer
+ buf.WriteString("digraph G {\n")
+
+ components := pipeline.GetComponents()
+ if components == nil {
+ return nil, errors.New("pipeline has no components")
+ }
+
+ transforms := components.GetTransforms()
+
+ // Build reverse input index: PCollectionID -> []TransformID
+ consumers := make(map[string][]string)
+ for tid, t := range transforms {
+ // Skip composite transforms
+ if len(t.GetSubtransforms()) != 0 {
+ continue
+ }
+
+ for _, pcollID := range t.GetInputs() {
+ consumers[pcollID] = append(consumers[pcollID], tid)
+ }
+ }
+
+ // Generate edges
+ for _, t := range transforms {
+ // Skip composite transforms
+ if len(t.GetSubtransforms()) != 0 {
+ continue
+ }
+
+ from := t.GetUniqueName()
+
+ for _, pcollID := range t.GetOutputs() {
+ for _, consumerID := range consumers[pcollID] {
+
+ consumer := transforms[consumerID]
+
+ // Skip composite consumers
+ if len(consumer.GetSubtransforms()) != 0 {
Review Comment:
This check seems unreachable: the first loop already excludes composites
when building the `consumers` map.
##########
sdks/go/pkg/beam/runners/dot/dot.go:
##########
Review Comment:
Functional tests for `dot.go` are missing; we can add a `dot_test.go` here in
the same directory.
At a minimum, the following cases should probably be covered to guard
against regressions:
- **Linear pipeline produces correct edges:** Read → Transform → Write
generates the right `"A" -> "B"` connections with valid DOT syntax
- **Composite transforms are skipped:** Only leaf transforms appear in the
output; composites (transforms with subtransforms) are excluded
- **Fan-out / fan-in connectivity:** One transform feeding multiple
consumers, or multiple producers feeding one transform, produces all expected
edges