lostluck commented on a change in pull request #13370:
URL: https://github.com/apache/beam/pull/13370#discussion_r526299363



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
 
        })
 }
+
+var testExternal = graph.ExternalTransform{
+       Urn:           "test_urn",
+       Payload:       nil,
+       ExpansionAddr: "test_addr",
+       Expanded:      &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id1": 
{UniqueName: "test_components_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id1": 
{UniqueName: "test_components_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId: 
"test_components_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id1": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id1": 
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+       UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+       Components:   &testComponents,
+       Transform:    &testTransform,
+       Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+       Urn:           "test_urn2",
+       Payload:       nil,
+       ExpansionAddr: "test_addr2",
+       Expanded:      &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id2": 
{UniqueName: "test_components2_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id2": 
{UniqueName: "test_components2_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId: 
"test_components2_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id2": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id2": 
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+       UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+       Components:   &testComponents2,
+       Transform:    &testTransform2,
+       Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+       // Create slices of MultiEdges containing external edges, and make sure 
all
+       // relevant data from the external edges is properly added to an empty
+       // pipeline.
+       tests := []struct {
+               name          string
+               makeEdges     func(g *graph.Graph) []*graph.MultiEdge
+               wantExpandeds []graph.ExpandedTransform
+       }{
+               {
+                       name: "SingleTransform",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+
+                               return []*graph.MultiEdge{edge}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded},
+               },
+               {
+                       name: "MultiTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+
+                               return []*graph.MultiEdge{edge, edge2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+               {
+                       name: "NonExternalTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+                               impulse := graph.NewImpulse(g, s, []byte{1})
+                               impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+                               return []*graph.MultiEdge{edge, edge2, impulse, 
impulse2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(test.name, func(t *testing.T) {
+                       g := graph.New()
+                       edges := test.makeEdges(g)
+                       var p pipepb.Pipeline
+                       p.Components = &pipepb.Components{
+                               Transforms:          
make(map[string]*pipepb.PTransform),
+                               Pcollections:        
make(map[string]*pipepb.PCollection),
+                               WindowingStrategies: 
make(map[string]*pipepb.WindowingStrategy),
+                               Coders:              
make(map[string]*pipepb.Coder),
+                               Environments:        
make(map[string]*pipepb.Environment),
+                       }
+                       mergeExpandedWithPipeline(edges, &p)
+
+                       // Check that all wanted expanded components have been 
added to
+                       // pipeline components.
+                       for _, exp := range test.wantExpandeds {
+                               wantComps := exp.Components.(*pipepb.Components)
+                               gotComps := p.GetComponents()
+                               validateComponents(t, wantComps, gotComps)
+
+                               // Check that expanded transform is present. 
Need to search.
+                               wantTransform := 
exp.Transform.(*pipepb.PTransform)
+                               var found bool
+                               for _, gotTransform := range 
gotComps.GetTransforms() {
+                                       if d := cmp.Diff(wantTransform, 
gotTransform, protocmp.Transform()); d == "" {

Review comment:
       In this case, you probably just want cmp.Equal instead of cmp.Diff, 
since we never use the diff result.
   
   https://pkg.go.dev/github.com/google/go-cmp/cmp#Equal

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
 
        })
 }
+
+var testExternal = graph.ExternalTransform{
+       Urn:           "test_urn",
+       Payload:       nil,
+       ExpansionAddr: "test_addr",
+       Expanded:      &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id1": 
{UniqueName: "test_components_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id1": 
{UniqueName: "test_components_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId: 
"test_components_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id1": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id1": 
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+       UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+       Components:   &testComponents,
+       Transform:    &testTransform,
+       Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+       Urn:           "test_urn2",
+       Payload:       nil,
+       ExpansionAddr: "test_addr2",
+       Expanded:      &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id2": 
{UniqueName: "test_components2_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id2": 
{UniqueName: "test_components2_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId: 
"test_components2_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id2": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id2": 
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+       UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+       Components:   &testComponents2,
+       Transform:    &testTransform2,
+       Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+       // Create slices of MultiEdges containing external edges, and make sure 
all
+       // relevant data from the external edges is properly added to an empty
+       // pipeline.
+       tests := []struct {
+               name          string
+               makeEdges     func(g *graph.Graph) []*graph.MultiEdge
+               wantExpandeds []graph.ExpandedTransform
+       }{
+               {
+                       name: "SingleTransform",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+
+                               return []*graph.MultiEdge{edge}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded},
+               },
+               {
+                       name: "MultiTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+
+                               return []*graph.MultiEdge{edge, edge2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+               {
+                       name: "NonExternalTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+                               impulse := graph.NewImpulse(g, s, []byte{1})
+                               impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+                               return []*graph.MultiEdge{edge, edge2, impulse, 
impulse2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(test.name, func(t *testing.T) {
+                       g := graph.New()
+                       edges := test.makeEdges(g)
+                       var p pipepb.Pipeline
+                       p.Components = &pipepb.Components{
+                               Transforms:          
make(map[string]*pipepb.PTransform),
+                               Pcollections:        
make(map[string]*pipepb.PCollection),
+                               WindowingStrategies: 
make(map[string]*pipepb.WindowingStrategy),
+                               Coders:              
make(map[string]*pipepb.Coder),
+                               Environments:        
make(map[string]*pipepb.Environment),
+                       }
+                       mergeExpandedWithPipeline(edges, &p)
+
+                       // Check that all wanted expanded components have been 
added to
+                       // pipeline components.
+                       for _, exp := range test.wantExpandeds {
+                               wantComps := exp.Components.(*pipepb.Components)
+                               gotComps := p.GetComponents()
+                               validateComponents(t, wantComps, gotComps)
+
+                               // Check that expanded transform is present. 
Need to search.
+                               wantTransform := 
exp.Transform.(*pipepb.PTransform)
+                               var found bool
+                               for _, gotTransform := range 
gotComps.GetTransforms() {
+                                       if d := cmp.Diff(wantTransform, 
gotTransform, protocmp.Transform()); d == "" {
+                                               found = true
+                                               break
+                                       }
+                               }
+                               if !found {
+                                       t.Errorf("Pipeline components missing 
expected expanded transform: %v", wantTransform)
+                               }
+
+                               // Check that requirements are present.
+                               for _, wantReq := range exp.Requirements {
+                                       var found bool
+                                       for _, gotReq := range 
p.GetRequirements() {
+                                               if wantReq == gotReq {
+                                                       found = true
+                                                       break
+                                               }
+                                       }
+                                       if !found {
+                                               t.Errorf("Pipeline missing 
expected requirement: %v", wantReq)
+                                       }
+                               }
+                       }
+               })
+       }
+}
+
+func validateComponents(t *testing.T, wantComps, gotComps *pipepb.Components) {

Review comment:
       I'm confused why this is broken out rather than simply using a single 
cmp.Diff(wantComps, gotComps, protocmp.Transform())?
   
   I'm assuming that you wanted additional granularity on the parts that were 
missing/extra for each component, and the full diff was less clear than this 
output?
   
   Consider adding a comment documenting that reasoning.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to