[ https://issues.apache.org/jira/browse/BEAM-11755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302071#comment-17302071 ]

Kenneth Knowles commented on BEAM-11755:
----------------------------------------

I will also do a one-shot ping of [~robertwb] and [~boyuanz], who may be 
interested in this limitation of portability and cross-language (xlang) transforms across various runners.

> Cross-language consistency (RequiresStableInputs) is quietly broken (at least 
> on portable flink runner)
> -------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11755
>                 URL: https://issues.apache.org/jira/browse/BEAM-11755
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, io-py-kafka, runner-flink, 
> sdk-java-harness, sdk-py-core
>            Reporter: Marek Pikulski
>            Priority: P1
>
> Since the Python SDK does not seem to provide anything similar to 
> [https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html],
>  I wrote a small cross-language transform in Java, to be called from a Python 
> SDK pipeline executed using the Flink runner. The expectation was that it 
> would perform the necessary buffering to correctly implement exactly-once 
> semantics in my use case.
> However, this did not result in the creation of any Flink checkpoints. The 
> reason seems to be that the code in 
> [https://github.com/apache/beam/blob/73731ec4f3f2d185e89aa3e378d321c2154ecf53/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L492]
>  is never executed, because the UDF is called using the FnApiDoFnRunner 
> instead.
> This behavior is particularly problematic because the 
> @RequiresStableInput annotation is *silently* ignored, so users might wrongly 
> believe that they get exactly-once semantics (EOS), whereas they only get 
> some kind of "at-least-once if the upstream pipeline happens to be 
> deterministic" guarantee (and upstream pipelines are not deterministic in general).
> Thus, if a user were to use, e.g., the Kafka EOS sink 
> ([https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-]),
>  which relies on the @RequiresStableInput mechanism, in their cross-language 
> Java UDF, the output might be incorrect (i.e., potentially not even 
> at-least-once) whenever the upstream pipeline is not 
> deterministic and needs to be replayed from a checkpoint.
> I feel this issue should be prioritized, because it essentially makes it 
> impossible to achieve source-to-sink exactly-once guarantees when using Beam 
> on Flink with the Python SDK.
> From a user perspective, either (or a combination) of the following would 
> resolve the issue:
>  * Implement something like @RequiresStableInput for the Python SDK's DoFn 
> (and ensure that using @RequiresStableInput in a Java-based DoFn results in 
> an error if that DoFn is called from a pipeline defined with the Python SDK).
>  * Extend the FnApiDoFnRunner to provide stable inputs to DoFns that require 
> them.
> Unfortunately, I do not feel familiar enough with the code base to address 
> the issue myself — at least not without further guidance, so any feedback is 
> welcome.
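The failure mode described above can be illustrated with a toy simulation. A non-deterministic upstream step is replayed after a crash and feeds a sink that assumes a replay delivers identical records. This is a minimal sketch under stated assumptions, not Beam or Flink code. `upstream`, `PositionDedupSink`, and `run` are hypothetical names. The sink's skip-already-committed-positions rule is a loose stand-in for sequence-number based EOS sinks, not the actual KafkaIO implementation. With a runner-side copy of the input (stable input), the replay reproduces one consistent output. Without it, the sink ends up with a mix of two executions, and some records of the replayed execution are never delivered.

```python
def upstream(attempt, n):
    # Hypothetical non-deterministic upstream step: each execution yields
    # different records. The attempt label stands in for a random draw,
    # a wall-clock read, or an arbitrary merge order.
    return [f"a{attempt}-r{i}" for i in range(n)]


class PositionDedupSink:
    """Toy sink (assumption): skips any position it has already committed,
    i.e. it assumes a replay delivers the same records at the same positions."""

    def __init__(self):
        self.committed = {}

    def write(self, position, record):
        if position not in self.committed:
            self.committed[position] = record


def run(stable_input, n=4, crash_at=2):
    """Run one failed attempt plus one replay and return the sink contents."""
    sink = PositionDedupSink()
    persisted = None  # runner-side copy of the input, used only with stable inputs
    for attempt in (0, 1):
        if stable_input and persisted is not None:
            batch = persisted  # replay exactly the same input
        else:
            batch = upstream(attempt, n)
            if stable_input:
                persisted = batch  # persist input before any side effect
        for position, record in enumerate(batch):
            if attempt == 0 and position == crash_at:
                break  # simulated worker crash partway through the bundle
            sink.write(position, record)
    return [sink.committed[p] for p in range(n)]


if __name__ == "__main__":
    print(run(stable_input=True))   # ['a0-r0', 'a0-r1', 'a0-r2', 'a0-r3']
    print(run(stable_input=False))  # ['a0-r0', 'a0-r1', 'a1-r2', 'a1-r3']
```

In the second case, `a1-r0` and `a1-r1` from the replayed execution are dropped, and the sink output matches neither execution. This corresponds to the "potentially not even at-least-once" outcome described in the issue.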



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
