[ 
https://issues.apache.org/jira/browse/BEAM-11188?focusedWorklogId=513683&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-513683
 ]

ASF GitHub Bot logged work on BEAM-11188:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Nov/20 18:17
            Start Date: 18/Nov/20 18:17
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #13370:
URL: https://github.com/apache/beam/pull/13370#discussion_r526299363



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
 
        })
 }
+
+var testExternal = graph.ExternalTransform{
+       Urn:           "test_urn",
+       Payload:       nil,
+       ExpansionAddr: "test_addr",
+       Expanded:      &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id1": 
{UniqueName: "test_components_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id1": 
{UniqueName: "test_components_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId: 
"test_components_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id1": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id1": 
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+       UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+       Components:   &testComponents,
+       Transform:    &testTransform,
+       Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+       Urn:           "test_urn2",
+       Payload:       nil,
+       ExpansionAddr: "test_addr2",
+       Expanded:      &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id2": 
{UniqueName: "test_components2_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id2": 
{UniqueName: "test_components2_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId: 
"test_components2_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id2": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id2": 
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+       UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+       Components:   &testComponents2,
+       Transform:    &testTransform2,
+       Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+       // Create slices of MultiEdges containing external edges, and make sure 
all
+       // relevant data from the external edges is properly added to an empty
+       // pipeline.
+       tests := []struct {
+               name          string
+               makeEdges     func(g *graph.Graph) []*graph.MultiEdge
+               wantExpandeds []graph.ExpandedTransform
+       }{
+               {
+                       name: "SingleTransform",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+
+                               return []*graph.MultiEdge{edge}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded},
+               },
+               {
+                       name: "MultiTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+
+                               return []*graph.MultiEdge{edge, edge2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+               {
+                       name: "NonExternalTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+                               impulse := graph.NewImpulse(g, s, []byte{1})
+                               impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+                               return []*graph.MultiEdge{edge, edge2, impulse, 
impulse2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(test.name, func(t *testing.T) {
+                       g := graph.New()
+                       edges := test.makeEdges(g)
+                       var p pipepb.Pipeline
+                       p.Components = &pipepb.Components{
+                               Transforms:          
make(map[string]*pipepb.PTransform),
+                               Pcollections:        
make(map[string]*pipepb.PCollection),
+                               WindowingStrategies: 
make(map[string]*pipepb.WindowingStrategy),
+                               Coders:              
make(map[string]*pipepb.Coder),
+                               Environments:        
make(map[string]*pipepb.Environment),
+                       }
+                       mergeExpandedWithPipeline(edges, &p)
+
+                       // Check that all wanted expanded components have been 
added to
+                       // pipeline components.
+                       for _, exp := range test.wantExpandeds {
+                               wantComps := exp.Components.(*pipepb.Components)
+                               gotComps := p.GetComponents()
+                               validateComponents(t, wantComps, gotComps)
+
+                               // Check that expanded transform is present. 
Need to search.
+                               wantTransform := 
exp.Transform.(*pipepb.PTransform)
+                               var found bool
+                               for _, gotTransform := range 
gotComps.GetTransforms() {
+                                       if d := cmp.Diff(wantTransform, 
gotTransform, protocmp.Transform()); d == "" {

Review comment:
       In this case, you probably just want cmp.Equal instead of cmp.Diff, 
since we never use the diff result.
   
   https://pkg.go.dev/github.com/google/go-cmp/cmp#Equal

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
 
        })
 }
+
+var testExternal = graph.ExternalTransform{
+       Urn:           "test_urn",
+       Payload:       nil,
+       ExpansionAddr: "test_addr",
+       Expanded:      &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id1": 
{UniqueName: "test_components_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id1": 
{UniqueName: "test_components_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId: 
"test_components_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id1": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id1": 
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+       UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+       Components:   &testComponents,
+       Transform:    &testTransform,
+       Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+       Urn:           "test_urn2",
+       Payload:       nil,
+       ExpansionAddr: "test_addr2",
+       Expanded:      &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id2": 
{UniqueName: "test_components2_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id2": 
{UniqueName: "test_components2_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId: 
"test_components2_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id2": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id2": 
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+       UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+       Components:   &testComponents2,
+       Transform:    &testTransform2,
+       Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+       // Create slices of MultiEdges containing external edges, and make sure 
all
+       // relevant data from the external edges is properly added to an empty
+       // pipeline.
+       tests := []struct {
+               name          string
+               makeEdges     func(g *graph.Graph) []*graph.MultiEdge
+               wantExpandeds []graph.ExpandedTransform
+       }{
+               {
+                       name: "SingleTransform",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+
+                               return []*graph.MultiEdge{edge}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded},
+               },
+               {
+                       name: "MultiTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+
+                               return []*graph.MultiEdge{edge, edge2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+               {
+                       name: "NonExternalTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+                               impulse := graph.NewImpulse(g, s, []byte{1})
+                               impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+                               return []*graph.MultiEdge{edge, edge2, impulse, 
impulse2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(test.name, func(t *testing.T) {
+                       g := graph.New()
+                       edges := test.makeEdges(g)
+                       var p pipepb.Pipeline
+                       p.Components = &pipepb.Components{
+                               Transforms:          
make(map[string]*pipepb.PTransform),
+                               Pcollections:        
make(map[string]*pipepb.PCollection),
+                               WindowingStrategies: 
make(map[string]*pipepb.WindowingStrategy),
+                               Coders:              
make(map[string]*pipepb.Coder),
+                               Environments:        
make(map[string]*pipepb.Environment),
+                       }
+                       mergeExpandedWithPipeline(edges, &p)
+
+                       // Check that all wanted expanded components have been 
added to
+                       // pipeline components.
+                       for _, exp := range test.wantExpandeds {
+                               wantComps := exp.Components.(*pipepb.Components)
+                               gotComps := p.GetComponents()
+                               validateComponents(t, wantComps, gotComps)
+
+                               // Check that expanded transform is present. 
Need to search.
+                               wantTransform := 
exp.Transform.(*pipepb.PTransform)
+                               var found bool
+                               for _, gotTransform := range 
gotComps.GetTransforms() {
+                                       if d := cmp.Diff(wantTransform, 
gotTransform, protocmp.Transform()); d == "" {
+                                               found = true
+                                               break
+                                       }
+                               }
+                               if !found {
+                                       t.Errorf("Pipeline components missing 
expected expanded transform: %v", wantTransform)
+                               }
+
+                               // Check that requirements are present.
+                               for _, wantReq := range exp.Requirements {
+                                       var found bool
+                                       for _, gotReq := range 
p.GetRequirements() {
+                                               if wantReq == gotReq {
+                                                       found = true
+                                                       break
+                                               }
+                                       }
+                                       if !found {
+                                               t.Errorf("Pipeline missing 
expected requirement: %v", wantReq)
+                                       }
+                               }
+                       }
+               })
+       }
+}
+
+func validateComponents(t *testing.T, wantComps, gotComps *pipepb.Components) {

Review comment:
       I'm confused why this is broken out rather than simply using a single 
cmp.Diff(wantComps, gotComps, protocmp.Transform())?
   
   I'm assuming that you wanted additional granularity on the parts that were 
missing/extra for each component, and the full diff was less clear than this 
output?
   
   Consider adding a comment documenting that reasoning.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 513683)
    Time Spent: 3h 50m  (was: 3h 40m)

> Go Cross-Language UX polish and refactoring
> -------------------------------------------
>
>                 Key: BEAM-11188
>                 URL: https://issues.apache.org/jira/browse/BEAM-11188
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language, sdk-go
>            Reporter: Daniel Oliveira
>            Assignee: Daniel Oliveira
>            Priority: P2
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> This is a tracking issue for various small usability and UX improvements to 
> the Go SDK implementation of Cross-Language. I don't feel each one is 
> individually important enough for its own Jira, but together they are worth 
> recording progress on.
> Tasks included:
> * Adjust user-facing XLang functions so that Sink and Source versions are 
> actually sinks and sources (no outputs and no inputs respectively).
> * Rename SourceInputTag and SinkOutputTag since they are no longer used with 
> source/sink versions of the methods.
> * Adjust beam/xlang.go so that it doesn't need to import job_management 
> protos. Move the proto creation down into the method the proto is passed to 
> (which is xlangx.Expand).
> * Refactor the functions in xlangx/translate.go and how they are used, since 
> right now the functions just get called one after another in sequence.
> * Move as many xlang calls out of universal.go as possible. They should be 
> handled as part of the normal sequence of the SDK, such as in proto 
> marshalling and unmarshalling.
> * Add wrappers around xlang calls in existing examples, to both give a 
> cleaner interface and provide an example of how xlang transforms should be 
> implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to