lostluck commented on a change in pull request #15196:
URL: https://github.com/apache/beam/pull/15196#discussion_r673447514



##########
File path: sdks/go/pkg/beam/runners/direct/gbk.go
##########
@@ -100,6 +114,68 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error {
        return n.Out.FinishBundle(ctx)
 }
 
+func (n *CoGBK) mergeWindows() error {
+       sort.Slice(n.wins, func(i int, j int) bool {
+               return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
+       })
+       n.mergeMap = make(map[typex.Window]int)
+       mergedWins := []typex.Window{}
+       for i := 0; i < len(n.wins); {
+               intWin, ok := n.wins[i].(window.IntervalWindow)
+               if !ok {
+                       return errors.Errorf("tried to merge non-interval window type")

Review comment:
       It's unlikely to crop up, but consider printing the actual type of the 
window in question using the %T formatting directive, and including that in the 
error message.
   See https://play.golang.org/p/8GhLJucF0lA

##########
File path: sdks/go/pkg/beam/runners/direct/gbk.go
##########
@@ -100,6 +114,68 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error {
        return n.Out.FinishBundle(ctx)
 }
 
+func (n *CoGBK) mergeWindows() error {
+       sort.Slice(n.wins, func(i int, j int) bool {
+               return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
+       })
+       n.mergeMap = make(map[typex.Window]int)

Review comment:
       It would be worth a comment on mergeMap saying it's a map from the 
original windows to the index of the new window in the mergedWins slice.
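
   For illustration, here is a hypothetical trimmed-down struct that shows one way the comment could read. Window, mergeState, and mergedWindowFor are assumptions, not the PR's actual types.

```go
package main

import "fmt"

// Window is a hypothetical stand-in for an interval window.
type Window struct{ Start, End int64 }

// mergeState is a hypothetical, trimmed-down version of the CoGBK fields
// involved in window merging.
type mergeState struct {
	// wins holds the merged windows.
	wins []Window
	// mergeMap maps each original (pre-merge) window to the index, in wins,
	// of the merged window it was folded into.
	mergeMap map[Window]int
}

// mergedWindowFor returns the merged window that original was folded into.
func (s *mergeState) mergedWindowFor(original Window) (Window, bool) {
	idx, ok := s.mergeMap[original]
	if !ok {
		return Window{}, false
	}
	return s.wins[idx], true
}

func main() {
	s := &mergeState{
		wins: []Window{{0, 15}, {20, 25}},
		mergeMap: map[Window]int{
			{0, 10}:  0,
			{5, 15}:  0,
			{20, 25}: 1,
		},
	}
	w, _ := s.mergedWindowFor(Window{5, 15})
	fmt.Println(w) // {0 15}
}
```
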

##########
File path: sdks/go/pkg/beam/runners/direct/gbk.go
##########
@@ -100,6 +114,68 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error {
        return n.Out.FinishBundle(ctx)
 }
 
+func (n *CoGBK) mergeWindows() error {
+       sort.Slice(n.wins, func(i int, j int) bool {
+               return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
+       })
+       n.mergeMap = make(map[typex.Window]int)

Review comment:
       Likely won't matter for Direct runner purposes, but since mergeWindows 
and reprocessByWindow are both called in FinishBundle (that is, the map doesn't 
cross a Lifecycle Method boundary), have mergeWindows return the merge map, and 
have reprocessByWindow take it as an argument.
   
   This avoids the CoGBK value holding onto the map with *all* the window 
information, so the map can be garbage collected sooner, lowering memory overhead.
   
   I'm ambivalent about whether the sorted and merged window slice gets the 
same treatment, as long as it is cleared away by the end of FinishBundle 
(again, for garbage collection purposes).
   
   Basically, this is especially important when a pipeline can have an arbitrary 
number of GBK operations. Since the Go Direct Runner always runs in a 
single machine's memory, it pays not to be wasteful and to let things get cleaned 
up.
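
   A rough sketch of that shape. It assumes simplified int64 interval windows, using a hypothetical IntervalWindow type rather than Beam's window.IntervalWindow. The signatures for mergeWindows and reprocessByWindow and the element type are also hypothetical. The merge result and the map exist only as locals, so nothing outlives the call. The sketch sorts by Start and keeps the larger End when it absorbs a window, so a window nested inside another never shrinks the merged interval.

```go
package main

import (
	"fmt"
	"sort"
)

// IntervalWindow is a hypothetical stand-in for window.IntervalWindow,
// using plain int64 bounds with an exclusive End.
type IntervalWindow struct{ Start, End int64 }

// element is a hypothetical (key, window) pair awaiting regrouping.
type element struct {
	Key string
	Win IntervalWindow
}

// mergeWindows merges overlapping windows. It returns the merged windows and
// a map from each original window to the index of its merged window. Neither
// result is stored on a long-lived struct.
func mergeWindows(wins []IntervalWindow) ([]IntervalWindow, map[IntervalWindow]int) {
	sorted := append([]IntervalWindow(nil), wins...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Start < sorted[j].Start })

	mergeMap := make(map[IntervalWindow]int, len(sorted))
	var merged []IntervalWindow
	for i := 0; i < len(sorted); {
		cur := sorted[i]
		j := i + 1
		// Absorb every following window that starts before the current merge ends.
		for j < len(sorted) && sorted[j].Start <= cur.End {
			if sorted[j].End > cur.End {
				cur.End = sorted[j].End
			}
			j++
		}
		for k := i; k < j; k++ {
			mergeMap[sorted[k]] = len(merged)
		}
		merged = append(merged, cur)
		i = j
	}
	return merged, mergeMap
}

// reprocessByWindow receives the merge results as arguments instead of
// reading them from struct fields. Here it only counts elements per merged window.
func reprocessByWindow(elems []element, merged []IntervalWindow, mergeMap map[IntervalWindow]int) map[IntervalWindow]int {
	counts := make(map[IntervalWindow]int)
	for _, e := range elems {
		counts[merged[mergeMap[e.Win]]]++
	}
	return counts
}

func main() {
	elems := []element{
		{"a", IntervalWindow{0, 10}},
		{"b", IntervalWindow{5, 15}},
		{"c", IntervalWindow{20, 25}},
	}
	wins := make([]IntervalWindow, 0, len(elems))
	for _, e := range elems {
		wins = append(wins, e.Win)
	}
	// Both results are locals and become unreachable when main's work is done.
	merged, mergeMap := mergeWindows(wins)
	fmt.Println(merged, reprocessByWindow(elems, merged, mergeMap))
}
```
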

##########
File path: sdks/go/pkg/beam/runners/direct/gbk.go
##########
@@ -100,6 +114,68 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error {
        return n.Out.FinishBundle(ctx)
 }
 
+func (n *CoGBK) mergeWindows() error {
+       sort.Slice(n.wins, func(i int, j int) bool {
+               return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
+       })
+       n.mergeMap = make(map[typex.Window]int)
+       mergedWins := []typex.Window{}
+       for i := 0; i < len(n.wins); {
+               intWin, ok := n.wins[i].(window.IntervalWindow)
+               if !ok {
+                       return errors.Errorf("tried to merge non-interval window type")
+               }
+               mergeStart := intWin.Start
+               mergeEnd := intWin.End
+               j := i + 1
+               for j < len(n.wins) {
+                       candidateWin := n.wins[j].(window.IntervalWindow)
+                       if candidateWin.Start <= mergeEnd {
+                               mergeEnd = candidateWin.End
+                               j++
+                       } else {
+                               break
+                       }
+               }
+               for k := i; k < j; k++ {
+                       n.mergeMap[n.wins[k]] = len(mergedWins)
+               }
+               mergedWins = append(mergedWins, window.IntervalWindow{Start: mergeStart, End: mergeEnd})
+               i = j
+       }
+       n.wins = mergedWins
+       return nil
+}
+
+func (n *CoGBK) reprocessByWindow() error {
+       newGroups := make(map[string]*group)
+       for _, g := range n.m {
+               ws := []typex.Window{n.wins[n.mergeMap[g.key.Windows[0]]]}
+               var buf bytes.Buffer

Review comment:
       Consider a helper function or two to reduce the duplication between this 
and ProcessElement. 
   Ideally we would simply merge the two paths (that is, defer the work done in 
ProcessElement to FinishBundle), but until then, it's better to clean things up a 
little via helper functions or helper types. 
   
   In this case, for example, the main difference is that one uses `n.m` and 
this one uses `newGroups`. A helper type for grouping, 
`type grouper map[string]*group`, could handle this iteration in a method, 
avoiding the duplication. It just needs the right window information passed in 
somehow, in both cases. Two or so helper methods would be good: one for the key 
encoding and one for inserting into the group. Since the receiver is technically 
a map, the methods should use a value receiver rather than a pointer receiver, 
as maps, even as underlying types, are reference types.
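
   A minimal sketch of such a helper type. It assumes a simplified, hypothetical group struct and a string-based key encoding in place of the real coders. encodeKey and add are illustrative names only. Because grouper's underlying type is map[string]*group, an existing map can be converted with grouper(n.m) without copying, and inserts through the value receiver are visible to the caller.

```go
package main

import (
	"bytes"
	"fmt"
)

// group is a hypothetical stand-in for the direct runner's group type:
// the original key plus the values collected for it.
type group struct {
	key    string
	values []string
}

// grouper maps an encoded (window, key) pair to its group. Methods use a
// value receiver because a map value already refers to the shared table.
type grouper map[string]*group

// encodeKey builds the grouping key from the window and element key.
// Hypothetical encoding; a real implementation would use the key and window coders.
func encodeKey(key, win string) string {
	var buf bytes.Buffer
	buf.WriteString(win)
	buf.WriteByte(0) // separator so ("ab","c") and ("a","bc") stay distinct
	buf.WriteString(key)
	return buf.String()
}

// add inserts value into the group for (key, win), creating the group on demand.
func (g grouper) add(key, win, value string) {
	k := encodeKey(key, win)
	grp, ok := g[k]
	if !ok {
		grp = &group{key: key}
		g[k] = grp
	}
	grp.values = append(grp.values, value)
}

func main() {
	m := grouper{}
	m.add("a", "[0,10)", "x")
	m.add("a", "[0,10)", "y")
	m.add("a", "[20,30)", "z")
	fmt.Println(len(m), len(m[encodeKey("a", "[0,10)")].values)) // 2 2
}
```
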
   



