On Fri, May 2, 2025 at 4:30 PM XQ Hu via dev <dev@beam.apache.org> wrote:

> Please fix the link for the Python SDK code.
>
Sorry about that; the correct link is:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py#L1142


>
> I think we should do both.
>
> On Fri, May 2, 2025 at 8:17 AM Sam Whittle <scwhit...@apache.org> wrote:
>
>> Hi,
>> I'm sending this out to get some feedback on proposed fixes for
>> https://github.com/apache/beam/issues/34705, since the fixes should be
>> consistent across SDKs and would affect the Beam FnApi.
>>
>> Currently StateResponse (code
>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L819>)
>> contains an error string field that communicates errors encountered while
>> populating a response.  When a state response contains an error, the error
>> is generally raised as an exception that stops bundle processing and is
>> logged (see the Java SDK code
>> <https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java#L191>
>> and the Python SDK code
>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py#L1142>).
>>
>> However, there are cases where such errors are expected because the
>> runner initiated them. For example, runners may be performing load
>> balancing or hedging and wish to cancel processing that is no longer valid
>> or necessary.  In such cases, the current logging is excessive and
>> concerning to users.
>>
>> *Proposal 1:*
>> Add a typed exception, ProcessingCancelledException, to the SDKs instead
>> of using generic exception types such as
>> IllegalStateException/RuntimeException. This exception can be handled and
>> logged differently from other exceptions encountered during processing.
>> Prior art is the Cloud Dataflow runner's use of KeyTokenInvalidException
>> <https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/KeyTokenInvalidException.java>.
>> If we add explicit cancellation of processing to the FnApi in the future,
>> we could reuse the same exception.
>>
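A minimal Python sketch of Proposal 1, for illustration only. `ProcessingCancelledException`, `raise_for_state_error` and `process_bundle` are hypothetical names, not existing Beam APIs. `process_bundle` returns a status string where the real SDK harness would report the failure in its instruction response. Under Proposal 1 alone, every state response error maps to the typed exception. The bundle loop then logs that exception at INFO instead of ERROR, while any other exception is still logged as a failure.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class ProcessingCancelledException(RuntimeError):
    """Hypothetical typed exception: processing was cancelled, e.g. by the runner.

    Subclassing RuntimeError (an assumption) keeps existing handlers that
    catch RuntimeError working.
    """


def raise_for_state_error(instruction_id, error):
    # Hypothetical helper called by the state client for each StateResponse.
    # Under Proposal 1 alone, any non-empty error maps to the typed exception
    # instead of a generic RuntimeError.
    if error:
        raise ProcessingCancelledException(
            "State request for %s failed: %s" % (instruction_id, error))


def process_bundle(instruction_id, work):
    """Runs `work` and returns "ok", "cancelled" or "failed"."""
    try:
        work()
        return "ok"
    except ProcessingCancelledException as e:
        # Expected when the runner cancels (load balancing, hedging):
        # log quietly, without ERROR severity or a stack trace.
        _LOGGER.info("Bundle %s cancelled: %s", instruction_id, e)
        return "cancelled"
    except Exception:
        # Anything else is still reported as a genuine failure.
        _LOGGER.exception("Bundle %s failed", instruction_id)
        return "failed"
```

For example, `process_bundle("b1", lambda: raise_for_state_error("b1", "cancelled by runner"))` returns `"cancelled"` and emits only an INFO record.
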
>> This still doesn't distinguish between a state response error caused by an
>> SDK bug (such as an illegal request) and one where the runner wants to
>> cancel processing.  To address this, we could modify the FnApi
>> StateResponse to include additional information that distinguishes the two.
>>
>> *Proposal 2:*
>> Add a boolean to StateResponse indicating whether the error was
>> runner-initiated.  This is nice in that old SDKs can safely ignore the new,
>> unknown field.
>>
>> message StateResponse {
>>    string error;
>>    // If true, error should be non-empty but is expected because the
>>    // runner initiated it. SDKs may skip or reduce the logging severity.
>>    bool cancelled;
>>    ...
>> }
>>
>> Instead of a boolean, we could use an enum, which might be more
>> extensible, but I'm unsure what additional states we would want to add in
>> the future, so it seems premature.
>>
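A minimal Python sketch of how an SDK might consume the proposed field, combined with the typed exception from Proposal 1. The `StateResponse` dataclass is a stand-in for the generated proto message, and `check_state_response` and `ProcessingCancelledException` are hypothetical names, not existing Beam APIs. Proto3 scalar fields default to false when unset. A response that never sets `cancelled` therefore falls through to the generic-error path used today. SDKs that predate the field simply never read it.

```python
from dataclasses import dataclass


class ProcessingCancelledException(RuntimeError):
    """Hypothetical typed exception from Proposal 1."""


@dataclass
class StateResponse:
    # Stand-in for the proto message; only the fields discussed here.
    id: str = ""
    error: str = ""
    cancelled: bool = False  # proposed field; proto3 default when unset


def check_state_response(response):
    """Returns the response on success, otherwise raises a typed error."""
    if not response.error:
        return response
    if response.cancelled:
        # Runner-initiated: expected, so callers can log at low severity.
        raise ProcessingCancelledException(response.error)
    # Not marked as runner-initiated: likely an SDK bug or an illegal request.
    raise RuntimeError("Error processing state request %s: %s"
                       % (response.id, response.error))
```

An enum in place of `cancelled` would change only the second `if`, which would switch on the enum value instead of testing a boolean.
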
>> I'd appreciate any feedback or concerns.
>>
>> Thanks,
>> Sam
>>
>
