youngoli commented on a change in pull request #13370:
URL: https://github.com/apache/beam/pull/13370#discussion_r526530455
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
})
}
+
+var testExternal = graph.ExternalTransform{
+ Urn: "test_urn",
+ Payload: nil,
+ ExpansionAddr: "test_addr",
+ Expanded: &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{"transform_id1":
{UniqueName: "test_components_transform"}},
+ Pcollections: map[string]*pipepb.PCollection{"pcollection_id1":
{UniqueName: "test_components_pcollection"}},
+ WindowingStrategies:
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId:
"test_components_windowing"}},
+ Coders: map[string]*pipepb.Coder{"coder_id1": {Spec:
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+ Environments: map[string]*pipepb.Environment{"environment_id1":
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+ UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+ Components: &testComponents,
+ Transform: &testTransform,
+ Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+ Urn: "test_urn2",
+ Payload: nil,
+ ExpansionAddr: "test_addr2",
+ Expanded: &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{"transform_id2":
{UniqueName: "test_components2_transform"}},
+ Pcollections: map[string]*pipepb.PCollection{"pcollection_id2":
{UniqueName: "test_components2_pcollection"}},
+ WindowingStrategies:
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId:
"test_components2_windowing"}},
+ Coders: map[string]*pipepb.Coder{"coder_id2": {Spec:
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+ Environments: map[string]*pipepb.Environment{"environment_id2":
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+ UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+ Components: &testComponents2,
+ Transform: &testTransform2,
+ Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+ // Create slices of MultiEdges containing external edges, and make sure
all
+ // relevant data from the external edges is properly added to an empty
+ // pipeline.
+ tests := []struct {
+ name string
+ makeEdges func(g *graph.Graph) []*graph.MultiEdge
+ wantExpandeds []graph.ExpandedTransform
+ }{
+ {
+ name: "SingleTransform",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+
+ return []*graph.MultiEdge{edge}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded},
+ },
+ {
+ name: "MultiTransforms",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+ edge2 := g.NewEdge(s)
+ edge2.Op = graph.External
+ edge2.External = &testExternal2
+
+ return []*graph.MultiEdge{edge, edge2}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded,
testExpanded2},
+ },
+ {
+ name: "NonExternalTransforms",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+ edge2 := g.NewEdge(s)
+ edge2.Op = graph.External
+ edge2.External = &testExternal2
+ impulse := graph.NewImpulse(g, s, []byte{1})
+ impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+ return []*graph.MultiEdge{edge, edge2, impulse,
impulse2}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded,
testExpanded2},
+ },
+ }
+ for _, test := range tests {
+ test := test
+ t.Run(test.name, func(t *testing.T) {
+ g := graph.New()
+ edges := test.makeEdges(g)
+ var p pipepb.Pipeline
+ p.Components = &pipepb.Components{
+ Transforms:
make(map[string]*pipepb.PTransform),
+ Pcollections:
make(map[string]*pipepb.PCollection),
+ WindowingStrategies:
make(map[string]*pipepb.WindowingStrategy),
+ Coders:
make(map[string]*pipepb.Coder),
+ Environments:
make(map[string]*pipepb.Environment),
+ }
+ mergeExpandedWithPipeline(edges, &p)
+
+ // Check that all wanted expanded components have been
added to
+ // pipeline components.
+ for _, exp := range test.wantExpandeds {
+ wantComps := exp.Components.(*pipepb.Components)
+ gotComps := p.GetComponents()
+ validateComponents(t, wantComps, gotComps)
+
+ // Check that expanded transform is present.
Need to search.
+ wantTransform :=
exp.Transform.(*pipepb.PTransform)
+ var found bool
+ for _, gotTransform := range
gotComps.GetTransforms() {
+ if d := cmp.Diff(wantTransform,
gotTransform, protocmp.Transform()); d == "" {
Review comment:
Whoops, kinda embarassed I missed that. Done.
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
})
}
+
+var testExternal = graph.ExternalTransform{
+ Urn: "test_urn",
+ Payload: nil,
+ ExpansionAddr: "test_addr",
+ Expanded: &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{"transform_id1":
{UniqueName: "test_components_transform"}},
+ Pcollections: map[string]*pipepb.PCollection{"pcollection_id1":
{UniqueName: "test_components_pcollection"}},
+ WindowingStrategies:
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId:
"test_components_windowing"}},
+ Coders: map[string]*pipepb.Coder{"coder_id1": {Spec:
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+ Environments: map[string]*pipepb.Environment{"environment_id1":
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+ UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+ Components: &testComponents,
+ Transform: &testTransform,
+ Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+ Urn: "test_urn2",
+ Payload: nil,
+ ExpansionAddr: "test_addr2",
+ Expanded: &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{"transform_id2":
{UniqueName: "test_components2_transform"}},
+ Pcollections: map[string]*pipepb.PCollection{"pcollection_id2":
{UniqueName: "test_components2_pcollection"}},
+ WindowingStrategies:
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId:
"test_components2_windowing"}},
+ Coders: map[string]*pipepb.Coder{"coder_id2": {Spec:
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+ Environments: map[string]*pipepb.Environment{"environment_id2":
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+ UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+ Components: &testComponents2,
+ Transform: &testTransform2,
+ Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+ // Create slices of MultiEdges containing external edges, and make sure
all
+ // relevant data from the external edges is properly added to an empty
+ // pipeline.
+ tests := []struct {
+ name string
+ makeEdges func(g *graph.Graph) []*graph.MultiEdge
+ wantExpandeds []graph.ExpandedTransform
+ }{
+ {
+ name: "SingleTransform",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+
+ return []*graph.MultiEdge{edge}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded},
+ },
+ {
+ name: "MultiTransforms",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+ edge2 := g.NewEdge(s)
+ edge2.Op = graph.External
+ edge2.External = &testExternal2
+
+ return []*graph.MultiEdge{edge, edge2}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded,
testExpanded2},
+ },
+ {
+ name: "NonExternalTransforms",
+ makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+ s := g.Root()
+ edge := g.NewEdge(s)
+ edge.Op = graph.External
+ edge.External = &testExternal
+ edge2 := g.NewEdge(s)
+ edge2.Op = graph.External
+ edge2.External = &testExternal2
+ impulse := graph.NewImpulse(g, s, []byte{1})
+ impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+ return []*graph.MultiEdge{edge, edge2, impulse,
impulse2}
+ },
+ wantExpandeds: []graph.ExpandedTransform{testExpanded,
testExpanded2},
+ },
+ }
+ for _, test := range tests {
+ test := test
+ t.Run(test.name, func(t *testing.T) {
+ g := graph.New()
+ edges := test.makeEdges(g)
+ var p pipepb.Pipeline
+ p.Components = &pipepb.Components{
+ Transforms:
make(map[string]*pipepb.PTransform),
+ Pcollections:
make(map[string]*pipepb.PCollection),
+ WindowingStrategies:
make(map[string]*pipepb.WindowingStrategy),
+ Coders:
make(map[string]*pipepb.Coder),
+ Environments:
make(map[string]*pipepb.Environment),
+ }
+ mergeExpandedWithPipeline(edges, &p)
+
+ // Check that all wanted expanded components have been
added to
+ // pipeline components.
+ for _, exp := range test.wantExpandeds {
+ wantComps := exp.Components.(*pipepb.Components)
+ gotComps := p.GetComponents()
+ validateComponents(t, wantComps, gotComps)
+
+ // Check that expanded transform is present.
Need to search.
+ wantTransform :=
exp.Transform.(*pipepb.PTransform)
+ var found bool
+ for _, gotTransform := range
gotComps.GetTransforms() {
+ if d := cmp.Diff(wantTransform,
gotTransform, protocmp.Transform()); d == "" {
Review comment:
Whoops, kinda embarrassed I missed that. Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]