Marek Pikulski created BEAM-11755:
-------------------------------------
Summary: Cross-language consistency (RequiresStableInputs) is
quietly broken (at least on portable flink runner)
Key: BEAM-11755
URL: https://issues.apache.org/jira/browse/BEAM-11755
Project: Beam
Issue Type: Bug
Components: io-py-kafka, runner-flink, sdk-java-harness, sdk-py-core
Reporter: Marek Pikulski
Since the Python SDK does not seem to provide anything similar to
[https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html],
I wrote a small cross-language transform in Java to be called from a Python SDK
pipeline executed on the Flink runner. I expected it to perform the buffering
necessary to implement exactly-once semantics correctly in my use case.
However, this did not result in the creation of any Flink checkpoints. The
reason seems to be that the code in
[https://github.com/apache/beam/blob/73731ec4f3f2d185e89aa3e378d321c2154ecf53/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L492]
is never executed, because the UDF is called using the FnApiDoFnRunner instead.
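The buffering the reporter expected can be illustrated with a small model. The
Python sketch below is not Beam's DoFnOperator code. StableInputBuffer and its
methods (on_element, on_checkpoint_barrier, on_checkpoint_complete) are
hypothetical names chosen for illustration. The sketch assumes the general
idea behind stable input on Flink: elements are held back until the checkpoint
that persisted them has completed, so a replay after a failure re-delivers
exactly those elements to the user function.

```python
class StableInputBuffer:
    """Hypothetical model of stable-input buffering (not Beam's code).

    Elements are held back until the checkpoint that contains them is
    reported complete; only then are they handed to the user function.
    """

    def __init__(self, process_fn):
        self._process_fn = process_fn
        self._current = []   # elements received since the last barrier
        self._pending = {}   # checkpoint id -> elements snapshotted with it

    def on_element(self, element):
        # Do not call the user function yet; just buffer the element.
        self._current.append(element)

    def on_checkpoint_barrier(self, checkpoint_id):
        # The buffer becomes part of this checkpoint's persisted state.
        self._pending[checkpoint_id] = self._current
        self._current = []

    def on_checkpoint_complete(self, checkpoint_id):
        # In this model a completed checkpoint also covers all earlier ones,
        # so release every pending batch up to and including it, in order.
        for cid in sorted(c for c in self._pending if c <= checkpoint_id):
            for element in self._pending.pop(cid):
                self._process_fn(element)


emitted = []
buf = StableInputBuffer(emitted.append)
buf.on_element("a")
buf.on_element("b")
buf.on_checkpoint_barrier(1)
buf.on_element("c")          # arrives after the barrier of checkpoint 1
print(emitted)               # [] : nothing is released before completion
buf.on_checkpoint_complete(1)
print(emitted)               # ['a', 'b'] : "c" waits for checkpoint 2
```

Because the user function only ever sees checkpointed elements, a
nondeterministic upstream cannot feed it different data on replay. According
to the report, this step is skipped when the UDF runs via the FnApiDoFnRunner.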
This is particularly problematic because the RequiresStableInput annotation is
*silently* ignored, so users might wrongly believe that they get exactly-once
semantics (EOS), whereas they actually get only something like "at-least-once,
provided the upstream pipeline happens to be deterministic" (which is not the
case in general).
Thus, if a user were to use, e.g., the Kafka EOS sink
([https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-]),
which relies on the RequiresStableInput mechanism, in their cross-language Java
UDF, the output might in general not be correct (potentially not even
at-least-once) if the upstream pipeline is not deterministic and has to be
replayed from a checkpoint.
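Why unstable input can break even at-least-once delivery can be shown with a
toy sink that deduplicates retries by sequence number. This is a simplified
model written for illustration: SeqDedupSink and run_with_failure are
hypothetical, and KafkaIO's actual EOS sink is considerably more involved. The
model assumes sequence numbers are assigned in arrival order and that a replay
re-sends every record since the last checkpoint.

```python
class SeqDedupSink:
    """Hypothetical sink that skips records it believes were already written.

    It trusts that a replay delivers the same record under the same
    sequence number, which only holds if the input is stable.
    """

    def __init__(self):
        self.committed_seq = -1
        self.output = []

    def write(self, seq, record):
        if seq <= self.committed_seq:
            return  # treated as a duplicate of an earlier write
        self.output.append(record)
        self.committed_seq = seq


def run_with_failure(first_attempt, replayed, fail_after):
    sink = SeqDedupSink()
    # First attempt: write a prefix, then the worker fails.
    for seq, record in enumerate(first_attempt[:fail_after]):
        sink.write(seq, record)
    # Replay from the last checkpoint (taken before any of these records):
    # sequence numbers are reassigned in the order records arrive again.
    for seq, record in enumerate(replayed):
        sink.write(seq, record)
    return sink.output


records = ["a", "b", "c", "d"]
# Stable input: the replay sees the same records in the same order.
print(run_with_failure(records, records, fail_after=2))
# ['a', 'b', 'c', 'd']
# Nondeterministic upstream: the replay produces a different order.
print(run_with_failure(records, ["c", "d", "a", "b"], fail_after=2))
# ['a', 'b', 'a', 'b']  ("c" and "d" are lost, "a" and "b" duplicated)
```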
I feel this issue should be prioritized because it essentially makes it
impossible to achieve source-to-sink exactly-once guarantees when using Beam on
Flink with the Python SDK.
From a user perspective, either (or a combination) of the following would
resolve the issue:
* Implement something like RequiresStableInput for the Python SDK's DoFn (and
ensure that using RequiresStableInput in a Java DoFn results in an error if
that DoFn is called from a pipeline defined with the Python SDK).
* Extend the FnApiDoFnRunner to provide stable inputs to DoFns that require it.
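The error-raising part of the first option amounts to a validation pass at
pipeline translation time. A minimal Python sketch, assuming a hypothetical
per-transform requires_stable_input flag and a runner capability bit
(runner_can_stabilize); neither name is taken from Beam's API, and the
transform names are illustrative only:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Transform:
    name: str
    requires_stable_input: bool  # hypothetical flag, not a Beam field


class UnsupportedStableInputError(RuntimeError):
    pass


def check_stable_input_support(transforms, runner_can_stabilize):
    """Reject the pipeline instead of silently dropping the requirement."""
    offending = [t.name for t in transforms if t.requires_stable_input]
    if offending and not runner_can_stabilize:
        raise UnsupportedStableInputError(
            "stable input requested but not supported on this execution "
            "path: " + ", ".join(offending)
        )


pipeline = [Transform("Source", False), Transform("EosSink", True)]
check_stable_input_support(pipeline, runner_can_stabilize=True)  # passes
try:
    check_stable_input_support(pipeline, runner_can_stabilize=False)
except UnsupportedStableInputError as err:
    print(err)
```

The design choice is simply to turn a silent downgrade into a hard failure, so
a user who needs EOS learns at submission time that the guarantee is not
available on this execution path.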
Unfortunately, I am not familiar enough with the code base to address the
issue myself, at least not without further guidance, so any feedback is
welcome.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)