This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 16911c5  [BEAM-13221] Don't drop last data. (#15939)
16911c5 is described below

commit 16911c5b7fea090e489740edc6f9f395916703d6
Author: Robert Burke <[email protected]>
AuthorDate: Mon Nov 15 17:18:25 2021 -0800

    [BEAM-13221] Don't drop last data. (#15939)
---
 sdks/go/pkg/beam/core/runtime/harness/datamgr.go      | 11 ++++++++++-
 sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go | 18 ++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index 4aa7424..402310d 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -284,7 +284,15 @@ func (c *DataChannel) read(ctx context.Context) {
                        if elm.GetIsLast() {
                                // If this reader hasn't closed yet, do so now.
                                if !r.completed {
-                                       // Sentinel EOF segment for stream. Close buffer to signal EOF.
+                                       // Use the last segment if any.
+                                       if len(elm.GetData()) != 0 {
+                                               // In case of local side closing, send with select.
+                                               select {
+                                               case r.buf <- elm.GetData():
+                                               case <-r.done:
+                                               }
+                                       }
+                                       // Close buffer to signal EOF.
                                        r.completed = true
                                        close(r.buf)
                                }
@@ -516,6 +524,7 @@ func (w *dataWriter) Close() error {
                        {
                                InstructionId: string(w.id.instID),
                                TransformId:   w.id.ptransformID,
+                               // TODO(BEAM-13142): Set IsLast true on final flush instead of w/empty sentinel?
                                // Empty data == sentinel
                                IsLast: true,
                        },
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
index bb65f7e..5c09440 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
@@ -40,6 +40,7 @@ type fakeDataClient struct {
        calls          int
        err            error
        skipFirstError bool
+       isLastCall     int
 
        blocked sync.Mutex // Prevent data from being read by the gotourtinr.
 }
@@ -55,6 +56,9 @@ func (f *fakeDataClient) Recv() (*fnpb.Elements, error) {
                Data:          data,
                TransformId:   "ptr",
        }
+       if f.isLastCall == f.calls {
+               elemData.IsLast = true
+       }
 
        msg := fnpb.Elements{}
 
@@ -130,6 +134,20 @@ func TestDataChannelTerminate_dataReader(t *testing.T) {
                                // fakeDataClient eventually returns a sentinel element.
                        },
                }, {
+                       name:          "onIsLast_withData",
+                       expectedError: io.EOF,
+                       caseFn: func(t *testing.T, r io.ReadCloser, client *fakeDataClient, c *DataChannel) {
+                               // Set the last call with data to use is_last.
+                               client.isLastCall = 2
+                       },
+               }, {
+                       name:          "onIsLast_withoutData",
+                       expectedError: io.EOF,
+                       caseFn: func(t *testing.T, r io.ReadCloser, client *fakeDataClient, c *DataChannel) {
+                               // Set the call without data to use is_last.
+                               client.isLastCall = 3
+                       },
+               }, {
                        name:          "onRecvError",
                        expectedError: expectedError,
                        caseFn: func(t *testing.T, r io.ReadCloser, client *fakeDataClient, c *DataChannel) {

Reply via email to