riteshghorse commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1150751691
##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -214,51 +290,88 @@ func (c *DataChannel) terminateStreamOnError(err error) {
}
}
-// OpenRead returns an io.ReadCloser of the data elements for the given
instruction and ptransform.
-func (c *DataChannel) OpenRead(ctx context.Context, ptransformID string,
instID instructionID) io.ReadCloser {
+// OpenWrite returns an io.WriteCloser of the data elements for the given
instruction and ptransform.
+func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string,
instID instructionID) io.WriteCloser {
+ return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID:
instID})
+}
+
+// OpenElementChan returns a channel of typex.Elements for the given
instruction and ptransform.
+func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID
string, instID instructionID, expectedTimerTransforms []string) (<-chan
exec.Elements, error) {
c.mu.Lock()
defer c.mu.Unlock()
cid := clientID{ptransformID: ptransformID, instID: instID}
if c.readErr != nil {
- log.Errorf(ctx, "opening a reader %v on a closed channel", cid)
- return &errReader{c.readErr}
+ return nil, fmt.Errorf("opening a reader %v on a closed
channel. Original error: %w", cid, c.readErr)
}
- return c.makeReader(ctx, cid)
+ return c.makeChannel(true, cid, expectedTimerTransforms...).ch, nil
}
-// OpenWrite returns an io.WriteCloser of the data elements for the given
instruction and ptransform.
-func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string,
instID instructionID) io.WriteCloser {
- return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID:
instID})
+// makeChannel creates a channel of exec.Elements. It expects to be called
while c.mu is held.
+func (c *DataChannel) makeChannel(fromSource bool, id clientID,
additionalTransforms ...string) *elementsChan {
+ if ec, ok := c.channels[id.instID]; ok {
+ ec.mu.Lock()
+ defer ec.mu.Unlock()
+ if fromSource {
+ ec.want = (1 + int32(len(additionalTransforms)))
Review Comment:
Is the `+1` here to account for the source itself?
##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
w.buf = append(w.buf, p...)
return len(p), nil
}
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family
string) *timerWriter {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ var m map[timerKey]*timerWriter
+ var ok bool
+ if m, ok = c.timerWriters[id.instID]; !ok {
+ m = make(map[timerKey]*timerWriter)
+ c.timerWriters[id.instID] = m
+ }
+ tk := timerKey{ptransformID: id.ptransformID, family: family}
+ if w, ok := m[tk]; ok {
+ return w
+ }
+
+ // We don't check for finished instructions for writers, as writers
+ // can only be created if an instruction is in scope, and aren't
+ // runner or user directed.
+
+ w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+ m[tk] = w
+ return w
+}
+
+type timerWriter struct {
+ id clientID
+ timerFamilyID string
+ ch *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+ recordStreamSend(msg)
+ if err := w.ch.client.Send(msg); err != nil {
+ if err == io.EOF {
+ log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on
send; fetching real error", w.id, w.ch.id)
+ err = nil
+ for err == nil {
+ // Per GRPC stream documentation, if there's an
EOF, we must call Recv
+ // until a non-nil error is returned, to ensure
resources are cleaned up.
+ //
https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
+ _, err = w.ch.client.Recv()
+ }
+ }
+ log.Warnf(context.TODO(), "dataWriter[%v;%v] error on send:
%v", w.id, w.ch.id, err)
Review Comment:
```suggestion
log.Warnf(context.TODO(), "timerWriter[%v;%v] error on send:
%v", w.id, w.ch.id, err)
```
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -94,20 +98,79 @@ func (n *DataSource) Up(ctx context.Context) error {
// StartBundle initializes this datasource for the bundle.
func (n *DataSource) StartBundle(ctx context.Context, id string, data
DataContext) error {
n.mu.Lock()
+ n.curInst = id
n.source = data.Data
n.state = data.State
n.start = time.Now()
- n.index = -1
+ n.index = 0
n.splitIdx = math.MaxInt64
n.mu.Unlock()
return n.Out.StartBundle(ctx, id, data)
}
+// splitSuccess is a marker error to indicate we've reached the split index.
+// Akin to io.EOF.
+var splitSuccess = errors.New("split index reached")
+
+// process handles converting elements from the data source to timers.
+//
+// The data and timer callback functions must return an io.EOF if the reader
terminates to signal that an additional
+// buffer is desired. On successful splits, [splitSuccess] must be returned to
indicate that the
+// PTransform is done processing data for this instruction.
+func (n *DataSource) process(ctx context.Context, data func(bcr
*byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader,
ptransformID, timerFamilyID string) error) error {
Review Comment:
Nice, I like this approach of passing in data- and timer-handling functions.
##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
w.buf = append(w.buf, p...)
return len(p), nil
}
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family
string) *timerWriter {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ var m map[timerKey]*timerWriter
+ var ok bool
+ if m, ok = c.timerWriters[id.instID]; !ok {
+ m = make(map[timerKey]*timerWriter)
+ c.timerWriters[id.instID] = m
+ }
+ tk := timerKey{ptransformID: id.ptransformID, family: family}
+ if w, ok := m[tk]; ok {
+ return w
+ }
+
+ // We don't check for finished instructions for writers, as writers
+ // can only be created if an instruction is in scope, and aren't
+ // runner or user directed.
+
+ w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+ m[tk] = w
+ return w
+}
+
+type timerWriter struct {
+ id clientID
+ timerFamilyID string
+ ch *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+ recordStreamSend(msg)
+ if err := w.ch.client.Send(msg); err != nil {
+ if err == io.EOF {
+ log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on
send; fetching real error", w.id, w.ch.id)
Review Comment:
```suggestion
log.Warnf(context.TODO(), "timerWriter[%v;%v] EOF on
send; fetching real error", w.id, w.ch.id)
```
##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -124,28 +137,37 @@ func (m *DataChannelManager) Open(ctx context.Context,
port exec.Port) (*DataCha
return ch, nil
}
-func (m *DataChannelManager) closeInstruction(instID instructionID) {
+func (m *DataChannelManager) closeInstruction(instID instructionID, ports
[]exec.Port) error {
m.mu.Lock()
defer m.mu.Unlock()
- for _, ch := range m.ports {
- ch.removeInstruction(instID)
+ var firstNonNilError error
+ for _, port := range ports {
+ ch, ok := m.ports[port.URL]
+ if !ok {
+ continue
+ }
+ err := ch.removeInstruction(instID)
+ if err != nil && firstNonNilError == nil {
Review Comment:
So we close all the ports here first and then return the first error that was
caught. Nice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]