This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a477b770381 [Go] Fix reshuffle implementation to use the global window for gbk output. (#34462)
a477b770381 is described below

commit a477b770381f914b9e98254e6f66a0cb760f309c
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Apr 1 19:15:55 2025 +0000

    [Go] Fix reshuffle implementation to use the global window for gbk output. (#34462)
    
    Otherwise the GBK output keeps the original, non-reified windowing
    strategy, which can confuse the runner: it encodes the GBK input using
    global windows but was attempting to decode it with the original
    windowing.
    
    This fixes Dataflow runner harness failures when a non-globally
    windowed PCollection is reshuffled. Before this fix, a workaround was to
    explicitly rewindow into the global window before reshuffling and to
    reapply the desired windowing afterwards.
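The workaround preserves the original windows because, for fixed windows, window assignment depends only on each element's timestamp, and reshuffling keeps timestamps. The stdlib-only sketch below simulates that round trip under this assumption; `element`, `fixedWindowStart`, `roundTrip`, and the reversed slice standing in for a shuffle are all hypothetical. In a real Beam Go pipeline the corresponding steps would be `beam.WindowInto` with `window.NewGlobalWindows()`, then `beam.Reshuffle`, then `beam.WindowInto` with the original window function.

```go
package main

import "fmt"

// element is a hypothetical stand-in for a windowed value: the payload, its
// event timestamp in milliseconds, and the start of its assigned window
// (globalWindow stands for the global window in this model).
type element struct {
	value  string
	tsMs   int64
	window int64
}

const globalWindow = -1

// fixedWindowStart returns the start of the fixed window of the given size
// that contains tsMs (non-negative timestamps only, for simplicity).
func fixedWindowStart(tsMs, sizeMs int64) int64 {
	return tsMs - tsMs%sizeMs
}

// roundTrip rewindows into the global window, "shuffles" (reverses the
// order), and then reapplies fixed windows from the retained timestamps.
func roundTrip(in []element, sizeMs int64) []element {
	out := make([]element, len(in))
	for i, e := range in {
		e.window = globalWindow // step 1: rewindow into the global window
		out[len(in)-1-i] = e    // step 2: reorder, standing in for a reshuffle
	}
	for i := range out {
		out[i].window = fixedWindowStart(out[i].tsMs, sizeMs) // step 3: reapply
	}
	return out
}

func main() {
	const size = 60_000 // one-minute fixed windows
	in := []element{{"a", 5_000, 0}, {"b", 65_000, 60_000}, {"c", 119_999, 60_000}}
	for _, e := range roundTrip(in, size) {
		fmt.Printf("%s ts=%d window=%d\n", e.value, e.tsMs, e.window)
	}
}
```

This only holds for window functions that are a pure function of the timestamp; merging windows such as sessions would not necessarily be reconstructed the same way.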
    
    This wasn't caught by Prism because Prism replaces the entire reshuffle.
---
 sdks/go/pkg/beam/core/runtime/graphx/translate.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index ae38e96ebf9..3bbb6c70dcf 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -1069,6 +1069,8 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
        if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
                return handleErr(err)
        }
+       // Use the same windowing for gbk output as postReify
+       m.pcollections[gbkOut].WindowingStrategyId = m.pcollections[postReify].WindowingStrategyId
 
        gbkID := fmt.Sprintf("%v_gbk", id)
        gbk := &pipepb.PTransform{
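A minimal sketch of what the added line changes, assuming a hypothetical `pcollection` type that keeps only the `WindowingStrategyId` field of `pipepb.PCollection` and a hypothetical `expandReshuffleSketch` function; it is not the real `expandReshuffle`, and the strategy IDs "fixed_1m" and "global" are made up for illustration. Without the fix, the GBK output inherits the input's windowing strategy; with it, the GBK output shares the global windowing strategy of the reified PCollection, matching how the runner encodes the GBK input.

```go
package main

import "fmt"

// pcollection is a hypothetical stand-in for pipepb.PCollection that keeps
// only the field touched by the fix.
type pcollection struct {
	WindowingStrategyId string
}

// expandReshuffleSketch is a hypothetical, heavily simplified model of the
// windowing bookkeeping in a reshuffle expansion. inputWS is the windowing
// strategy of the PCollection being reshuffled; globalWS is the global one.
func expandReshuffleSketch(inputWS, globalWS string, applyFix bool) map[string]*pcollection {
	pcols := map[string]*pcollection{}
	// Windows and timestamps are reified into the values, and the reified
	// PCollection is placed in the global window.
	pcols["postReify"] = &pcollection{WindowingStrategyId: globalWS}
	// Without the fix, the GBK output keeps the original windowing strategy.
	pcols["gbkOut"] = &pcollection{WindowingStrategyId: inputWS}
	if applyFix {
		// Use the same windowing for gbk output as postReify.
		pcols["gbkOut"].WindowingStrategyId = pcols["postReify"].WindowingStrategyId
	}
	return pcols
}

func main() {
	before := expandReshuffleSketch("fixed_1m", "global", false)
	after := expandReshuffleSketch("fixed_1m", "global", true)
	fmt.Println("before fix:", before["gbkOut"].WindowingStrategyId)
	fmt.Println("after fix: ", after["gbkOut"].WindowingStrategyId)
}
```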
