This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 16911c5 [BEAM-13221] Don't drop last data. (#15939)
16911c5 is described below
commit 16911c5b7fea090e489740edc6f9f395916703d6
Author: Robert Burke <[email protected]>
AuthorDate: Mon Nov 15 17:18:25 2021 -0800
[BEAM-13221] Don't drop last data. (#15939)
---
sdks/go/pkg/beam/core/runtime/harness/datamgr.go | 11 ++++++++++-
sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go | 18 ++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index 4aa7424..402310d 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -284,7 +284,15 @@ func (c *DataChannel) read(ctx context.Context) {
if elm.GetIsLast() {
// If this reader hasn't closed yet, do so now.
if !r.completed {
- // Sentinel EOF segment for stream. Close buffer to signal EOF.
+ // Use the last segment if any.
+ if len(elm.GetData()) != 0 {
+ // In case of local side closing, send with select.
+ select {
+ case r.buf <- elm.GetData():
+ case <-r.done:
+ }
+ }
+ // Close buffer to signal EOF.
r.completed = true
close(r.buf)
}
@@ -516,6 +524,7 @@ func (w *dataWriter) Close() error {
{
InstructionId: string(w.id.instID),
TransformId: w.id.ptransformID,
+ // TODO(BEAM-13142): Set IsLast true on final flush instead of w/empty sentinel?
// Empty data == sentinel
IsLast: true,
},
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
index bb65f7e..5c09440 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
@@ -40,6 +40,7 @@ type fakeDataClient struct {
calls int
err error
skipFirstError bool
+ isLastCall int
blocked sync.Mutex // Prevent data from being read by the gotourtinr.
}
@@ -55,6 +56,9 @@ func (f *fakeDataClient) Recv() (*fnpb.Elements, error) {
Data: data,
TransformId: "ptr",
}
+ if f.isLastCall == f.calls {
+ elemData.IsLast = true
+ }
msg := fnpb.Elements{}
@@ -130,6 +134,20 @@ func TestDataChannelTerminate_dataReader(t *testing.T) {
// fakeDataClient eventually returns a sentinel element.
},
}, {
+ name: "onIsLast_withData",
+ expectedError: io.EOF,
+ caseFn: func(t *testing.T, r io.ReadCloser, client *fakeDataClient, c *DataChannel) {
+ // Set the last call with data to use is_last.
+ client.isLastCall = 2
+ },
+ }, {
+ name: "onIsLast_withoutData",
+ expectedError: io.EOF,
+ caseFn: func(t *testing.T, r io.ReadCloser, client *fakeDataClient, c *DataChannel) {
+ // Set the call without data to use is_last.
+ client.isLastCall = 3
+ },
+ }, {
name: "onRecvError",
expectedError: expectedError,
caseFn: func(t *testing.T, r io.ReadCloser, client *fakeDataClient, c *DataChannel) {