damccorm opened a new issue, #20812:
URL: https://github.com/apache/beam/issues/20812

   Since the Python SDK does not seem to provide anything similar to 
[https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html](https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html),
 I wrote a small cross-language transform in Java, to be called from a Python 
SDK pipeline executed using the Flink runner. The expectation was that it would 
perform the necessary buffering to correctly implement exactly-once semantics 
in my use case.
   
   However, this did not result in the creation of any Flink checkpoints. The 
reason seems to be that the code in 
[https://github.com/apache/beam/blob/73731ec4f3f2d185e89aa3e378d321c2154ecf53/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L492](https://github.com/apache/beam/blob/73731ec4f3f2d185e89aa3e378d321c2154ecf53/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L492)
 is never executed, because the UDF is called using the FnApiDoFnRunner instead.
   
   This behavior appears particularly problematic, because the 
RequiresStableInput annotation is *silently* ignored, so users might falsely 
believe that they get exactly-once semantics (EOS), whereas they only get some 
kind of "at-least-once if the upstream pipeline happens to be deterministic" 
guarantee (and upstream pipelines are not deterministic in general).
   
   Thus, if a user were to use, e.g., the Kafka EOS sink 
([https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-](https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-)),
 which relies on the RequiresStableInput mechanism, in their cross-language 
Java UDF, the output might be incorrect (i.e., potentially not even 
at-least-once) if the upstream pipeline is not deterministic 
and needs to be replayed from a checkpoint.
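
   To make the failure mode above concrete, here is a toy simulation in plain Python. It does not use any Beam or Flink API; `upstream` and `write_without_stable_input` are hypothetical names, and the "attempt" tag merely stands in for upstream non-determinism. It only illustrates that a side-effecting step which writes before the checkpoint completes ends up exposing elements from both the failed attempt and the replay.

```python
def upstream(attempt):
    """Hypothetical non-deterministic upstream.

    The same logical input yields different elements on every attempt;
    the attempt number is embedded to make the difference visible.
    """
    return [f"record-{i}@attempt-{attempt}" for i in range(3)]


def write_without_stable_input(sink, attempts):
    """Write each element as soon as it arrives, across all attempts.

    Every attempt except the last is assumed to fail before its checkpoint
    completes, so the upstream is replayed from the previous checkpoint.
    """
    for attempt in range(1, attempts + 1):
        for element in upstream(attempt):
            sink.append(element)  # side effect happens before the checkpoint
    return sink


sink = write_without_stable_input([], attempts=2)
for element in sink:
    print(element)
```

   In this sketch the sink receives six distinct records for three logical inputs, which is the situation a stable-input guarantee is meant to rule out.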
   
   I feel this issue should be prioritized, because it essentially makes it 
impossible to achieve source-to-sink exactly-once guarantees when using Beam on 
Flink with the Python SDK.
   
   From a user perspective, either (or a combination) of the following would 
resolve the issue:
    * Implement something like RequiresStableInput for the Python SDK's DoFn 
(and ensure that using RequiresStableInput in a Java-based DoFn results in an 
error if the latter is called from a pipeline defined with the Python SDK).
    * Extend the FnApiDoFnRunner to provide stable inputs to DoFns that require 
it.
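
   As a rough illustration of what "providing stable inputs" could mean, the following plain-Python sketch buffers elements and only hands them to the wrapped function once the checkpoint covering them has completed. This is an assumption-laden toy model, not Beam or Flink code: `StableInputBuffer` and all of its method names are hypothetical, and restoring buffered elements from checkpointed state after a later failure is left out.

```python
class StableInputBuffer:
    """Toy model: hold elements until the checkpoint containing them completes."""

    def __init__(self, process_fn):
        self._process_fn = process_fn
        self._pending = []     # elements received since the last snapshot
        self._snapshots = {}   # checkpoint id -> elements covered by that checkpoint

    def on_element(self, element):
        # Buffer only; the wrapped function must not see unstable input.
        self._pending.append(element)

    def snapshot(self, checkpoint_id):
        # The checkpoint barrier arrived: everything pending belongs to it.
        self._snapshots[checkpoint_id] = self._pending
        self._pending = []

    def on_failure(self):
        # Nothing here was covered by a completed checkpoint, so it is
        # dropped and will be replayed (possibly differently) by upstream.
        self._pending = []
        self._snapshots = {}

    def on_checkpoint_complete(self, checkpoint_id):
        # Input covered by this (or an earlier) checkpoint is now stable.
        for cid in sorted(c for c in self._snapshots if c <= checkpoint_id):
            for element in self._snapshots.pop(cid):
                self._process_fn(element)


out = []
buf = StableInputBuffer(out.append)
buf.on_element("a@attempt-1")
buf.on_failure()                  # failure before any checkpoint completed
buf.on_element("a@attempt-2")     # replayed element differs from the first attempt
buf.snapshot(checkpoint_id=1)
buf.on_checkpoint_complete(1)     # only now does the wrapped function run
```

   The wrapped function never observes the element from the failed attempt; the price is added latency, since output is delayed until a checkpoint completes.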
   
   Unfortunately, I do not feel familiar enough with the code base to address 
the issue myself, at least not without further guidance, so any feedback is 
welcome.
   
   Imported from Jira 
[BEAM-11755](https://issues.apache.org/jira/browse/BEAM-11755). Original Jira 
may contain additional context.
   Reported by: pikulmar.

