[
https://issues.apache.org/jira/browse/BEAM-11755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301858#comment-17301858
]
Kenneth Knowles commented on BEAM-11755:
----------------------------------------
CC [~chamikara] [~ibzib] I wonder if this is a known issue. We introduced
{{RequiresStableInput}} to force checkpointing at the places where it is
necessary for exactly-once processing. Before that we used GBK generally, and
"Reshuffle" specifically, to do checkpointing, but that is not portable and
does not work on Flink. I do not know the implementation status in Python,
Flink, etc.
> Cross-language consistency (RequiresStableInputs) is quietly broken (at least
> on portable flink runner)
> -------------------------------------------------------------------------------------------------------
>
> Key: BEAM-11755
> URL: https://issues.apache.org/jira/browse/BEAM-11755
> Project: Beam
> Issue Type: Bug
> Components: io-py-kafka, runner-flink, sdk-java-harness, sdk-py-core
> Reporter: Marek Pikulski
> Priority: P1
>
> Since the Python SDK does not seem to provide anything similar to
> [https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html],
> I wrote a small cross-language transform in Java, to be called from a Python
> SDK pipeline executed using the Flink runner. The expectation was that it
> would perform the necessary buffering to correctly implement exactly-once
> semantics in my use case.
> However, this did not result in the creation of any Flink checkpoints. The
> reason seems to be that the code in
> [https://github.com/apache/beam/blob/73731ec4f3f2d185e89aa3e378d321c2154ecf53/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L492]
> is never executed, because the UDF is called using the FnApiDoFnRunner
> instead.
> This behavior appears particularly problematic because the
> RequiresStableInput annotation is *silently* ignored, so users might falsely
> believe that they get exactly-once semantics (EOS), whereas they only get
> some kind of "at-least-once if the upstream pipeline happens to be
> deterministic" (which is not the case in general).
> Thus, if a user were to use, e.g., the Kafka EOS sink
> ([https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-]),
> which relies on the RequiresStableInput mechanism, in their cross-language
> Java UDF, it might not produce correct (i.e., potentially not even
> at-least-once) output in general if the upstream pipeline is not
> deterministic and needs to be replayed from a checkpoint.
> I feel this issue should be prioritized because it essentially makes it
> impossible to achieve source-to-sink exactly-once guarantees when using Beam
> on Flink with the Python SDK.
> From a user perspective, either (or a combination) of the following would
> resolve the issue:
> * Implement something like RequiresStableInput for the Python SDK's DoFn
> (and ensure that using RequiresStableInput in a Java-based DoFn results in
> an error if the latter is called from a pipeline defined with the Python SDK).
> * Extend the FnApiDoFnRunner to provide stable inputs to DoFns that require
> it.
> Unfortunately, I do not feel familiar enough with the code base to address
> the issue myself, at least not without further guidance, so any feedback is
> welcome.
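
To illustrate the failure mode described in the report, here is a minimal, self-contained sketch in plain Python. It does not use Beam or Flink at all. The names upstream, write_without_stable_input and write_with_stable_input are hypothetical and exist only for this illustration, and the "non-determinism" is modeled simply by making the upstream output depend on the attempt number. The sink is modeled as a set, i.e. it only deduplicates records that are exactly identical.

```python
def upstream(attempt):
    # Hypothetical non-deterministic stage: the value produced for each key
    # differs between the first run and a replay (modeled via `attempt`).
    return [(key, f"{key}-attempt{attempt}") for key in ("a", "b", "c")]


def write_without_stable_input(fail_after):
    # The sink consumes upstream output directly. The first attempt crashes
    # after `fail_after` writes; the replay recomputes the upstream stage.
    sink = set()
    sink.update(upstream(1)[:fail_after])  # partial writes before the crash
    sink.update(upstream(2))               # replay sees different values
    return sink


def write_with_stable_input(fail_after):
    # The input is buffered and assumed to be persisted in a completed
    # checkpoint before any write happens. A retry re-reads the buffer
    # instead of recomputing the upstream stage.
    sink = set()
    checkpointed = upstream(1)
    sink.update(checkpointed[:fail_after])  # partial writes before the crash
    sink.update(checkpointed)               # retry sees identical values
    return sink


if __name__ == "__main__":
    print(sorted(write_without_stable_input(1)))
    print(sorted(write_with_stable_input(1)))
```

In this toy model, the run without stable input leaves two conflicting records for key "a" in the sink, while the run with stable input leaves exactly one record per key. This is only meant to show why a sink that relies on stable input cannot deduplicate correctly when the annotation is ignored; it says nothing about how the Flink runner or the FnApiDoFnRunner actually implement buffering.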