Ah, also regarding your earlier mail: I didn't know if many people were using 
Kafka with Dataflow, thanks for that clarification! :-)

Also, I don't think that the TwoPhaseCommit Sink of Flink would work in a Beam 
context, I was just posting that for reference.

Best,
Aljoscha
> On 10. Aug 2017, at 11:13, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> @Raghu: Yes, exactly, that's what I thought about this morning, actually. 
> These are the methods of an operator that are relevant to checkpointing:
> 
> class FlinkOperator() {
>  open();
>  snapshotState();
>  notifySnapshotComplete();
>  initializeState();
> }
> 
> Input would be buffered in state, would be checkpointed in snapshotState() 
> and processed when we receive a notification of a complete checkpoint (which 
> is sent out once all operators have signaled that checkpointing is complete). 
> In case of failure, we would be re-initialized with the buffered elements in 
> initializeState() and could re-process them in open().
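
The buffering scheme described above could be sketched roughly as follows. This is plain Java with no Flink dependency, so it is an illustration of the idea and not Flink's operator API: `BufferingOperator`, `processElement()` and `emitted()` are hypothetical names, checkpoint ids are passed in by the caller instead of a runner, and "processing" an element is reduced to appending it to a list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: hold input back until the checkpoint containing it completes.
class BufferingOperator<T> {
    // Elements received since the last snapshot.
    private final List<T> pending = new ArrayList<>();
    // Elements captured by a snapshot, keyed by checkpoint id, not yet released.
    private final TreeMap<Long, List<T>> awaitingCompletion = new TreeMap<>();
    // Elements handed back by initializeState() after a failure.
    private final List<T> restored = new ArrayList<>();
    // Stand-in for "processed downstream".
    private final List<T> emitted = new ArrayList<>();

    void processElement(T element) {
        pending.add(element); // buffer only, do not process yet
    }

    // Returns the state to checkpoint: every element that has not been released.
    List<T> snapshotState(long checkpointId) {
        awaitingCompletion.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
        List<T> state = new ArrayList<>();
        for (List<T> batch : awaitingCompletion.values()) {
            state.addAll(batch);
        }
        return state;
    }

    // Release everything covered by this checkpoint or an earlier one.
    void notifySnapshotComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<T>>> it =
            awaitingCompletion.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            emitted.addAll(it.next().getValue());
            it.remove();
        }
    }

    // Called on recovery with the state of the last completed checkpoint.
    void initializeState(List<T> restoredState) {
        restored.clear();
        restored.addAll(restoredState);
    }

    // Re-process the restored elements, so a replay sees the same input.
    void open() {
        emitted.addAll(restored);
        restored.clear();
    }

    List<T> emitted() {
        return emitted;
    }
}

class BufferingOperatorDemo {
    public static void main(String[] args) {
        BufferingOperator<String> op = new BufferingOperator<>();
        op.processElement("a");
        op.processElement("b");
        op.snapshotState(1L);          // "a" and "b" are now part of checkpoint 1
        op.notifySnapshotComplete(1L); // only now are they processed
        System.out.println(op.emitted());
    }
}
```

Note that a replay after recovery processes the restored elements again, so whatever consumes them still has to tolerate duplicates of identical input.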
> 
> This is somewhat expensive and leads to higher latency so we should only do 
> it if the DoFn signals that it needs deterministic input.
> 
> +Jingsong, who is working on something similar for the output produced in 
> finishBundle().
> 
>> On 9. Aug 2017, at 19:41, Raghu Angadi <rang...@google.com.INVALID> wrote:
>> 
>> Yep, an option to ensure replays see identical input would be pretty useful.
>> It might be challenging on horizontally checkpointing runners like Flink
>> (the only way I see is to buffer all the input in state and replay it after
>> the checkpoint).
>> 
>> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>> 
>>> Please see Kenn's proposal. This is a generic thing that is lacking in the
>>> Beam model, and only works today for specific runners. We should fix this
>>> at the Beam level, but I don't think that should block your PR.
>>> 
>>> 
>>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi <rang...@google.com.invalid>
>>> wrote:
>>> 
>>>> There are quite a few customers using KafkaIO with Dataflow. All of them
>>>> are potential users of an exactly-once sink. The Dataflow Pubsub sink does
>>>> not support EOS yet. Even among those customers, I expect the fraction of
>>>> applications requiring EOS to be pretty small; that's why I don't think
>>>> the extra shuffles are too expensive in overall cost yet.
>>>> 
>>>> It is also not clear how Flink's 2-phase commit sink function could be used
>>>> in Beam's context. Beam could add some checkpoint semantics to the state API
>>>> so that all the runners could support it in a platform-specific way.
>>>> 
>>>> I took a look at the Flink PR and commented there on a few issues I see:
>>>> https://github.com/apache/flink/pull/4239. Maybe an extra shuffle or
>>>> storing all the messages in state can get around those.
>>>> 
>>>> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>> 
>>>>> Yes, I think making this explicit would be good. Having a transformation
>>>>> that makes assumptions about how the runner implements certain things is
>>>>> not optimal. Also, I think that most people probably don't use Kafka with
>>>>> the Dataflow Runner (because GCE has Pubsub, but I'm just guessing here).
>>>>> This would mean that the intersection of "people who would benefit from an
>>>>> exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather
>>>>> small, and therefore not many people would benefit from such a Transform.
>>>>> 
>>>>> This is all just conjecture, of course.
>>>>> 
>>>>> Best,
>>>>> Aljoscha
>>>>> 
>>>>>> On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
>>>>>> 
>>>>>> I think the issue we're hitting is how to write this in Beam.
>>>>>> 
>>>>>> Dataflow historically guaranteed checkpointing at every GBK (which, due to
>>>>>> the design of Dataflow's streaming shuffle, was reasonably efficient). In
>>>>>> Beam we never formalized these semantics, leaving these sinks in a gray
>>>>>> area. I believe the Spark runner currently checkpoints the RDD on every
>>>>>> GBK, so these unwritten semantics currently work for Dataflow and for
>>>>>> Spark.
>>>>>> 
>>>>>> We need some way to express this operation in Beam, whether it be via an
>>>>>> explicit Checkpoint() operation or via marking DoFns as having side
>>>>>> effects, and having the runner automatically insert such a Checkpoint in
>>>>>> front of them. In Flink, this operation can be implemented using what
>>>>>> Aljoscha posted.
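
One way to picture the second option (marking functions as having side effects and letting the runner insert a checkpoint in front of them) is the toy planner below. Everything in it is hypothetical and independent of the Beam SDK: the `@HasSideEffects` annotation, the `Step` interface and `CheckpointInsertingPlanner` are invented for illustration, and a real runner would operate on its own pipeline graph instead of a flat list.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker: the step writes to an external system.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface HasSideEffects {}

// Hypothetical stand-in for a pipeline step.
interface Step {
    String name();
}

class ParseStep implements Step {
    public String name() { return "Parse"; }
}

@HasSideEffects
class KafkaWriteStep implements Step {
    public String name() { return "WriteToKafka"; }
}

// The explicit checkpoint operation the runner would insert.
class CheckpointStep implements Step {
    public String name() { return "Checkpoint"; }
}

class CheckpointInsertingPlanner {
    // Returns a copy of the pipeline with a Checkpoint in front of every
    // step whose class carries @HasSideEffects.
    static List<Step> plan(List<Step> pipeline) {
        List<Step> planned = new ArrayList<>();
        for (Step step : pipeline) {
            if (step.getClass().isAnnotationPresent(HasSideEffects.class)) {
                planned.add(new CheckpointStep());
            }
            planned.add(step);
        }
        return planned;
    }
}

class CheckpointPlannerDemo {
    public static void main(String[] args) {
        List<Step> pipeline = List.of(new ParseStep(), new KafkaWriteStep());
        for (Step step : CheckpointInsertingPlanner.plan(pipeline)) {
            System.out.println(step.name());
        }
    }
}
```

The point of the annotation variant is that pipeline authors only declare the property; how the checkpoint is realized (a shuffle, a buffered operator, something else) stays a runner decision.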
>>>>>> 
>>>>>> Reuven
>>>>>> 
>>>>>> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> In Flink, there is a TwoPhaseCommit SinkFunction that can be used for such
>>>>>>> cases: [1]. The PR for a Kafka 0.11 exactly-once producer builds on that:
>>>>>>> [2]
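
For readers unfamiliar with the pattern, the general shape of a two-phase commit sink tied to checkpoints might look like the sketch below. It is plain Java and deliberately not the API of the class linked in [1]: `TwoPhaseCommitSketch` and its method names are hypothetical, the external system is an in-memory list, and recovery of pre-committed transactions after a restart is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a checkpoint-driven two-phase commit sink.
class TwoPhaseCommitSketch {
    // Stand-in for the external system; only committed records are visible here.
    private final List<String> committed = new ArrayList<>();
    // Records written into the currently open transaction.
    private List<String> currentTxn = new ArrayList<>();
    // Transactions pre-committed at a checkpoint, waiting for its completion.
    private final TreeMap<Long, List<String>> preCommitted = new TreeMap<>();

    // Write a record into the open transaction.
    void invoke(String record) {
        currentTxn.add(record);
    }

    // Phase 1: when a checkpoint is taken, close the open transaction
    // and start a new one for subsequent records.
    void preCommit(long checkpointId) {
        preCommitted.put(checkpointId, currentTxn);
        currentTxn = new ArrayList<>();
    }

    // Phase 2: once the checkpoint is complete everywhere, make the
    // pre-committed records visible.
    void commit(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
            preCommitted.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.addAll(it.next().getValue());
            it.remove();
        }
    }

    // Simplification: discard everything that was never committed.
    void abort() {
        currentTxn = new ArrayList<>();
        preCommitted.clear();
    }

    List<String> committed() {
        return committed;
    }
}

class TwoPhaseCommitDemo {
    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.preCommit(1L); // checkpoint 1 is being taken
        sink.invoke("c");   // belongs to the next transaction
        sink.commit(1L);    // checkpoint 1 completed
        System.out.println(sink.committed());
    }
}
```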
>>>>>>> 
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>> 
>>>>>>> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
>>>>>>> [2] https://github.com/apache/flink/pull/4239
>>>>>>>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> Kafka 0.11 added support for transactions [1], which allows end-to-end
>>>>>>>> exactly-once semantics. Beam's KafkaIO users can benefit from these while
>>>>>>>> using runners that support exactly-once processing.
>>>>>>>> 
>>>>>>>> I have an implementation of EOS support for the Kafka sink:
>>>>>>>> https://github.com/apache/beam/pull/3612
>>>>>>>> It has two shuffles and builds on the Beam state API and a checkpoint
>>>>>>>> barrier between stages (as in Dataflow). The pull request has a longer
>>>>>>>> description.
>>>>>>>> 
>>>>>>>> - What other runners in addition to Dataflow would be compatible with
>>>>>>>> such a strategy?
>>>>>>>> - I think it does not quite work for Flink (as it has a global
>>>>>>>> checkpoint, not one between the stages). How would one go about
>>>>>>>> implementing such a sink?
>>>>>>>> 
>>>>>>>> Any comments on the pull request are also welcome.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Raghu.
>>>>>>>> 
>>>>>>>> [1] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
