Marek Pikulski created BEAM-11755:
-------------------------------------

             Summary: Cross-language consistency (RequiresStableInputs) is 
quietly broken (at least on portable flink runner)
                 Key: BEAM-11755
                 URL: https://issues.apache.org/jira/browse/BEAM-11755
             Project: Beam
          Issue Type: Bug
          Components: io-py-kafka, runner-flink, sdk-java-harness, sdk-py-core
            Reporter: Marek Pikulski


Since the Python SDK does not seem to provide anything similar to 
[https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html],
I wrote a small cross-language transform in Java, to be called from a Python 
SDK pipeline executed using the Flink runner. The expectation was that it would 
perform the necessary buffering to correctly implement exactly-once semantics 
in my use case.

However, this did not result in the creation of any Flink checkpoints. The 
reason seems to be that the code in 
[https://github.com/apache/beam/blob/73731ec4f3f2d185e89aa3e378d321c2154ecf53/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L492]
 is never executed, because the UDF is called using the FnApiDoFnRunner instead.
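
Below is a toy, single-process Python model of checkpoint-gated buffering: elements are held back, persisted as part of a checkpoint, and handed to the side-effecting function only once that checkpoint has completed. It is a simplified reading of what the linked code path sets up, not its exact implementation. The class and method names are hypothetical and are not Beam or Flink APIs.

```python
from typing import Callable, Dict, List


class StableInputBuffer:
    """Hypothetical toy model (not a Beam/Flink API).

    Elements reach the side-effecting function only after the checkpoint
    that contains them has completed, so every element that caused a side
    effect is already part of durable state; a replay would re-deliver
    exactly those elements instead of recomputing them upstream.
    """

    def __init__(self, process: Callable[[str], None]) -> None:
        self._process = process                     # side-effecting user function
        self._pending: List[str] = []               # received since last snapshot
        self._snapshots: Dict[int, List[str]] = {}  # checkpoint id -> persisted elements

    def receive(self, element: str) -> None:
        # Do not call the user function yet; only buffer the element.
        self._pending.append(element)

    def snapshot(self, checkpoint_id: int) -> None:
        # Persist the buffered elements as part of this checkpoint.
        self._snapshots[checkpoint_id] = list(self._pending)
        self._pending.clear()

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        # The elements are now stable: release them to the user function.
        for element in self._snapshots.pop(checkpoint_id, []):
            self._process(element)


written: List[str] = []
buf = StableInputBuffer(written.append)
buf.receive("a")
buf.receive("b")
buf.snapshot(1)
buf.receive("c")                   # arrives after checkpoint 1 was taken
buf.notify_checkpoint_complete(1)
print(written)                     # ['a', 'b']
```

If this gating is skipped, as described above for the FnApiDoFnRunner path, the user function sees each element as soon as it arrives, before any checkpoint has made it durable.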

This behavior appears particularly problematic, because the 
RequiresStableInput annotation is *silently* ignored, so users might falsely 
believe that they get exactly-once semantics (EOS), whereas they only get some 
kind of "at-least-once if the upstream pipeline happens to be deterministic" 
(which is not the case in general).
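
A toy simulation of that weaker guarantee: the writes of attempt 1 become visible, the job fails before the checkpoint completes, and attempt 2 recomputes the same source batch. With an idempotent sink (here a Python set), a deterministic upstream converges to the correct result, while a nondeterministic one leaves records from both attempts. All function names are hypothetical, and a counter stands in for any source of nondeterminism (random ids, processing time, etc.).

```python
import itertools
from typing import Callable, List, Set

# Stand-in for nondeterminism: each call observes a different value.
_nondeterminism = itertools.count()


def upstream_deterministic(batch: List[str]) -> List[str]:
    # The same input always yields the same output records.
    return [x.upper() for x in batch]


def upstream_nondeterministic(batch: List[str]) -> List[str]:
    # Output depends on a value that differs between attempts.
    call = next(_nondeterminism)
    return [f"{x}-{call}" for x in batch]


def run_with_replay(upstream: Callable[[List[str]], List[str]], batch: List[str]) -> Set[str]:
    sink: Set[str] = set()        # idempotent: rewriting an identical record is harmless
    sink.update(upstream(batch))  # attempt 1: side effects already visible
    sink.update(upstream(batch))  # attempt 2: recomputed after restoring the checkpoint
    return sink


print(len(run_with_replay(upstream_deterministic, ["a", "b"])))     # 2
print(len(run_with_replay(upstream_nondeterministic, ["a", "b"])))  # 4
```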

Thus, if a user were to use, e.g., the Kafka EOS sink 
([https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-]),
which relies on the RequiresStableInput mechanism, in their cross-language 
Java UDF, the sink might not produce correct output in general (potentially 
not even at-least-once) if the upstream pipeline is not deterministic and has 
to be replayed from a checkpoint.
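
To see why the result can be worse than at-least-once, consider a hypothetical sink that numbers records in arrival order and, on retry, skips every sequence number it has already committed. The class below does not reproduce KafkaIO's implementation. It only illustrates that any sink which deduplicates retries by position depends on the replay delivering the same records in the same order. Here the nondeterminism is a change in record order between attempts (for example, a different merge order of parallel inputs).

```python
from typing import List


class SequenceDedupSink:
    """Hypothetical EOS-style sink: skips already-committed sequence numbers.

    Correct only if a retry re-delivers the same records in the same order,
    i.e. if its input is stable.
    """

    def __init__(self) -> None:
        self.output: List[str] = []
        self.committed_seq = -1  # highest sequence number durably written

    def write_attempt(self, records: List[str], fail_after: int = -1) -> None:
        for seq, record in enumerate(records):
            if seq <= self.committed_seq:
                continue              # assumed to duplicate an earlier write
            self.output.append(record)
            self.committed_seq = seq
            if seq == fail_after:
                return                # simulate a crash right after this write


stable = SequenceDedupSink()
stable.write_attempt(["a", "b", "c", "d"], fail_after=1)
stable.write_attempt(["a", "b", "c", "d"])    # replay with stable input
print(stable.output)                          # ['a', 'b', 'c', 'd']

unstable = SequenceDedupSink()
unstable.write_attempt(["a", "b", "c", "d"], fail_after=1)
unstable.write_attempt(["c", "d", "a", "b"])  # upstream reorders on replay
print(unstable.output)                        # ['a', 'b', 'a', 'b']
```

In the unstable case, "c" and "d" are never written while "a" and "b" are written twice, so the output is neither exactly-once nor at-least-once.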

I feel this issue should be prioritized, because it essentially makes it 
impossible to achieve source-to-sink exactly-once guarantees when using Beam on 
Flink with the Python SDK.

From a user perspective, either of the following (or a combination of both) 
would resolve the issue:
 * Implement something like RequiresStableInput for the Python SDK's DoFn (and 
ensure that using RequiresStableInput in a Java-based DoFn results in an error 
if that DoFn is called from a pipeline defined with the Python SDK).
 * Extend the FnApiDoFnRunner to provide stable inputs to DoFns that require it.
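
The error suggested in the first option amounts to a fail-fast check at submission time. A minimal sketch, assuming a simplified, hypothetical representation of the pipeline graph (`TransformInfo`, `validate_stable_input` and the error type are illustrative names, not Beam APIs):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TransformInfo:
    # Hypothetical, simplified stand-in for a transform in the pipeline graph.
    name: str
    requires_stable_input: bool  # True when the DoFn carries the annotation


class UnsupportedRequirementError(Exception):
    pass


def validate_stable_input(transforms: List[TransformInfo],
                          runner_supports_stable_input: bool) -> None:
    """Reject the pipeline instead of silently dropping the requirement."""
    if runner_supports_stable_input:
        return
    offending = [t.name for t in transforms if t.requires_stable_input]
    if offending:
        raise UnsupportedRequirementError(
            "stable input is required by " + ", ".join(offending)
            + " but is not honored on this execution path")


pipeline = [
    TransformInfo("ParseJson", False),
    TransformInfo("WriteToKafkaEOS", True),
]
try:
    validate_stable_input(pipeline, runner_supports_stable_input=False)
except UnsupportedRequirementError as err:
    print(err)
```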

Unfortunately, I do not feel familiar enough with the code base to address the 
issue myself — at least not without further guidance, so any feedback is 
welcome.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
