[
https://issues.apache.org/jira/browse/BEAM-11755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301860#comment-17301860
]
Kenneth Knowles commented on BEAM-11755:
----------------------------------------
I agree that this is P1 because it is a data corruption issue. If it is a
regression from 2.28.0 then we should block 2.29.0 on a fix, if possible. If it
has always been broken, then we should find a way to warn the user or block
pipelines from executing if they would corrupt data.
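The "block pipelines from executing" option could look roughly like the check below. This is only a sketch under stated assumptions: `Transform`, `UnsupportedStableInputError`, and `validate_stable_input` are hypothetical names made up for illustration, not Beam classes or APIs, and the pipeline is modeled as a plain list of steps rather than a real pipeline graph.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Transform:
    """Hypothetical stand-in for a pipeline step; not a Beam class."""
    name: str
    requires_stable_input: bool = False


class UnsupportedStableInputError(RuntimeError):
    """Raised when a pipeline needs stable input the execution path cannot give."""


def validate_stable_input(transforms: List[Transform],
                          runner_supports_stable_input: bool) -> None:
    """Reject the pipeline up front instead of silently ignoring the flag."""
    if runner_supports_stable_input:
        return
    # Collect every step that declared the requirement so the error is actionable.
    offenders = [t.name for t in transforms if t.requires_stable_input]
    if offenders:
        raise UnsupportedStableInputError(
            "These transforms require stable input, which this execution "
            "path does not provide: " + ", ".join(offenders))
```

Failing at submission time is a deliberate choice in this sketch: a loud rejection is safer than a pipeline that runs but may corrupt data.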
> Cross-language consistency (RequiresStableInputs) is quietly broken (at least
> on portable flink runner)
> -------------------------------------------------------------------------------------------------------
>
> Key: BEAM-11755
> URL: https://issues.apache.org/jira/browse/BEAM-11755
> Project: Beam
> Issue Type: Bug
> Components: io-py-kafka, runner-flink, sdk-java-harness, sdk-py-core
> Reporter: Marek Pikulski
> Priority: P1
>
> Since the Python SDK does not seem to provide anything similar to
> [https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html],
> I wrote a small cross-language transform in Java, to be called from a Python
> SDK pipeline executed using the Flink runner. The expectation was that it
> would perform the necessary buffering to correctly implement exactly-once
> semantics in my use case.
> However, this did not result in the creation of any Flink checkpoints. The
> reason seems to be that the code in
> [https://github.com/apache/beam/blob/73731ec4f3f2d185e89aa3e378d321c2154ecf53/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L492]
> is never executed, because the UDF is called using the FnApiDoFnRunner
> instead.
> This behavior appears particularly problematic, because the
> RequiresStableInput annotation is *silently* ignored, so users might falsely
> believe that they get exactly-once semantics (EOS), whereas they only get
> some kind of "at-least-once if the upstream pipeline happens to be
> deterministic" (which is not the case in general).
> Thus, if a user were to use, e.g., the Kafka EOS sink
> ([https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-])
> which relies on the RequiresStableInput mechanism, in their cross-language
> Java UDF, the output might not be correct (i.e., potentially not even
> at-least-once) if the upstream pipeline is not deterministic and needs to
> be replayed from a checkpoint.
> I feel this issue should be prioritized, because it essentially makes it
> impossible to achieve source-to-sink exactly-once guarantees when using Beam
> on Flink with the Python SDK.
> From a user perspective, either (or a combination) of the following would
> resolve the issue:
> * Implement something like RequiresStableInput for the Python SDK's DoFn
> (and ensure that using RequiresStableInput in a Java-based DoFn results in
> an error if the latter is called from a pipeline defined with the Python SDK).
> * Extend the FnApiDoFnRunner to provide stable inputs to DoFns that require
> it.
> Unfortunately, I do not feel familiar enough with the code base to address
> the issue myself, at least not without further guidance, so any feedback is
> welcome.
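The failure mode described in the issue can be illustrated with a toy simulation. It is not Beam or Flink code: `make_upstream`, `run_unbuffered`, and `run_buffered` are hypothetical helpers, and the "failure" and "checkpoint" are modeled simply by calling the upstream twice. The assumption is a non-deterministic upstream whose output differs on every replay.

```python
import itertools


def make_upstream():
    """Non-deterministic upstream: every (re)run tags elements differently."""
    attempt = itertools.count(1)

    def run(keys):
        n = next(attempt)
        return [f"{k}@attempt{n}" for k in keys]

    return run


def run_unbuffered(keys):
    """Side effects happen immediately, so a replay repeats them with new values."""
    upstream, written = make_upstream(), []
    written.extend(upstream(keys))  # first attempt writes, then the job fails
    written.extend(upstream(keys))  # replay from the last checkpoint writes again
    return written


def run_buffered(keys):
    """Elements are held back until a checkpoint completes, then written once."""
    upstream, written = make_upstream(), []
    buffer = upstream(keys)  # first attempt only buffers, then the job fails
    buffer = upstream(keys)  # replay rebuilds the buffer that was lost
    written.extend(buffer)   # checkpoint completed: values are now stable
    return written


print(run_unbuffered(["a", "b"]))
# ['a@attempt1', 'b@attempt1', 'a@attempt2', 'b@attempt2']
print(run_buffered(["a", "b"]))
# ['a@attempt2', 'b@attempt2']
```

In the unbuffered run the sink observes two different values for each logical element, which is the inconsistency a stable-input requirement is meant to rule out. In the buffered run only the values that survived a checkpoint reach the sink.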
--
This message was sent by Atlassian Jira
(v8.3.4#803005)