lostluck commented on a change in pull request #15196:
URL: https://github.com/apache/beam/pull/15196#discussion_r673447514
##########
File path: sdks/go/pkg/beam/runners/direct/gbk.go
##########
@@ -100,6 +114,68 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error {
return n.Out.FinishBundle(ctx)
}
+func (n *CoGBK) mergeWindows() error {
+ sort.Slice(n.wins, func(i int, j int) bool {
+ return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
+ })
+ n.mergeMap = make(map[typex.Window]int)
+ mergedWins := []typex.Window{}
+ for i := 0; i < len(n.wins); {
+ intWin, ok := n.wins[i].(window.IntervalWindow)
+ if !ok {
+ return errors.Errorf("tried to merge non-interval window type")
Review comment:
It's unlikely to crop up, but consider printing the actual type of the
window in question using the %T formatting directive, and including that in the
error message.
See https://play.golang.org/p/8GhLJucF0lA
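A minimal sketch of the suggestion, under stated assumptions: `Window`, `IntervalWindow`, and `GlobalWindow` below are simplified stand-ins for the Beam types, `asInterval` is a hypothetical helper, and the standard library's `fmt.Errorf` stands in for the `errors.Errorf` call in the diff.

```go
package main

import "fmt"

// Window is a simplified stand-in for typex.Window (assumption for this sketch).
type Window interface{ MaxTimestamp() int64 }

// IntervalWindow is a simplified stand-in for window.IntervalWindow.
type IntervalWindow struct{ Start, End int64 }

func (w IntervalWindow) MaxTimestamp() int64 { return w.End - 1 }

// GlobalWindow is a second, non-interval window type used to trigger the error.
type GlobalWindow struct{}

func (GlobalWindow) MaxTimestamp() int64 { return 1 << 62 }

// asInterval is a hypothetical helper: it performs the type assertion and,
// on failure, reports the dynamic type of the offending window via %T.
func asInterval(w Window) (IntervalWindow, error) {
	iw, ok := w.(IntervalWindow)
	if !ok {
		return IntervalWindow{}, fmt.Errorf("tried to merge non-interval window type %T", w)
	}
	return iw, nil
}

func main() {
	_, err := asInterval(GlobalWindow{})
	fmt.Println(err) // the message now names the concrete window type
}
```

With the type in the message, a failure points directly at the unexpected window implementation instead of only saying that one exists.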
##########
File path: sdks/go/pkg/beam/runners/direct/gbk.go
##########
@@ -100,6 +114,68 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error {
return n.Out.FinishBundle(ctx)
}
+func (n *CoGBK) mergeWindows() error {
+ sort.Slice(n.wins, func(i int, j int) bool {
+ return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
+ })
+ n.mergeMap = make(map[typex.Window]int)
Review comment:
It would be worth a comment on mergeMap saying it's a map from the
original windows to the index of the merged window in the mergedWins slice.
##########
File path: sdks/go/pkg/beam/runners/direct/gbk.go
##########
@@ -100,6 +114,68 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error {
return n.Out.FinishBundle(ctx)
}
+func (n *CoGBK) mergeWindows() error {
+ sort.Slice(n.wins, func(i int, j int) bool {
+ return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
+ })
+ n.mergeMap = make(map[typex.Window]int)
Review comment:
Likely won't matter for Direct runner purposes, but since mergeWindows
and reprocessByWindow are both called in FinishBundle (that is, the data doesn't
cross a lifecycle method boundary), have mergeWindows return the merge map, and
have reprocessByWindow take it as an argument.
This avoids the CoGBK value holding onto the map with *all* the window
information, allowing it to be garbage collected sooner and lowering memory
overhead.
I'm ambivalent about whether the sorted and merged window slice is given the
same treatment, as long as it gets cleared away by the end of FinishBundle
(similarly for garbage collection purposes).
This is especially important when a pipeline can have an arbitrary number of
GBK operations. Since the Go Direct Runner will always run in a single
machine's memory, it pays not to be wasteful and to let things get cleaned
up.
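Roughly what I have in mind, as a sketch rather than a drop-in change. Assumptions: `interval` is a simplified stand-in for `window.IntervalWindow`, groups are reduced to `map[interval][]string`, and `finishBundle` is a hypothetical free function rather than the real method. The merge loop here sorts by start and keeps the larger end, which is my simplification and not the exact logic in the diff.

```go
package main

import (
	"fmt"
	"sort"
)

// interval is a simplified stand-in for window.IntervalWindow (assumption).
type interval struct{ Start, End int64 }

// mergeWindows returns the merged windows plus a map from each original
// window to the index of its merged window in the returned slice.
// Nothing is stored on a long-lived struct.
func mergeWindows(wins []interval) ([]interval, map[interval]int) {
	sorted := append([]interval(nil), wins...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Start < sorted[j].Start })
	mergeMap := make(map[interval]int, len(sorted))
	var merged []interval
	for i := 0; i < len(sorted); {
		start, end := sorted[i].Start, sorted[i].End
		j := i + 1
		for j < len(sorted) && sorted[j].Start <= end {
			if sorted[j].End > end {
				end = sorted[j].End
			}
			j++
		}
		for k := i; k < j; k++ {
			mergeMap[sorted[k]] = len(merged)
		}
		merged = append(merged, interval{Start: start, End: end})
		i = j
	}
	return merged, mergeMap
}

// reprocessByWindow takes the merge map as an argument instead of reading a field.
func reprocessByWindow(groups map[interval][]string, merged []interval, mergeMap map[interval]int) map[interval][]string {
	out := make(map[interval][]string, len(merged))
	for w, vals := range groups {
		mw := merged[mergeMap[w]]
		out[mw] = append(out[mw], vals...)
	}
	return out
}

// finishBundle keeps merged and mergeMap as locals, so both become
// unreachable (and collectable) as soon as it returns.
func finishBundle(groups map[interval][]string) map[interval][]string {
	wins := make([]interval, 0, len(groups))
	for w := range groups {
		wins = append(wins, w)
	}
	merged, mergeMap := mergeWindows(wins)
	return reprocessByWindow(groups, merged, mergeMap)
}

func main() {
	groups := map[interval][]string{
		{Start: 0, End: 10}:  {"a"},
		{Start: 5, End: 15}:  {"b"},
		{Start: 20, End: 30}: {"c"},
	}
	for w, vals := range finishBundle(groups) {
		fmt.Println(w, len(vals))
	}
}
```

The point is only the data flow: the map lives for the duration of one call, not for the lifetime of the CoGBK node.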
##########
File path: sdks/go/pkg/beam/runners/direct/gbk.go
##########
@@ -100,6 +114,68 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error {
return n.Out.FinishBundle(ctx)
}
+func (n *CoGBK) mergeWindows() error {
+ sort.Slice(n.wins, func(i int, j int) bool {
+ return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
+ })
+ n.mergeMap = make(map[typex.Window]int)
+ mergedWins := []typex.Window{}
+ for i := 0; i < len(n.wins); {
+ intWin, ok := n.wins[i].(window.IntervalWindow)
+ if !ok {
+ return errors.Errorf("tried to merge non-interval window type")
+ }
+ mergeStart := intWin.Start
+ mergeEnd := intWin.End
+ j := i + 1
+ for j < len(n.wins) {
+ candidateWin := n.wins[j].(window.IntervalWindow)
+ if candidateWin.Start <= mergeEnd {
+ mergeEnd = candidateWin.End
+ j++
+ } else {
+ break
+ }
+ }
+ for k := i; k < j; k++ {
+ n.mergeMap[n.wins[k]] = len(mergedWins)
+ }
+ mergedWins = append(mergedWins, window.IntervalWindow{Start: mergeStart, End: mergeEnd})
+ i = j
+ }
+ n.wins = mergedWins
+ return nil
+}
+
+func (n *CoGBK) reprocessByWindow() error {
+ newGroups := make(map[string]*group)
+ for _, g := range n.m {
+ ws := []typex.Window{n.wins[n.mergeMap[g.key.Windows[0]]]}
+ var buf bytes.Buffer
Review comment:
Consider a helper function or two to reduce the duplication between this
and ProcessElement.
Ideally we simply merge the two paths (delaying the work in ProcessElement
until FinishBundle), but until then, it's better to clean things up a little via
helper functions or helper types.
In this case, for example, the main difference is that one path uses `n.m` and
this one uses `newGroups`. A helper type for grouping, `type grouper
map[string]*group`, could handle this iteration in a method, avoiding the
duplication. It just needs the right window information passed in somehow, in
both cases. Two or so helper methods would be good: one for the key
encoding and one for inserting into the group. Since the method receiver is
technically a map, it should be a value receiver rather than a pointer, because
maps, even as underlying types, are reference types.
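A rough sketch of the helper type, under heavy simplification. Everything here is hypothetical: `group` is reduced to a key and a slice of strings, and `encodeKey` concatenates strings where the real code would encode the key and window into a `bytes.Buffer` with the appropriate coders.

```go
package main

import "fmt"

// group is a simplified stand-in for the runner's group type (assumption).
type group struct {
	key    string
	values []string
}

// grouper is the suggested helper type: encoded key -> accumulated group.
type grouper map[string]*group

// encodeKey is a hypothetical stand-in for the key encoding step.
// A plain separator is not collision-safe; it is only here for illustration.
func encodeKey(key, window string) string {
	return window + "/" + key
}

// insert uses a value receiver: a map value already refers to shared
// underlying storage, so writes through g are visible to the caller.
func (g grouper) insert(key, window, value string) {
	k := encodeKey(key, window)
	grp, ok := g[k]
	if !ok {
		grp = &group{key: key}
		g[k] = grp
	}
	grp.values = append(grp.values, value)
}

func main() {
	g := make(grouper)
	g.insert("k", "w1", "a")
	g.insert("k", "w1", "b")
	g.insert("k", "w2", "c")
	fmt.Println(len(g), len(g[encodeKey("k", "w1")].values))
}
```

Both ProcessElement and the reprocessing step could then call `insert` on their own `grouper`, differing only in which window they pass in.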
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.