lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1151041222


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -124,28 +137,37 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
        return ch, nil
 }
 
-func (m *DataChannelManager) closeInstruction(instID instructionID) {
+func (m *DataChannelManager) closeInstruction(instID instructionID, ports []exec.Port) error {
        m.mu.Lock()
        defer m.mu.Unlock()
-       for _, ch := range m.ports {
-               ch.removeInstruction(instID)
+       var firstNonNilError error
+       for _, port := range ports {
+               ch, ok := m.ports[port.URL]
+               if !ok {
+                       continue
+               }
+               err := ch.removeInstruction(instID)
+               if err != nil && firstNonNilError == nil {

Review Comment:
   Yup! This is the "paranoid check". Previously, with the reader approach, we
could surface a short read or data channel error through the `io.Reader`
itself. Now that the channel is the main pass-through, we can't, so the error
has to be checked after the fact via the ScopedDataManager.
   
   I entertained the idea of dramatically changing things so that each
instruction+PTransform pair would get a Reader, and then "splitting" a bundle
into multiple executing goroutines on independent plans... but that would have
been much harder to follow than this approach, which keeps to a single
goroutine per ProcessBundle instruction.


