Sorry for the slow reply.

I agree with XQ, if I understand correctly. So the scenario where the SDK
is sort of mid-stream when the runner decides to cancel work would result
in something like this?

1. SDK makes StateRequest
2. Runner returns StateResponse { error = "cancelled", reason =
ErrorReason.CANCELLED }
3. SDK notices ErrorReason.CANCELLED, so instead of calling LOG.error and
raising IllegalStateException, it perhaps calls LOG.info and raises
WorkCancelledException
4. Somewhere higher in the call stack, the SDK perhaps catches the
WorkCancelledException and suppresses it. This layer could also be where
the logging occurs.
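
In rough Python, just to check that we mean the same thing, steps 3 and 4
might look something like the sketch below. ErrorReason, the reason field,
and WorkCancelledException are placeholder names taken from the steps above,
not existing FnApi or SDK types. The dataclass is only a stand-in for the
proto message, and check_state_response / process_bundle are made-up helper
names.

```python
import enum
import logging
from dataclasses import dataclass

LOG = logging.getLogger(__name__)


class ErrorReason(enum.Enum):
    # Hypothetical enum; nothing like this exists in the FnApi today.
    UNSPECIFIED = 0
    CANCELLED = 1


@dataclass
class StateResponse:
    # Simplified stand-in for the proto message, with an assumed reason field.
    error: str = ""
    reason: ErrorReason = ErrorReason.UNSPECIFIED


class WorkCancelledException(Exception):
    """Raised when the runner cancels work (hypothetical name)."""


def check_state_response(response):
    # Step 3: pick log severity and exception type based on the reason.
    if not response.error:
        return response
    if response.reason is ErrorReason.CANCELLED:
        LOG.info("State request cancelled by runner: %s", response.error)
        raise WorkCancelledException(response.error)
    LOG.error("State request failed: %s", response.error)
    raise RuntimeError(response.error)


def process_bundle(get_state):
    # Step 4: a higher layer suppresses cancellation; other errors propagate.
    try:
        check_state_response(get_state())
        return "processed"
    except WorkCancelledException:
        return "cancelled"
```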

Is that roughly accurate?

Kenn

On Mon, May 5, 2025 at 3:55 AM Sam Whittle <scwhit...@apache.org> wrote:

>
>
> On Fri, May 2, 2025 at 4:30 PM XQ Hu via dev <dev@beam.apache.org> wrote:
>
>> Please fix the link for Python sdk code.
>>
> Sorry about that, the correct link is:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py#L1142
>
>
>>
>> I think we should do both.
>>
>> On Fri, May 2, 2025 at 8:17 AM Sam Whittle <scwhit...@apache.org> wrote:
>>
>>> Hi,
>>> I'm sending this out to get some feedback on proposed fixes for
>>> https://github.com/apache/beam/issues/34705 since it would be good to
>>> be consistent across SDKs and would affect the beam FnApi.
>>>
>>> Currently StateResponse (code
>>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L819>)
>>> contains an error string field to communicate errors populating a
>>> response.  When a state response contains an error, the error is generally
>>> raised as an exception that stops bundle processing and is logged.  Java
>>> sdk code
>>> <https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java#L191>
>>> Python sdk code
>>> <http://google3/third_party/py/apache_beam/runners/worker/sdk_worker.py>
>>>
>>> However there are cases where such errors are expected if initiated by
>>> the runner. For example, runners may be performing load-balancing or
>>> hedging and wish to cancel processing that is no longer valid or
>>> necessary.  In such cases the current logging is excessive and concerning
>>> to users.
>>>
>>> *Proposal 1:*
>>> Add a typed exception ProcessingCancelledException to sdks instead of
>>> using generic exception types, i.e. IllegalStateException/RuntimeException.
>>> This exception can be handled and logged differently than other exceptions
>>> encountered during processing. Prior art is the Cloud Dataflow runner use
>>> of KeyTokenInvalidException
>>> <https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/KeyTokenInvalidException.java>.
>>> If we add explicit cancellation of processing in the fnapi in the future we
>>> could reuse the same exception.
>>>
>>> This still doesn't provide a distinction between a state response error
>>> that occurred due to an sdk bug (such as an illegal request) or when the
>>> runner wants to cancel processing.  To address this we could modify the
>>> FnApi StateResponse to include additional information to distinguish.
>>>
>>> *Proposal 2:*
>>> Add a boolean indicating if the error is runner_initiated to state
>>> response.  This is nice in that the new field is safe to be unknown and
>>> ignored by old sdks.
>>>
>>> message StateResponse {
>>>    string error;
>>>    // If true, error should be non-empty but is expected due to the runner.
>>>    // Sdks may choose to skip or reduce logging severity of error.
>>>    bool cancelled;
>>>    ...
>>> }
>>>
>>> Instead of a boolean we could use an enum which might be more extensible
>>> but I'm unsure what additional states we would want to add in the future so
>>> it seems premature.
>>>
>>> I'd appreciate any feedback or concerns.
>>>
>>> Thanks,
>>> Sam
>>>
>>
