lostluck commented on a change in pull request #13370:
URL: https://github.com/apache/beam/pull/13370#discussion_r526299363
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
})
}
+
+var testExternal = graph.ExternalTransform{
+ Urn: "test_urn",
+ Payload: nil,
+ ExpansionAddr: "test_addr",
+ Expanded: &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{"transform_id1":
{UniqueName: "test_components_transform"}},
+ Pcollections: map[string]*pipepb.PCollection{"pcollection_id1":
{UniqueName: "test_components_pcollection"}},
+ WindowingStrategies:
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId:
"test_components_windowing"}},
+ Coders: map[string]*pipepb.Coder{"coder_id1": {Spec:
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+ Environments: map[string]*pipepb.Environment{"environment_id1":
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+ UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+ Components: &testComponents,
+ Transform: &testTransform,
+ Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+ Urn: "test_urn2",
+ Payload: nil,
+ ExpansionAddr: "test_addr2",
+ Expanded: &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{"transform_id2":
{UniqueName: "test_components2_transform"}},
+ Pcollections: map[string]*pipepb.PCollection{"pcollection_id2":
{UniqueName: "test_components2_pcollection"}},
+ WindowingStrategies:
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId:
"test_components2_windowing"}},
+ Coders: map[string]*pipepb.Coder{"coder_id2": {Spec:
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+ Environments: map[string]*pipepb.Environment{"environment_id2":
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+ UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+ Components: &testComponents2,
+ Transform: &testTransform2,
+ Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+ // Create slices of MultiEdges containing external edges, and make sure
all
+ // relevant data from the external edges is properly added to an empty
+ // pipeline.
+ tests := []struct {
+ name string
+ makeEdges func(g *graph.Graph) []*graph.MultiEdge
+ wantExpandeds []graph.ExpandedTransform
+ }{
+ {
+ name: "SingleTransform",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+
+ return []*graph.MultiEdge{edge}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded},
+ },
+ {
+ name: "MultiTransforms",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+ edge2 := g.NewEdge(s)
+ edge2.Op = graph.External
+ edge2.External = &testExternal2
+
+ return []*graph.MultiEdge{edge, edge2}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded,
testExpanded2},
+ },
+ {
+ name: "NonExternalTransforms",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+ edge2 := g.NewEdge(s)
+ edge2.Op = graph.External
+ edge2.External = &testExternal2
+ impulse := graph.NewImpulse(g, s, []byte{1})
+ impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+ return []*graph.MultiEdge{edge, edge2, impulse,
impulse2}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded,
testExpanded2},
+ },
+ }
+ for _, test := range tests {
+ test := test
+ t.Run(test.name, func(t *testing.T) {
+ g := graph.New()
+ edges := test.makeEdges(g)
+ var p pipepb.Pipeline
+ p.Components = &pipepb.Components{
+ Transforms:
make(map[string]*pipepb.PTransform),
+ Pcollections:
make(map[string]*pipepb.PCollection),
+ WindowingStrategies:
make(map[string]*pipepb.WindowingStrategy),
+ Coders:
make(map[string]*pipepb.Coder),
+ Environments:
make(map[string]*pipepb.Environment),
+ }
+ mergeExpandedWithPipeline(edges, &p)
+
+ // Check that all wanted expanded components have been
added to
+ // pipeline components.
+ for _, exp := range test.wantExpandeds {
+ wantComps := exp.Components.(*pipepb.Components)
+ gotComps := p.GetComponents()
+ validateComponents(t, wantComps, gotComps)
+
+ // Check that expanded transform is present.
Need to search.
+ wantTransform :=
exp.Transform.(*pipepb.PTransform)
+ var found bool
+ for _, gotTransform := range
gotComps.GetTransforms() {
+ if d := cmp.Diff(wantTransform,
gotTransform, protocmp.Transform()); d == "" {
Review comment:
In this case, you probably just want cmp.Equal instead of cmp.Diff,
since the diff result is never used, only compared against the empty string.
https://pkg.go.dev/github.com/google/go-cmp/cmp#Equal
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
})
}
+
+var testExternal = graph.ExternalTransform{
+ Urn: "test_urn",
+ Payload: nil,
+ ExpansionAddr: "test_addr",
+ Expanded: &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{"transform_id1":
{UniqueName: "test_components_transform"}},
+ Pcollections: map[string]*pipepb.PCollection{"pcollection_id1":
{UniqueName: "test_components_pcollection"}},
+ WindowingStrategies:
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId:
"test_components_windowing"}},
+ Coders: map[string]*pipepb.Coder{"coder_id1": {Spec:
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+ Environments: map[string]*pipepb.Environment{"environment_id1":
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+ UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+ Components: &testComponents,
+ Transform: &testTransform,
+ Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+ Urn: "test_urn2",
+ Payload: nil,
+ ExpansionAddr: "test_addr2",
+ Expanded: &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{"transform_id2":
{UniqueName: "test_components2_transform"}},
+ Pcollections: map[string]*pipepb.PCollection{"pcollection_id2":
{UniqueName: "test_components2_pcollection"}},
+ WindowingStrategies:
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId:
"test_components2_windowing"}},
+ Coders: map[string]*pipepb.Coder{"coder_id2": {Spec:
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+ Environments: map[string]*pipepb.Environment{"environment_id2":
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+ UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+ Components: &testComponents2,
+ Transform: &testTransform2,
+ Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+ // Create slices of MultiEdges containing external edges, and make sure
all
+ // relevant data from the external edges is properly added to an empty
+ // pipeline.
+ tests := []struct {
+ name string
+ makeEdges func(g *graph.Graph) []*graph.MultiEdge
+ wantExpandeds []graph.ExpandedTransform
+ }{
+ {
+ name: "SingleTransform",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+
+ return []*graph.MultiEdge{edge}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded},
+ },
+ {
+ name: "MultiTransforms",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+ edge2 := g.NewEdge(s)
+ edge2.Op = graph.External
+ edge2.External = &testExternal2
+
+ return []*graph.MultiEdge{edge, edge2}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded,
testExpanded2},
+ },
+ {
+ name: "NonExternalTransforms",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+ edge2 := g.NewEdge(s)
+ edge2.Op = graph.External
+ edge2.External = &testExternal2
+ impulse := graph.NewImpulse(g, s, []byte{1})
+ impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+ return []*graph.MultiEdge{edge, edge2, impulse,
impulse2}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded,
testExpanded2},
+ },
+ }
+ for _, test := range tests {
+ test := test
+ t.Run(test.name, func(t *testing.T) {
+ g := graph.New()
+ edges := test.makeEdges(g)
+ var p pipepb.Pipeline
+ p.Components = &pipepb.Components{
+ Transforms:
make(map[string]*pipepb.PTransform),
+ Pcollections:
make(map[string]*pipepb.PCollection),
+ WindowingStrategies:
make(map[string]*pipepb.WindowingStrategy),
+ Coders:
make(map[string]*pipepb.Coder),
+ Environments:
make(map[string]*pipepb.Environment),
+ }
+ mergeExpandedWithPipeline(edges, &p)
+
+ // Check that all wanted expanded components have been
added to
+ // pipeline components.
+ for _, exp := range test.wantExpandeds {
+ wantComps := exp.Components.(*pipepb.Components)
+ gotComps := p.GetComponents()
+ validateComponents(t, wantComps, gotComps)
+
+ // Check that expanded transform is present.
Need to search.
+ wantTransform :=
exp.Transform.(*pipepb.PTransform)
+ var found bool
+ for _, gotTransform := range
gotComps.GetTransforms() {
+ if d := cmp.Diff(wantTransform,
gotTransform, protocmp.Transform()); d == "" {
+ found = true
+ break
+ }
+ }
+ if !found {
+ t.Errorf("Pipeline components missing
expected expanded transform: %v", wantTransform)
+ }
+
+ // Check that requirements are present.
+ for _, wantReq := range exp.Requirements {
+ var found bool
+ for _, gotReq := range
p.GetRequirements() {
+ if wantReq == gotReq {
+ found = true
+ break
+ }
+ }
+ if !found {
+ t.Errorf("Pipeline missing
expected requirement: %v", wantReq)
+ }
+ }
+ }
+ })
+ }
+}
+
+func validateComponents(t *testing.T, wantComps, gotComps *pipepb.Components) {
Review comment:
I'm confused about why this is broken out rather than simply using a single
cmp.Diff(wantComps, gotComps, protocmp.Transform()).
I'm assuming that you wanted additional granularity on which parts are
missing or extra for each component, and that the full diff was less clear
than this output?
If so, consider adding a comment documenting that reasoning.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org