lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1151037803
##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -214,51 +290,88 @@ func (c *DataChannel) terminateStreamOnError(err error) {
}
}
-// OpenRead returns an io.ReadCloser of the data elements for the given
instruction and ptransform.
-func (c *DataChannel) OpenRead(ctx context.Context, ptransformID string,
instID instructionID) io.ReadCloser {
+// OpenWrite returns an io.WriteCloser of the data elements for the given
instruction and ptransform.
+func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string,
instID instructionID) io.WriteCloser {
+ return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID:
instID})
+}
+
+// OpenElementChan returns a channel of typex.Elements for the given
instruction and ptransform.
+func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID
string, instID instructionID, expectedTimerTransforms []string) (<-chan
exec.Elements, error) {
c.mu.Lock()
defer c.mu.Unlock()
cid := clientID{ptransformID: ptransformID, instID: instID}
if c.readErr != nil {
- log.Errorf(ctx, "opening a reader %v on a closed channel", cid)
- return &errReader{c.readErr}
+ return nil, fmt.Errorf("opening a reader %v on a closed
channel. Original error: %w", cid, c.readErr)
}
- return c.makeReader(ctx, cid)
+ return c.makeChannel(true, cid, expectedTimerTransforms...).ch, nil
}
-// OpenWrite returns an io.WriteCloser of the data elements for the given
instruction and ptransform.
-func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string,
instID instructionID) io.WriteCloser {
- return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID:
instID})
+// makeChannel creates a channel of exec.Elements. It expects to be called
while c.mu is held.
+func (c *DataChannel) makeChannel(fromSource bool, id clientID,
additionalTransforms ...string) *elementsChan {
+ if ec, ok := c.channels[id.instID]; ok {
+ ec.mu.Lock()
+ defer ec.mu.Unlock()
+ if fromSource {
+ ec.want = (1 + int32(len(additionalTransforms)))
Review Comment:
There's always a "data" PTransform that isn't part of the
"additionalTransforms" list.
This is something we can refactor and clean up at a later time though.
Keeping the assumption makes the diff easier to understand and reduces the
scope of the change a litte.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]