In general such discussions happen or are posted on the dev lists. Could you
please post a summary? Thanks.



Thanks, Hari

On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <c...@koeninger.org>
wrote:

> After a long talk with Patrick and TD (thanks guys), I opened the following
> jira
> https://issues.apache.org/jira/browse/SPARK-4964
> Sample PR has an implementation for the batch and the dstream case, and a
> link to a project with example usage.
> On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com> wrote:
>> yup, we at tresata do the idempotent store the same way. very simple
>> approach.
>>
>> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> That KafkaRDD code is dead simple.
>>>
>>> Given a user specified map
>>>
>>> (topic1, partition0) -> (startingOffset, endingOffset)
>>> (topic1, partition1) -> (startingOffset, endingOffset)
>>> ...
>>> turn each one of those entries into a partition of an rdd, using the simple
>>> consumer.
>>> That's it.  No recovery logic, no state, nothing - for any failures, bail
>>> on the rdd and let it retry.
>>> Spark stays out of the business of being a distributed database.
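>>>
>>> Roughly, in code (class and method names here are just illustrative, not
>>> necessarily what the sample PR uses):
>>>
>>>     // Sketch only: each user-specified offset range becomes one RDD partition.
>>>     case class OffsetRange(topic: String, partition: Int,
>>>                            fromOffset: Long, untilOffset: Long)
>>>
>>>     class KafkaRDDPartition(val index: Int, val range: OffsetRange)
>>>       extends org.apache.spark.Partition
>>>
>>>     // getPartitions just turns the map entries into partitions; compute()
>>>     // opens a SimpleConsumer and iterates from fromOffset until untilOffset.
>>>     def getPartitions(ranges: Seq[OffsetRange]): Array[org.apache.spark.Partition] =
>>>       ranges.zipWithIndex.map { case (r, i) => new KafkaRDDPartition(i, r) }.toArray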
>>>
>>> The client code does any transformation it wants, then stores the data and
>>> offsets.  There are two ways of doing this, either based on idempotence or
>>> a transactional data store.
>>>
>>> For idempotent stores:
>>>
>>> 1. manipulate data
>>> 2. save data to store
>>> 3. save ending offsets to the same store
>>>
>>> If you fail between 2 and 3, the offsets haven't been stored, you start
>>> again at the same beginning offsets, do the same calculations in the same
>>> order, overwrite the same data, all is good.
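>>>
>>> In code it's roughly this (the store api here is made up, as are transform
>>> and partitionOffsetRange; the point is just the order of the writes):
>>>
>>>     // Hypothetical idempotent store, e.g. keyed upserts into cassandra/hbase.
>>>     rdd.foreachPartition { iter =>
>>>       val results = iter.map(transform)        // 1. manipulate data
>>>       store.save(results)                      // 2. save data (overwrites are safe)
>>>       store.saveOffsets(partitionOffsetRange)  // 3. save ending offsets, same store
>>>     }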
>>>
>>>
>>> For transactional stores:
>>>
>>> 1. manipulate data
>>> 2. begin transaction
>>> 3. save data to the store
>>> 4. save offsets
>>> 5. commit transaction
>>>
>>> If you fail before 5, the transaction rolls back.  To make this less
>>> heavyweight, you can write the data outside the transaction and then update
>>> a pointer to the current data inside the transaction.
>>>
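>>> As a concrete sketch with plain jdbc (the table name and the saveResults
>>> helper are made up; jdbcUrl, results and the offset variables are assumed
>>> to be in scope):
>>>
>>>     val conn = java.sql.DriverManager.getConnection(jdbcUrl)
>>>     conn.setAutoCommit(false)                 // 2. begin transaction
>>>     try {
>>>       saveResults(conn, results)              // 3. save data (hypothetical helper)
>>>       val ps = conn.prepareStatement(
>>>         "update kafka_offsets set until_offset = ? where topic = ? and part = ?")
>>>       ps.setLong(1, untilOffset); ps.setString(2, topic); ps.setInt(3, partition)
>>>       ps.executeUpdate()                      // 4. save offsets in the same transaction
>>>       conn.commit()                           // 5. commit; failure before this rolls back
>>>     } catch {
>>>       case e: Exception => conn.rollback(); throw e
>>>     } finally {
>>>       conn.close()
>>>     }
>>>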
>>>
>>> Again, spark has nothing much to do with guaranteeing exactly once.  In
>>> fact, the current streaming api actively impedes my ability to do the
>>> above.  I'm just suggesting providing an api that doesn't get in the way of
>>> exactly-once.
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
>>> hshreedha...@cloudera.com
>>> > wrote:
>>>
>>> > Can you explain your basic algorithm for the once-only-delivery? It is
>>> > quite a bit of very Kafka-specific code that would take more time to read
>>> > than I can currently afford. If you can explain your algorithm a bit, it
>>> > might help.
>>> >
>>> > Thanks,
>>> > Hari
>>> >
>>> >
>>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org>
>>> > wrote:
>>> >
>>> >>
>>> >> The problems you guys are discussing come from trying to store state in
>>> >> spark, so don't do that.  Spark isn't a distributed database.
>>> >>
>>> >> Just map kafka partitions directly to rdds, let user code specify the
>>> >> range of offsets explicitly, and let them be in charge of committing
>>> >> offsets.
>>> >>
>>> >> Using the simple consumer isn't that bad, I'm already using this in
>>> >> production with the code I linked to, and tresata apparently has been as
>>> >> well.  Again, for everyone saying this is impossible, have you read either
>>> >> of those implementations and looked at the approach?
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
>>> >> sean.mcnam...@webtrends.com> wrote:
>>> >>
>>> >>> Please feel free to correct me if I’m wrong, but I think the exactly
>>> >>> once spark streaming semantics can easily be solved using
>>> >>> updateStateByKey.  Make the key going into updateStateByKey be a hash of
>>> >>> the event, or pluck off some uuid from the message.  The updateFunc
>>> >>> would only emit the message if the key did not exist, and the user has
>>> >>> complete control over the window of time / state lifecycle for detecting
>>> >>> duplicates.  It also makes it really easy to detect and take action
>>> >>> (alert?) when you DO see a duplicate, or make memory tradeoffs within an
>>> >>> error bound using a sketch algorithm.  The kafka simple consumer is
>>> >>> insanely complex, if possible I think it would be better (and vastly
>>> >>> more flexible) to get reliability using the
>>> >>> primitives that spark so elegantly provides.
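>>> >>>
>>> >>> Sketch of what I mean (Msg, its uuid field, and the input DStream
>>> >>> "messages" are stand-ins for the real types; needs
>>> >>> import org.apache.spark.streaming.StreamingContext._ for the pair
>>> >>> functions, and checkpointing enabled for updateStateByKey):
>>> >>>
>>> >>>     case class Msg(uuid: String, payload: String)
>>> >>>
>>> >>>     // Key by uuid; state per key is (seenBefore, messageToEmitThisBatch).
>>> >>>     val keyed = messages.map(m => (m.uuid, m))
>>> >>>     val withState = keyed.updateStateByKey[(Boolean, Option[Msg])] {
>>> >>>       (newMsgs: Seq[Msg], state: Option[(Boolean, Option[Msg])]) =>
>>> >>>         if (state.exists(_._1)) Some((true, None))  // duplicate: drop (or alert)
>>> >>>         else Some((true, newMsgs.headOption))       // first time: pass it through
>>> >>>     }
>>> >>>     val deduped = withState.flatMap { case (_, (_, msg)) => msg }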
>>> >>>
>>> >>> Cheers,
>>> >>>
>>> >>> Sean
>>> >>>
>>> >>>
>>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>>> >>> hshreedha...@cloudera.com> wrote:
>>> >>> >
>>> >>> > Hi Dibyendu,
>>> >>> >
>>> >>> > Thanks for the details on the implementation. But I still do not
>>> >>> > believe that it gives no duplicates - what they achieve is that the
>>> >>> > same batch is processed exactly the same way every time (but note it
>>> >>> > may be processed more than once) - so it depends on the operation
>>> >>> > being idempotent. I believe Trident uses ZK to keep track of the
>>> >>> > transactions - a batch can be processed multiple times in failure
>>> >>> > scenarios (for example, the transaction is processed but before ZK is
>>> >>> > updated the machine fails, causing a "new" node to process it again).
>>> >>> >
>>> >>> > I don't think it is impossible to do this in Spark Streaming as well,
>>> >>> > and I'd be really interested in working on it at some point in the
>>> >>> > near future.
>>> >>> >
>>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>>> >>> > dibyendu.bhattach...@gmail.com> wrote:
>>> >>> >
>>> >>> >> Hi,
>>> >>> >>
>>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm
>>> >>> >> Trident achieves the exactly-once guarantee by processing the tuples
>>> >>> >> in a batch and assigning the same transaction-id to a given batch.
>>> >>> >> The replay of a given batch with a transaction-id will have the exact
>>> >>> >> same set of tuples, and replay of batches happens in the exact same
>>> >>> >> order as before the failure.
>>> >>> >>
>>> >>> >> Having this paradigm, if the downstream system processes data for a
>>> >>> >> given batch with a given transaction-id, and if during a failure the
>>> >>> >> same batch is emitted again, you can check whether the same
>>> >>> >> transaction-id has already been processed
>>> >>> >> or not and hence can guarantee exact once semantics.
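>>> >>> >>
>>> >>> >> In pseudo-code it is roughly this (the txStore and writeToDownstream
>>> >>> >> names are hypothetical):
>>> >>> >>
>>> >>> >>     // Skip a replayed batch whose transaction-id was already committed.
>>> >>> >>     def processBatch(txId: Long, batch: Seq[Array[Byte]]): Unit = {
>>> >>> >>       if (txStore.lastCommittedTxId >= txId) {
>>> >>> >>         // already applied before the failure; the replay is a no-op
>>> >>> >>       } else {
>>> >>> >>         writeToDownstream(batch)     // downstream write for this batch
>>> >>> >>         txStore.commit(txId)         // record txId together with the write
>>> >>> >>       }
>>> >>> >>     }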
>>> >>> >>
>>> >>> >> And this can only be achieved in Spark if we use the Low Level Kafka
>>> >>> >> consumer API to process the offsets. This low level Kafka Consumer
>>> >>> >> (https://github.com/dibbhatt/kafka-spark-consumer) has implemented a
>>> >>> >> Spark Kafka consumer which uses the Kafka Low Level APIs. All of the
>>> >>> >> Kafka related logic has been taken from the Storm-Kafka spout, which
>>> >>> >> manages all Kafka re-balance and fault-tolerance aspects and Kafka
>>> >>> >> metadata management.
>>> >>> >>
>>> >>> >> Presently this Consumer ensures that during a Receiver failure it
>>> >>> >> will re-emit the exact same Block with the same set of messages.
>>> >>> >> Every message has its partition, offset and topic details, which can
>>> >>> >> tackle SPARK-3146.
>>> >>> >>
>>> >>> >> As this Low Level consumer has complete control over the Kafka
>>> >>> >> offsets, we can implement a Trident-like feature on top of it, such
>>> >>> >> as a transaction-id for a given block, and re-emit the same block
>>> >>> >> with the same set of messages during a Driver failure.
>>> >>> >>
>>> >>> >> Regards,
>>> >>> >> Dibyendu
>>> >>> >>
>>> >>> >>
>>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
>>> saisai.s...@intel.com>
>>> >>> >> wrote:
>>> >>> >>>
>>> >>> >>> Hi all,
>>> >>> >>>
>>> >>> >>> I agree with Hari that strong exactly-once semantics is very hard
>>> >>> >>> to guarantee, especially in failure situations. From my
>>> >>> >>> understanding, even the current implementation of
>>> >>> >>> ReliableKafkaReceiver cannot fully guarantee exactly-once semantics
>>> >>> >>> after a failure. First is the ordering of data replaying from the
>>> >>> >>> last checkpoint; this is hard to guarantee when data from multiple
>>> >>> >>> partitions is injected. Second is the design complexity of achieving
>>> >>> >>> this: you can refer to the Kafka Spout in Trident, where we have to
>>> >>> >>> dig into the very details of the Kafka metadata management system,
>>> >>> >>> not to mention rebalance and fault-tolerance.
>>> >>> >>>
>>> >>> >>> Thanks
>>> >>> >>> Jerry
>>> >>> >>>
>>> >>> >>> -----Original Message-----
>>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>>> >>> >>> To: Cody Koeninger
>>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>> >>> >>> Subject: Re: Which committers care about Kafka?
>>> >>> >>>
>>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong
>>> >>> >>> only-once semantic through a proper API would be super useful; but
>>> >>> >>> I'm not implying this is easy to achieve.
>>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org>
>>> wrote:
>>> >>> >>>
>>> >>> >>>> If the downstream store for the output data is idempotent or
>>> >>> >>>> transactional, and that downstream store also is the system of
>>> >>> >>>> record for kafka offsets, then you have exactly-once semantics.
>>> >>> >>>> Commit offsets with / after the data is stored.  On any failure,
>>> >>> >>>> restart from the last committed offsets.
>>> >>> >>>>
>>> >>> >>>> Yes, this approach is biased towards the etl-like use cases rather
>>> >>> >>>> than near-realtime-analytics use cases.
>>> >>> >>>>
>>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>>> >>> >>>> hshreedha...@cloudera.com
>>> >>> >>>>> wrote:
>>> >>> >>>>>
>>> >>> >>>>> I get what you are saying. But getting exactly once right is an
>>> >>> >>>>> extremely hard problem - especially in the presence of failure.
>>> >>> >>>>> The issue is failures can happen in a bunch of places. For
>>> >>> >>>>> example, before the notification of the downstream store being
>>> >>> >>>>> successful reaches the receiver that updates the offsets, the node
>>> >>> >>>>> fails. The store was successful, but duplicates came in either
>>> >>> >>>>> way. This is something worth discussing by itself - but without
>>> >>> >>>>> uuids etc this might not really be solved even when you think it
>>> >>> >>>>> is.
>>> >>> >>>>>
>>> >>> >>>>> Anyway, I will look at the links. Even I am interested in all of
>>> >>> >>>>> the features you mentioned - no HDFS WAL for Kafka and once-only
>>> >>> >>>>> delivery, but I doubt the latter is really possible to guarantee -
>>> >>> >>>>> though I really would love to have that!
>>> >>> >>>>>
>>> >>> >>>>> Thanks,
>>> >>> >>>>> Hari
>>> >>> >>>>>
>>> >>> >>>>>
>>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>>> >>> >>>>> <c...@koeninger.org>
>>> >>> >>>>> wrote:
>>> >>> >>>>>
>>> >>> >>>>>> Thanks for the replies.
>>> >>> >>>>>>
>>> >>> >>>>>> Regarding skipping WAL, it's not just about optimization.  If
>>> >>> >>>>>> you actually want exactly-once semantics, you need control of
>>> >>> >>>>>> kafka offsets as well, including the ability to not use zookeeper
>>> >>> >>>>>> as the system of record for offsets.  Kafka already is a reliable
>>> >>> >>>>>> system that has strong ordering guarantees (within a partition)
>>> >>> >>>>>> and does not mandate the use of zookeeper to store offsets.  I
>>> >>> >>>>>> think there should be a spark api that acts as a very simple
>>> >>> >>>>>> intermediary between Kafka and the user's choice of downstream
>>> >>> >>>>>> store.
>>> >>> >>>>>>
>>> >>> >>>>>> Take a look at the links I posted - if there's already been 2
>>> >>> >>>>>> independent implementations of the idea, chances are it's
>>> >>> >>>>>> something people need.
>>> >>> >>>>>>
>>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>>> >>> >>>>>> hshreedha...@cloudera.com> wrote:
>>> >>> >>>>>>>
>>> >>> >>>>>>> Hi Cody,
>>> >>> >>>>>>>
>>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>>> >>> >>>>>>> something pretty simple and lightweight for that one.
>>> >>> >>>>>>>
>>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is
>>> >>> >>>>>>> something I discussed with TD a few weeks ago. Though it is a
>>> >>> >>>>>>> good idea to implement this to avoid unnecessary HDFS writes, it
>>> >>> >>>>>>> is an optimization. For that reason, we must be careful in
>>> >>> >>>>>>> implementation. There are a couple of issues that we need to
>>> >>> >>>>>>> ensure work properly - specifically ordering. To ensure we pull
>>> >>> >>>>>>> messages from different topics and partitions in the same order
>>> >>> >>>>>>> after failure, we’d still have to persist the metadata to HDFS
>>> >>> >>>>>>> (or some other system) - this metadata must contain the order of
>>> >>> >>>>>>> messages consumed, so we know how to re-read the messages. I am
>>> >>> >>>>>>> planning to explore this once I have some time (probably in
>>> >>> >>>>>>> Jan). In addition, we must also ensure bucketing functions work
>>> >>> >>>>>>> fine as well. I will file a placeholder jira for this one.
>>> >>> >>>>>>>
>>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while back -
>>> >>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that
>>> >>> >>>>>>> this will get pulled in soon, as this is something I know people
>>> >>> >>>>>>> want. I am open to feedback on that - anything that I can do to
>>> >>> >>>>>>> make it better.
>>> >>> >>>>>>>
>>> >>> >>>>>>> Thanks,
>>> >>> >>>>>>> Hari
>>> >>> >>>>>>>
>>> >>> >>>>>>>
>>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>>> >>> >>>>>>> <pwend...@gmail.com>
>>> >>> >>>>>>> wrote:
>>> >>> >>>>>>>
>>> >>> >>>>>>>> Hey Cody,
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD
>>> >>> >>>>>>>> - he is traveling this week though so I can respond a bit. To
>>> >>> >>>>>>>> the high level point of whether Kafka is important - it
>>> >>> >>>>>>>> definitely is. Something like 80% of Spark Streaming
>>> >>> >>>>>>>> deployments (anecdotally) ingest data from Kafka. Also, good
>>> >>> >>>>>>>> support for Kafka is something we generally want in Spark and
>>> >>> >>>>>>>> not a library. In some cases IIRC there were user libraries
>>> >>> >>>>>>>> that used unstable Kafka API's and we were somewhat waiting on
>>> >>> >>>>>>>> Kafka to stabilize them to merge things upstream. Otherwise
>>> >>> >>>>>>>> users wouldn't be able to use newer Kafka versions. This is a
>>> >>> >>>>>>>> high level impression only though, I haven't talked to TD about
>>> >>> >>>>>>>> this recently so it's worth revisiting given the developments
>>> >>> >>>>>>>> in Kafka.
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> Please do bring things up like this on the dev list if there
>>> >>> >>>>>>>> are blockers for your usage - thanks for pinging it.
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> - Patrick
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>>> >>> >>>>>>>> <c...@koeninger.org>
>>> >>> >>>>>>>> wrote:
>>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
>>> >>> >>>>>>>>> some long-standing Kafka related issues resolved?
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for
>>> >>> >>>>>>>>> our production use. I don't think we're alone in this
>>> >>> >>>>>>>>> viewpoint, because I've seen several different patches and
>>> >>> >>>>>>>>> libraries to fix the same things we've been running into.
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Regarding flexibility
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
>>> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
>>> >>> >>>>>>>>> ourselves, then found that PR and have been running it in
>>> >>> >>>>>>>>> production. We wouldn't be able to get our jobs done without
>>> >>> >>>>>>>>> it. It also allows users to solve a whole class of problems
>>> >>> >>>>>>>>> for themselves (e.g. SPARK-2388, arbitrary delay of messages,
>>> >>> >>>>>>>>> etc).
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
>>> >>> >>>>>>>>> WriteAheadLog as a general solution for streaming unreliable
>>> >>> >>>>>>>>> sources, but Kafka already is a reliable source. I think
>>> >>> >>>>>>>>> there's a need for an api that treats it as such. Even aside
>>> >>> >>>>>>>>> from the performance issues of duplicating the write-ahead log
>>> >>> >>>>>>>>> in kafka into another write-ahead log in hdfs, I need
>>> >>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
>>> >>> >>>>>>>>> failures that prevented reloading a spark streaming
>>> >>> >>>>>>>>> checkpoint, for instance).
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> I've got an implementation I've been using:
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Tresata has something similar at
>>> >>> >>>>>>>>> https://github.com/tresata/spark-kafka, and I know there were
>>> >>> >>>>>>>>> earlier attempts based on Storm code.
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather
>>> >>> >>>>>>>>> than patches to Spark is problematic, because large portions
>>> >>> >>>>>>>>> of the implementation are private[spark].
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> I'd like to help, but I need to know whose attention to get.
>>> >>> >>>>>>>>
>>> >>> >>>>>>>>
>>> >>> -----------------------------------------------------------------
>>> >>> >>>>>>>> ---- To unsubscribe, e-mail:
>>> dev-unsubscr...@spark.apache.org
>>> >>> For
>>> >>> >>>>>>>> additional commands, e-mail: dev-h...@spark.apache.org
>>> >>> >>>>>>>>
>>> >>> >>>>>>>>
>>> >>> >>>>>>>
>>> >>> >>>>>
>>> >>> >>>>
>>> >>> >>>
>>> >>> >>
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>
