[
https://issues.apache.org/jira/browse/BEAM-6751?focusedWorklogId=205890&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-205890
]
ASF GitHub Bot logged work on BEAM-6751:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Feb/19 17:41
Start Date: 28/Feb/19 17:41
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7955: [BEAM-6751] Extend Kafka
EOS mode whitelist / Warn instead of throw
URL: https://github.com/apache/beam/pull/7955#issuecomment-468366778
Thank you for your reply. I was actually in the process of writing an email
to the dev list :) Will send it in a bit.
>Processing an element again is unavoidable in the face of failure, so do
you mean that the resulting processing will cause duplicate (non-idempotent)
externally visible effects?
As it stands, writes are not guaranteed to be idempotent with the
FlinkRunner. It makes sense that Spark materializes the GroupByKey, I guess
Dataflow does something similar. Flink has a different approach. It sends
checkpoint barriers through all channels, which allows it to take a snapshot of
the running pipeline, but it does require a callback after checkpoint completion
to commit to Kafka.
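That pattern can be simulated in plain Java, independent of Flink's actual `TwoPhaseCommitSinkFunction` API (all names below are illustrative): writes land in an open transaction, the checkpoint barrier pre-commits it, and it only becomes visible once the checkpoint-complete callback fires.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal simulation of Flink's two-phase commit pattern. Names are
// illustrative, not the real Flink or Kafka APIs.
class TwoPhaseCommitSink {
    private final List<String> openTxn = new ArrayList<>();
    private final List<String> preCommitted = new ArrayList<>();
    final List<String> committed = new ArrayList<>(); // visible to consumers

    void write(String record) {
        openTxn.add(record); // buffered inside the open Kafka transaction
    }

    // Checkpoint barrier arrives: flush and pre-commit the current
    // transaction, then start a new one for subsequent writes.
    void snapshotState() {
        preCommitted.addAll(openTxn);
        openTxn.clear();
    }

    // Callback after the checkpoint completed: only now do the
    // pre-committed records become visible to downstream consumers.
    void notifyCheckpointComplete() {
        committed.addAll(preCommitted);
        preCommitted.clear();
    }
}

public class TwoPhaseCommitDemo {
    public static void main(String[] args) {
        TwoPhaseCommitSink sink = new TwoPhaseCommitSink();
        sink.write("a");
        sink.write("b");
        sink.snapshotState();               // pre-committed, not yet visible
        System.out.println(sink.committed); // []
        sink.notifyCheckpointComplete();
        System.out.println(sink.committed); // [a, b]
    }
}
```

The gap between `snapshotState` and `notifyCheckpointComplete` is exactly the visibility delay the Flink documentation describes.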
>There, it reads "during normal working of Flink applications, user can
expect a delay in visibility of the records produced into Kafka topics, equal
to average time between completed checkpoints." That is exactly the expected
cost of implementing `@RequiresStableInput`.
That is the time between Kafka transaction start and transaction end; the
transaction is only committed once the checkpoint has completed.
>The historical cause, which I think you know, is that many Beam IOs were
designed before Beam and/or around Dataflow, where you can do things like
generate random numbers, shuffle to save the result, then use them and know
that they will not be regenerated upon retry
It seems like that is not true for `KafkaIO` because it is not as old as the
other transforms, though it might have been inspired by the old transforms.
>I don't quite understand your last point, about having a callback on a
`DoFn` to commit pending work. There's `@FinishBundle`, which must be called
before elements are considered processed. I think you mean kind of the
converse, like an `OK now it is durable so you can output other things`
callback?
Flink's KafkaProducer 1) pushes all pending items to Kafka, 2) checkpoints,
3) commits when the checkpoint has completed. To do something similar with
Beam, we need to be able to tell the Kafka DoFn that it can now commit the
pending Kafka transaction. But since the commit is hidden inside
`processElement`, we do not currently have control over it.
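The missing hook might look like the sketch below. It is purely hypothetical: neither this class nor an `onCheckpointDurable()` callback exists in Beam's KafkaIO. It only illustrates where control would have to move: `processElement` merely appends to the pending transaction, and the runner signals later when committing is safe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; not the Beam API. processElement does no
// commit, and the runner would invoke onCheckpointDurable() after its
// checkpoint completes.
public class ExactlyOnceKafkaWriterSketch {
    private final List<String> pendingTxn = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    // Step 1: push elements into the pending Kafka transaction.
    void processElement(String record) {
        pendingTxn.add(record);
    }

    // Steps 2+3: after the runner's checkpoint completes, it calls back
    // so the DoFn can commit the pending transaction.
    void onCheckpointDurable() {
        committed.addAll(pendingTxn);
        pendingTxn.clear();
    }

    public static void main(String[] args) {
        ExactlyOnceKafkaWriterSketch w = new ExactlyOnceKafkaWriterSketch();
        w.processElement("r1");
        System.out.println(w.committed.size()); // 0: nothing visible yet
        w.onCheckpointDurable();
        System.out.println(w.committed);        // [r1]
    }
}
```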
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 205890)
Time Spent: 1h (was: 50m)
> KafkaIO blocks FlinkRunner in EOS mode
> --------------------------------------
>
> Key: BEAM-6751
> URL: https://issues.apache.org/jira/browse/BEAM-6751
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-flink
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Critical
> Fix For: 2.12.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> KafkaIO has a validation check which whitelists certain runners capable of
> providing exactly-once semantics:
> {noformat}
> if ("org.apache.beam.runners.direct.DirectRunner".equals(runner)
> || runner.startsWith("org.apache.beam.runners.dataflow.")
> || runner.startsWith("org.apache.beam.runners.spark.")) {
> ...
> {noformat}
> The FlinkRunner supports exactly-once checkpointing but is blocked from using
> Kafka's exactly once mode.
> I wonder whether such a list is easily maintainable. I think we should
> replace it with a warning instead.
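The suggested change could be sketched as follows. The runner list, method name, and message wording are illustrative, not the actual KafkaIO code; returning the warning text stands in for a `LOG.warn(...)` call.

```java
// Sketch: keep the known-good runner list, but warn on unknown runners
// instead of failing pipeline construction. Illustrative only.
public class EosRunnerCheck {
    static String validateRunner(String runner) {
        boolean known =
            "org.apache.beam.runners.direct.DirectRunner".equals(runner)
                || runner.startsWith("org.apache.beam.runners.dataflow.")
                || runner.startsWith("org.apache.beam.runners.spark.")
                || runner.startsWith("org.apache.beam.runners.flink.");
        return known
            ? null // no warning needed
            : "Runner " + runner + " has not been validated for Kafka "
                + "exactly-once writes; proceeding anyway.";
    }

    public static void main(String[] args) {
        // A whitelisted runner produces no warning; an unknown one does.
        System.out.println(
            validateRunner("org.apache.beam.runners.flink.FlinkRunner"));  // null
        System.out.println(
            validateRunner("com.example.CustomRunner") != null);           // true
    }
}
```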
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)