lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1151041222
##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -124,28 +137,37 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
return ch, nil
}
-func (m *DataChannelManager) closeInstruction(instID instructionID) {
+func (m *DataChannelManager) closeInstruction(instID instructionID, ports []exec.Port) error {
m.mu.Lock()
defer m.mu.Unlock()
- for _, ch := range m.ports {
- ch.removeInstruction(instID)
+ var firstNonNilError error
+ for _, port := range ports {
+ ch, ok := m.ports[port.URL]
+ if !ok {
+ continue
+ }
+ err := ch.removeInstruction(instID)
+ if err != nil && firstNonNilError == nil {
Review Comment:
Yup! This is the "paranoid check". Before, with the reader approach, we
could just pass a short read or data channel error out through the `io.Reader`.
We can't do that now that the channel is the main pass-through, so the check has
to happen after the fact, via the ScopedDataManager.
I entertained the idea of dramatically changing things so that each
instruction+PTransform pair would get its own Reader, and then "splitting" a
bundle across multiple goroutines executing independent plans... but that would
have been much harder to follow than this approach, which keeps to a single
goroutine per ProcessBundle instruction.
--
This is an automated message from the Apache Git Service.