camphillips22 opened a new issue, #26245: URL: https://github.com/apache/beam/issues/26245
### What happened?

I created a simple Splittable DoFn (SDF) that uses an `offsetrange.Restriction`. The range of the SDF was determined at pipeline construction time, and the input to the SDF is an `Impulse()` collection. The Dataflow runner then spews a _lot_ of the following message:

```
Error message from worker: generic::invalid_argument: First residual index 2 cannot be greater than the number of input elements to the bundle 1
```

Occasionally, I see this one:

```
Error message from worker: generic::data_loss: SDK claims to have processed 2 but should have processed 1 elements.
passed through:
==> dist_proc/dax/workflow/worker/fnapi_operators.cc:1875
generic::invalid_argument: First residual index cannot increase from previous split, was 1 and is now 2
```

In the logs, the `RESP: instruction_id` entries for `"process_bundle_split"` come in two types: ones that contain non-empty primary and residual roots, and ones where both are empty. The non-empty ones contain a channel split like:

```
last_primary_element: -1
first_residual_element: 1
```

The empty ones contain a channel split like:

```
last_primary_element: 1
first_residual_element: 2
```

Since the input collection contains only a single element, I believe this is the source of the error.

I've been trying to determine the circumstances that lead to this. My only guess currently is that it's a race: when the DataSource node moves on to the next element (increments the index) and is waiting to receive the `io.EOF`, a split request comes in. This results in empty primary and residual roots (since the restriction tracker is done) and an off-by-one "error" in the element indexes.

As for a workaround, I _believe_ the data is complete, but it's a bit concerning to see "data_loss" in the error logs.
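To make the suspected race concrete, here is a toy model using only the Go standard library. It is not the Beam Go SDK's `DataSource` code: `toySource`, `channelSplit`, `split`, and `validate` are hypothetical names. The index formulas are assumptions chosen only to reproduce the `last_primary_element` / `first_residual_element` values quoted above, and `validate` re-implements the two runner-side checks as they are worded in the error messages.

```go
package main

import "fmt"

// channelSplit mirrors the two fields seen in the process_bundle_split
// responses above. Hypothetical toy type, not the Fn API proto.
type channelSplit struct {
	LastPrimary   int64
	FirstResidual int64
}

// toySource is a hypothetical stand-in for the SDK's data source node.
type toySource struct {
	index       int64 // index of the current element within the bundle
	trackerDone bool  // true once the restriction tracker has claimed everything
}

// split returns the channel split and whether non-empty primary/residual
// roots would accompany it. The formulas are assumptions that reproduce
// the values reported in the issue.
func (s *toySource) split() (channelSplit, bool) {
	if !s.trackerDone {
		// Sub-element split of the current element (reported as -1 / 1).
		return channelSplit{LastPrimary: s.index - 1, FirstResidual: s.index + 1}, true
	}
	// Tracker exhausted: no roots, only an element-boundary split.
	return channelSplit{LastPrimary: s.index, FirstResidual: s.index + 1}, false
}

// validate applies the two checks quoted in the runner's error messages.
// prevFirstResidual < 0 means there was no previous split.
func validate(cs channelSplit, numInputs, prevFirstResidual int64) error {
	if cs.FirstResidual > numInputs {
		return fmt.Errorf("first residual index %d cannot be greater than the number of input elements to the bundle %d",
			cs.FirstResidual, numInputs)
	}
	if prevFirstResidual >= 0 && cs.FirstResidual > prevFirstResidual {
		return fmt.Errorf("first residual index cannot increase from previous split, was %d and is now %d",
			prevFirstResidual, cs.FirstResidual)
	}
	return nil
}

func main() {
	const numInputs = 1 // Impulse() yields a single element

	// Split arrives while element 0 is still being processed.
	s := &toySource{index: 0}
	first, _ := s.split()
	fmt.Println(first, validate(first, numInputs, -1))

	// Hypothesized race: element 0 is finished and the index has already
	// advanced to 1, but io.EOF has not been observed yet.
	s.index, s.trackerDone = 1, true
	second, _ := s.split()
	fmt.Println(second, validate(second, numInputs, first.FirstResidual))
}
```

Running it reports no error for the first split and the "cannot be greater than the number of input elements" error for the second, which matches the pattern of empty-root splits seen in the logs under this hypothesis.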
### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [X] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
