[ 
https://issues.apache.org/jira/browse/BEAM-6751?focusedWorklogId=205890&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-205890
 ]

ASF GitHub Bot logged work on BEAM-6751:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Feb/19 17:41
            Start Date: 28/Feb/19 17:41
    Worklog Time Spent: 10m 
      Work Description: mxm commented on issue #7955: [BEAM-6751] Extend Kafka 
EOS mode whitelist / Warn instead of throw
URL: https://github.com/apache/beam/pull/7955#issuecomment-468366778
 
 
   Thank you for your reply. I was actually in the process of writing an email 
to the dev list :) Will send it in a bit.
   
   >Processing an element again is unavoidable in the face of failure, so do 
you mean that the resulting processing will cause duplicate (non-idempotent) 
externally visible effects?
   
   As it stands, writes are not guaranteed to be idempotent with the 
FlinkRunner. It makes sense that Spark materializes the GroupByKey, I guess 
Dataflow does something similar. Flink has a different approach. It sends 
checkpoint barriers through all channels, which allows it to take a snapshot of the 
running pipeline, but it does require a callback after checkpoint completion to 
commit to Kafka.
   
   >There, it reads "during normal working of Flink applications, user can 
expect a delay in visibility of the records produced into Kafka topics, equal 
to average time between completed checkpoints." That is exactly the expected 
cost of implementing `@RequiresStableInput`.
   
   That is the time between Kafka transaction start and transaction end. The 
transaction is completed after the checkpoint is complete. 
   
   >The historical cause, which I think you know, is that many Beam IOs were 
designed before Beam and/or around Dataflow, where you can do things like 
generate random numbers, shuffle to save the result, then use them and know 
that they will not be regenerated upon retry
   
   It seems like that is not true for `KafkaIO` because it is not as old as the 
other transforms, though it might have been inspired by the old transforms.
   
   >I don't quite understand your last point, about having a callback on a 
`DoFn` to commit pending work. There's `@FinishBundle` which must be called 
before elements are considered processed. I think you mean kind of the 
converse, like an `OK now it is durable so you can output other things` 
callback?
   
   Flink's KafkaProducer 1) pushes all pending items to Kafka, 2) checkpoints, 
3) commits when the checkpoint has completed. To do something similar with 
Beam, we need to be able to tell the Kafka DoFn that it can now commit the 
pending Kafka transaction. But since the commit is hidden inside 
`processElement`, we do not currently have control over it.
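   
   To make the three steps above concrete, here is a minimal sketch of the 
flush / checkpoint / commit-on-callback pattern. It uses no Flink, Beam, or 
Kafka classes: `TwoPhaseCommitSketch` and its methods `write`, `snapshot`, and 
`notifyCheckpointComplete` are hypothetical names chosen for illustration, and 
the `committed` list merely stands in for records that become visible once a 
Kafka transaction commits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of commit-on-checkpoint-completion; not Flink's or Beam's API. */
public class TwoPhaseCommitSketch {
  // Records written since the last checkpoint (the currently open "transaction").
  private List<String> open = new ArrayList<>();
  // Flushed transactions waiting for their checkpoint to complete, keyed by checkpoint id.
  private final TreeMap<Long, List<String>> pending = new TreeMap<>();
  // Stand-in for records visible to consumers after the transaction commits.
  private final List<String> committed = new ArrayList<>();

  /** Adds a record to the open transaction; nothing is visible yet. */
  public void write(String record) {
    open.add(record);
  }

  /** Steps 1 and 2: flush the open transaction and tie it to a checkpoint id. */
  public void snapshot(long checkpointId) {
    pending.put(checkpointId, open);
    open = new ArrayList<>();
  }

  /** Step 3: the callback. Commits every transaction up to and including this checkpoint. */
  public void notifyCheckpointComplete(long checkpointId) {
    Map<Long, List<String>> done = pending.headMap(checkpointId, true);
    for (List<String> txn : done.values()) {
      committed.addAll(txn);
    }
    done.clear(); // headMap is a view, so this also removes the entries from pending
  }

  public List<String> committed() {
    return committed;
  }

  public static void main(String[] args) {
    TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
    sink.write("a");
    sink.write("b");
    sink.snapshot(1L);
    sink.write("c"); // belongs to the next transaction
    System.out.println(sink.committed()); // prints []
    sink.notifyCheckpointComplete(1L);
    System.out.println(sink.committed()); // prints [a, b]
  }
}
```

   The point of the sketch is that the commit happens in a separate callback 
rather than inside the method that writes records, which is the kind of hook 
the Kafka DoFn would need.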
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 205890)
    Time Spent: 1h  (was: 50m)

> KafkaIO blocks FlinkRunner in EOS mode
> --------------------------------------
>
>                 Key: BEAM-6751
>                 URL: https://issues.apache.org/jira/browse/BEAM-6751
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.12.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> KafkaIO has a validation check which whitelists certain runners capable of 
> providing exactly-once semantics:
> {noformat}
>         if ("org.apache.beam.runners.direct.DirectRunner".equals(runner)
>             || runner.startsWith("org.apache.beam.runners.dataflow.")
>             || runner.startsWith("org.apache.beam.runners.spark.")) {
> ...
> {noformat}
> The FlinkRunner supports exactly-once checkpointing but is blocked from using 
> Kafka's exactly-once mode. 
> I wonder if such a list is easily maintainable? I think we should replace the 
> list with a warning instead. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
