On Fri, May 2, 2025 at 4:30 PM XQ Hu via dev <dev@beam.apache.org> wrote:
> Please fix the link for Python sdk code.

Sorry about that, the correct link is:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py#L1142

> I think we should do both.
>
> On Fri, May 2, 2025 at 8:17 AM Sam Whittle <scwhit...@apache.org> wrote:
>
>> Hi,
>> I'm sending this out to get some feedback on proposed fixes for
>> https://github.com/apache/beam/issues/34705, since it would be good to be
>> consistent across SDKs and the change would affect the Beam FnApi.
>>
>> Currently StateResponse (code
>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L819>)
>> contains an error string field to communicate errors populating a
>> response. When a state response contains an error, the error is generally
>> raised as an exception that stops bundle processing and is logged.
>> Java sdk code
>> <https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java#L191>
>> Python sdk code
>> <http://google3/third_party/py/apache_beam/runners/worker/sdk_worker.py>
>>
>> However, there are cases where such errors are expected because they are
>> initiated by the runner. For example, runners may be performing
>> load-balancing or hedging and wish to cancel processing that is no longer
>> valid or necessary. In such cases the current logging is excessive and
>> concerning to users.
>>
>> *Proposal 1:*
>> Add a typed exception ProcessingCancelledException to sdks instead of
>> using generic exception types, i.e. IllegalStateException/RuntimeException.
>> This exception can be handled and logged differently than other exceptions
>> encountered during processing. Prior art is the Cloud Dataflow runner's use
>> of KeyTokenInvalidException
>> <https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/KeyTokenInvalidException.java>.
>> If we add explicit cancellation of processing in the FnApi in the future, we
>> could reuse the same exception.
>>
>> This still doesn't distinguish between a state response error that
>> occurred due to an sdk bug (such as an illegal request) and one that
>> occurred because the runner wants to cancel processing. To address this we
>> could modify the FnApi StateResponse to include additional information
>> that makes the distinction.
>>
>> *Proposal 2:*
>> Add a boolean to the state response indicating whether the error is
>> runner_initiated. This is nice in that the new field is safe to be unknown
>> and ignored by old sdks.
>>
>> message StateResponse {
>>   string error;
>>   // If true, error should be non-empty but is expected due to the runner.
>>   // Sdks may choose to skip or reduce logging severity of error.
>>   bool cancelled;
>>   ...
>> }
>>
>> Instead of a boolean we could use an enum, which might be more extensible,
>> but I'm unsure what additional states we would want to add in the future,
>> so it seems premature.
>>
>> I'd appreciate any feedback or concerns.
>>
>> Thanks,
>> Sam
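
For illustration, here is a rough Python sketch of how the two proposals could fit together on the SDK side. It is not the actual sdk_worker.py code. The StateResponse dataclass is a stand-in for the generated proto class, and check_state_response, process_bundle, and the logger name are hypothetical names made up for this example. ProcessingCancelledException and the cancelled field follow the names used in the proposals, and making the exception a RuntimeError subclass is my assumption.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("state_client_sketch")  # hypothetical logger name


class ProcessingCancelledException(RuntimeError):
    """Typed exception for runner-initiated cancellation (Proposal 1)."""


@dataclass
class StateResponse:
    """Stand-in for the FnApi StateResponse proto, not the generated class."""
    id: str = ""
    error: str = ""
    # Proposed field (Proposal 2). It defaults to False, which is also what
    # an SDK would see when talking to a runner that never sets it.
    cancelled: bool = False


def check_state_response(response):
    """Raise a typed exception if the response carries an error."""
    if not response.error:
        return response
    if response.cancelled:
        # Expected, runner-initiated: callers can treat this quietly.
        raise ProcessingCancelledException(response.error)
    # Unexpected error, e.g. an illegal request caused by an SDK bug.
    raise RuntimeError(response.error)


def process_bundle(response):
    """Hypothetical caller showing the different logging severities."""
    try:
        check_state_response(response)
    except ProcessingCancelledException as e:
        # Must be caught before RuntimeError, since it is a subclass of it.
        logger.debug("Processing cancelled by runner: %s", e)
        return "cancelled"
    except RuntimeError as e:
        logger.error("State request failed: %s", e)
        return "failed"
    return "ok"
```

With this shape, an old runner that never sets cancelled keeps today's behavior (error logged at error severity), while a new runner can mark the error as expected and the SDK only logs it at debug level.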