lostluck commented on a change in pull request #17120:
URL: https://github.com/apache/beam/pull/17120#discussion_r830218179
##########
File path: sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
##########
@@ -310,6 +311,28 @@ func TestComputeInputOutput(t *testing.T) {
}
}
+func BenchmarkComputeInputOutput(b *testing.B) {
+ in := make(map[string]*pipepb.PTransform)
+ for i := 0; i < 3000; i++ {
Review comment:
Can we add a comment about what this graph looks like? I think it's just
a linked list of nested primitives and composites, but I'd like a quick
confirmation.
##########
File path: sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
##########
@@ -118,16 +118,25 @@ func makeParentMap(xforms map[string]*pipepb.PTransform)
map[string]string {
func computeCompositeInputOutput(xforms map[string]*pipepb.PTransform)
map[string]*pipepb.PTransform {
ret := reflectx.ShallowClone(xforms).(map[string]*pipepb.PTransform)
Review comment:
```suggestion
// Precompute the transforms that consume each PCollection as input.
```
##########
File path: sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
##########
@@ -201,19 +210,11 @@ func diffAndMerge(out, in, extIn map[string]bool)
map[string]string {
}
// externalIns checks the unseen non-composite graph
-func externalIns(counted map[string]bool, xforms
map[string]*pipepb.PTransform, extIn, out map[string]bool) {
- for id, pt := range xforms {
- // Ignore other composites or already counted transforms.
- if counted[id] || len(pt.GetSubtransforms()) != 0 {
- continue
- }
- // Check this PTransform's inputs for anything output by
something
- // the current composite.
- for col := range out {
- for _, incol := range pt.GetInputs() {
- if col == incol {
- extIn[col] = true
- }
+func externalIns(counted map[string]bool, primitiveXformsForInput
map[string][]string, extIn, out map[string]bool) {
+ for col := range out {
+ for _, id := range primitiveXformsForInput[col] {
+ if !counted[id] {
+ extIn[col] = true
Review comment:
```suggestion
func externalIns(counted map[string]bool, primitiveXformsForInput
map[string][]string, extIn, out map[string]bool) {
// For this composite's output PCollections.
for col := range out {
// See if any transforms are using it.
for _, id := range primitiveXformsForInput[col] {
if !counted[id] {
// And if they're part of the composite
// Ensure the collections is in the set of
outputs for the composite.
extIn[col] = true
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]