lostluck commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r731335377



##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PCo
        }
 
        in := []*graph.Node{col.n}
+       inWfn := col.n.WindowingStrategy().Fn
        for _, s := range side {
+               sideNode := s.Input.n
+               sideWfn := sideNode.WindowingStrategy().Fn
+               if sideWfn.Kind == window.Sessions {
+                       return nil, fmt.Errorf("side input %v is 
session-windowed, which is not supported", sideNode.String())

Review comment:
       fmt is aware of the Stringer interface, so you don't need to explicitly 
call a `String()` method as input into a fmt call.
   ```suggestion
                        return nil, fmt.Errorf("side input %v is 
session-windowed, which is not supported", sideNode)
   ```

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PCo
        }
 
        in := []*graph.Node{col.n}
+       inWfn := col.n.WindowingStrategy().Fn
        for _, s := range side {
+               sideNode := s.Input.n
+               sideWfn := sideNode.WindowingStrategy().Fn
+               if sideWfn.Kind == window.Sessions {
+                       return nil, fmt.Errorf("side input %v is 
session-windowed, which is not supported", sideNode.String())

Review comment:
       The property that prevents session windows from working properly is that 
it's a merging windowfn. We can't currently check this bit directly right now, 
but we can begin to educate users about it.
   
   "error with SideInput %d: PCollections using Merging WindowFns are not 
supported as side inputs"
   
   Or something similar. So that adds that it's the `i`th side input parameter 
that was the problem, but we can do better.
   
   We should also add further context: 
   Which user DoFn was the user adding to the graph, that has the bad side 
input? 
   
   We can also suggest that users could re-window PCollection to a non-merging 
WindowFn before using it as a side input. As a rule, users love it when the 
error that impedes them suggests a solution to the problem.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PCo
        }
 
        in := []*graph.Node{col.n}
+       inWfn := col.n.WindowingStrategy().Fn
        for _, s := range side {
+               sideNode := s.Input.n
+               sideWfn := sideNode.WindowingStrategy().Fn
+               if sideWfn.Kind == window.Sessions {
+                       return nil, fmt.Errorf("side input %v is 
session-windowed, which is not supported", sideNode.String())
+               }
+               if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != 
window.GlobalWindows) {
+                       return nil, fmt.Errorf("main input %v is global 
windowed but side input %v is not, cannot map windows correctly", 
col.n.String(), sideNode.String())

Review comment:
       ```suggestion
                        return nil, fmt.Errorf("main input %v is global 
windowed but side input %v is not, cannot map windows correctly", col.n, 
sideNode)
   ```

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,47 @@ func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing 
information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+       timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 
2, 3}}, beam.Impulse(s))
+       timestampedSide := beam.ParDo(s, &createTimestampedData{Data: []int{1, 
2, 3}}, beam.Impulse(s))
+
+       timestampedData = beam.DropKey(s, timestampedData)
+       timestampedSide = beam.DropKey(s, timestampedSide)
+
+       _ = timestampedSide
+
+       windowSize := 1 * time.Second
+
+       validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side 
beam.PCollection, expected ...interface{}) {
+               wData := beam.WindowInto(s, wfn, in)
+               wSide := beam.WindowInto(s, sideFn, side)
+
+               sums := beam.ParDo(s, sumSideInputs, wData, 
beam.SideInput{Input: wSide})
+
+               sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+               passert.Equals(s, sums, expected...)
+       }
+
+       // This works.

Review comment:
       We likely should get rid of the commentary here, since everything now 
works. ;)

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,47 @@ func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing 
information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+       timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 
2, 3}}, beam.Impulse(s))
+       timestampedSide := beam.ParDo(s, &createTimestampedData{Data: []int{1, 
2, 3}}, beam.Impulse(s))
+
+       timestampedData = beam.DropKey(s, timestampedData)
+       timestampedSide = beam.DropKey(s, timestampedSide)
+
+       _ = timestampedSide
+
+       windowSize := 1 * time.Second
+
+       validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side 
beam.PCollection, expected ...interface{}) {
+               wData := beam.WindowInto(s, wfn, in)
+               wSide := beam.WindowInto(s, sideFn, side)
+
+               sums := beam.ParDo(s, sumSideInputs, wData, 
beam.SideInput{Input: wSide})
+
+               sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+               passert.Equals(s, sums, expected...)
+       }
+
+       // This works.
+       validateSums(s.Scope("Fixed-Global"), 
window.NewFixedWindows(windowSize), window.NewGlobalWindows(), timestampedData, 
timestampedData, 7, 8, 9)
+       // So does this.
+       validateSums(s.Scope("Fixed-Same"), window.NewFixedWindows(windowSize), 
window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 4, 6)
+
+       // Thise doesn't
+       validateSums(s.Scope("Fixed-Big"), window.NewFixedWindows(windowSize), 
window.NewFixedWindows(10*time.Second), timestampedData, timestampedSide, 7, 8, 
9)

Review comment:
       Please also add Sliding-Fixed and a Fixed-Sliding case, so we can be 
sure the code gets exercised fully.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PCo
        }
 
        in := []*graph.Node{col.n}
+       inWfn := col.n.WindowingStrategy().Fn
        for _, s := range side {
+               sideNode := s.Input.n
+               sideWfn := sideNode.WindowingStrategy().Fn
+               if sideWfn.Kind == window.Sessions {
+                       return nil, fmt.Errorf("side input %v is 
session-windowed, which is not supported", sideNode.String())
+               }
+               if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != 
window.GlobalWindows) {
+                       return nil, fmt.Errorf("main input %v is global 
windowed but side input %v is not, cannot map windows correctly", 
col.n.String(), sideNode.String())

Review comment:
       Same improvements to the error message here too: DoFn, which SideInput 
Parameter had the problem.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to