[ 
https://issues.apache.org/jira/browse/BEAM-11188?focusedWorklogId=513895&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-513895
 ]

ASF GitHub Bot logged work on BEAM-11188:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Nov/20 01:27
            Start Date: 19/Nov/20 01:27
    Worklog Time Spent: 10m 
      Work Description: youngoli commented on a change in pull request #13370:
URL: https://github.com/apache/beam/pull/13370#discussion_r526531831



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -203,3 +203,214 @@ func TestExpandedComponents(t *testing.T) {
 
        })
 }
+
+var testExternal = graph.ExternalTransform{
+       Urn:           "test_urn",
+       Payload:       nil,
+       ExpansionAddr: "test_addr",
+       Expanded:      &testExpanded,
+}
+
+var testComponents = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id1": 
{UniqueName: "test_components_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id1": 
{UniqueName: "test_components_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id1": {WindowCoderId: 
"test_components_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id1": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id1": 
{Urn: "test_components_environment"}},
+}
+
+var testRequirements = []string{"test_requirement1", "test_requirement2"}
+
+var testTransform = pipepb.PTransform{
+       UniqueName: "test_transform",
+}
+
+var testExpanded = graph.ExpandedTransform{
+       Components:   &testComponents,
+       Transform:    &testTransform,
+       Requirements: testRequirements,
+}
+
+var testExternal2 = graph.ExternalTransform{
+       Urn:           "test_urn2",
+       Payload:       nil,
+       ExpansionAddr: "test_addr2",
+       Expanded:      &testExpanded2,
+}
+
+var testComponents2 = pipepb.Components{
+       Transforms:          map[string]*pipepb.PTransform{"transform_id2": 
{UniqueName: "test_components2_transform"}},
+       Pcollections:        map[string]*pipepb.PCollection{"pcollection_id2": 
{UniqueName: "test_components2_pcollection"}},
+       WindowingStrategies: 
map[string]*pipepb.WindowingStrategy{"windowing_id2": {WindowCoderId: 
"test_components2_windowing"}},
+       Coders:              map[string]*pipepb.Coder{"coder_id2": {Spec: 
&pipepb.FunctionSpec{Urn: "test_components2_coder"}}},
+       Environments:        map[string]*pipepb.Environment{"environment_id2": 
{Urn: "test_components2_environment"}},
+}
+
+var testRequirements2 = []string{"test_requirement2", "test_requirement3"}
+
+var testTransform2 = pipepb.PTransform{
+       UniqueName: "test_transform2",
+}
+
+var testExpanded2 = graph.ExpandedTransform{
+       Components:   &testComponents2,
+       Transform:    &testTransform2,
+       Requirements: testRequirements2,
+}
+
+// TestMergeExpandedWithPipeline tests that mergeExpandedWithPipeline properly
+// adds data from external transforms to a pipeline.
+func TestMergeExpandedWithPipeline(t *testing.T) {
+       // Create slices of MultiEdges containing external edges, and make sure 
all
+       // relevant data from the external edges is properly added to an empty
+       // pipeline.
+       tests := []struct {
+               name          string
+               makeEdges     func(g *graph.Graph) []*graph.MultiEdge
+               wantExpandeds []graph.ExpandedTransform
+       }{
+               {
+                       name: "SingleTransform",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+
+                               return []*graph.MultiEdge{edge}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded},
+               },
+               {
+                       name: "MultiTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+
+                               return []*graph.MultiEdge{edge, edge2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+               {
+                       name: "NonExternalTransforms",
+                       makeEdges: func(g *graph.Graph) []*graph.MultiEdge {
+                               s := g.Root()
+                               edge := g.NewEdge(s)
+                               edge.Op = graph.External
+                               edge.External = &testExternal
+                               edge2 := g.NewEdge(s)
+                               edge2.Op = graph.External
+                               edge2.External = &testExternal2
+                               impulse := graph.NewImpulse(g, s, []byte{1})
+                               impulse2 := graph.NewImpulse(g, s, []byte{2})
+
+                               return []*graph.MultiEdge{edge, edge2, impulse, 
impulse2}
+                       },
+                       wantExpandeds: []graph.ExpandedTransform{testExpanded, 
testExpanded2},
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(test.name, func(t *testing.T) {
+                       g := graph.New()
+                       edges := test.makeEdges(g)
+                       var p pipepb.Pipeline
+                       p.Components = &pipepb.Components{
+                               Transforms:          
make(map[string]*pipepb.PTransform),
+                               Pcollections:        
make(map[string]*pipepb.PCollection),
+                               WindowingStrategies: 
make(map[string]*pipepb.WindowingStrategy),
+                               Coders:              
make(map[string]*pipepb.Coder),
+                               Environments:        
make(map[string]*pipepb.Environment),
+                       }
+                       mergeExpandedWithPipeline(edges, &p)
+
+                       // Check that all wanted expanded components have been 
added to
+                       // pipeline components.
+                       for _, exp := range test.wantExpandeds {
+                               wantComps := exp.Components.(*pipepb.Components)
+                               gotComps := p.GetComponents()
+                               validateComponents(t, wantComps, gotComps)
+
+                               // Check that expanded transform is present. 
Need to search.
+                               wantTransform := 
exp.Transform.(*pipepb.PTransform)
+                               var found bool
+                               for _, gotTransform := range 
gotComps.GetTransforms() {
+                                       if d := cmp.Diff(wantTransform, 
gotTransform, protocmp.Transform()); d == "" {
+                                               found = true
+                                               break
+                                       }
+                               }
+                               if !found {
+                                       t.Errorf("Pipeline components missing 
expected expanded transform: %v", wantTransform)
+                               }
+
+                               // Check that requirements are present.
+                               for _, wantReq := range exp.Requirements {
+                                       var found bool
+                                       for _, gotReq := range 
p.GetRequirements() {
+                                               if wantReq == gotReq {
+                                                       found = true
+                                                       break
+                                               }
+                                       }
+                                       if !found {
+                                               t.Errorf("Pipeline missing 
expected requirement: %v", wantReq)
+                                       }
+                               }
+                       }
+               })
+       }
+}
+
+func validateComponents(t *testing.T, wantComps, gotComps *pipepb.Components) {

Review comment:
       Actually, the main reason was that the pipeline object can contain 
the components from multiple external transforms, so just diffing the 
components wouldn't work. What I needed to validate is that the components of 
the external transform are a subset of the pipeline components, not that 
they're exactly equal.
   
   I'll add a comment explaining that, because I agree it's not immediately 
obvious.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 513895)
    Time Spent: 4h 10m  (was: 4h)

> Go Cross-Language UX polish and refactoring
> -------------------------------------------
>
>                 Key: BEAM-11188
>                 URL: https://issues.apache.org/jira/browse/BEAM-11188
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language, sdk-go
>            Reporter: Daniel Oliveira
>            Assignee: Daniel Oliveira
>            Priority: P2
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> This is a tracking issue for various small usability and UX improvements to 
> the Go SDK implementation of Cross-Language. I don't feel each one 
> individually is important enough for its own Jira, but together they're 
> worth recording progress on.
> Tasks included:
> * Adjust user-facing XLang functions so that Sink and Source versions are 
> actually sinks and sources (no outputs and no inputs respectively).
> * Rename SourceInputTag and SinkOutputTag since they are no longer used with 
> source/sink versions of the methods.
> * Adjust beam/xlang.go so that it doesn't need to import job_management 
> protos. Move the proto creation down into the method the proto is passed to 
> (which is xlangx.Expand).
> * Refactor the functions in xlangx/translate.go and how they are used, since 
> right now the functions just get called one after another in sequence.
> * Move as many xlang calls out of universal.go as possible. They should be 
> handled as part of the normal sequence of the SDK, such as in proto 
> marshalling and unmarshalling.
> * Add wrappers around xlang calls in existing examples, to both give a 
> cleaner interface and provide an example of how xlang transforms should be 
> implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to