lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1151037803


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -214,51 +290,88 @@ func (c *DataChannel) terminateStreamOnError(err error) {
        }
 }
 
-// OpenRead returns an io.ReadCloser of the data elements for the given 
instruction and ptransform.
-func (c *DataChannel) OpenRead(ctx context.Context, ptransformID string, 
instID instructionID) io.ReadCloser {
+// OpenWrite returns an io.WriteCloser of the data elements for the given 
instruction and ptransform.
+func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, 
instID instructionID) io.WriteCloser {
+       return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: 
instID})
+}
+
+// OpenElementChan returns a channel of typex.Elements for the given 
instruction and ptransform.
+func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID 
string, instID instructionID, expectedTimerTransforms []string) (<-chan 
exec.Elements, error) {
        c.mu.Lock()
        defer c.mu.Unlock()
        cid := clientID{ptransformID: ptransformID, instID: instID}
        if c.readErr != nil {
-               log.Errorf(ctx, "opening a reader %v on a closed channel", cid)
-               return &errReader{c.readErr}
+               return nil, fmt.Errorf("opening a reader %v on a closed 
channel. Original error: %w", cid, c.readErr)
        }
-       return c.makeReader(ctx, cid)
+       return c.makeChannel(true, cid, expectedTimerTransforms...).ch, nil
 }
 
-// OpenWrite returns an io.WriteCloser of the data elements for the given 
instruction and ptransform.
-func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, 
instID instructionID) io.WriteCloser {
-       return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: 
instID})
+// makeChannel creates a channel of exec.Elements. It expects to be called 
while c.mu is held.
+func (c *DataChannel) makeChannel(fromSource bool, id clientID, 
additionalTransforms ...string) *elementsChan {
+       if ec, ok := c.channels[id.instID]; ok {
+               ec.mu.Lock()
+               defer ec.mu.Unlock()
+               if fromSource {
+                       ec.want = (1 + int32(len(additionalTransforms)))

Review Comment:
   There's always a "data" PTransform that isn't part of the 
"additionalTransforms" list.
   
   This is something we can refactor and clean up at a later time though. 
Keeping the assumption makes the diff easier to understand and reduces the 
scope of the change a litte.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to