Yup, we at Tresata do the idempotent store the same way. Very simple
approach.

On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
> That KafkaRDD code is dead simple.
>
> Given a user-specified map
>
> (topic1, partition0) -> (startingOffset, endingOffset)
> (topic1, partition1) -> (startingOffset, endingOffset)
> ...
> Turn each one of those entries into a partition of an RDD, using the
> simple consumer.
> That's it.  No recovery logic, no state, nothing - for any failures, bail
> on the RDD and let it retry.
> Spark stays out of the business of being a distributed database.
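>
> As a rough sketch of the shape of that (a Scala illustration; OffsetRange,
> KafkaRDDPartition, and the stubbed fetch are names I'm making up here, not
> the actual implementation):
>
>     import org.apache.spark.{Partition, SparkContext, TaskContext}
>     import org.apache.spark.rdd.RDD
>
>     // one entry of the user-specified map
>     case class OffsetRange(topic: String, partition: Int,
>                            fromOffset: Long, untilOffset: Long)
>
>     class KafkaRDDPartition(val index: Int, val range: OffsetRange)
>       extends Partition
>
>     class KafkaRDD(sc: SparkContext, ranges: Seq[OffsetRange])
>       extends RDD[Array[Byte]](sc, Nil) {
>
>       // each (topic, partition) -> (from, until) entry is one partition
>       override def getPartitions: Array[Partition] =
>         ranges.zipWithIndex.map { case (r, i) =>
>           new KafkaRDDPartition(i, r): Partition
>         }.toArray
>
>       override def compute(split: Partition,
>                            context: TaskContext): Iterator[Array[Byte]] = {
>         val range = split.asInstanceOf[KafkaRDDPartition].range
>         // open a SimpleConsumer for range.topic / range.partition, fetch
>         // messages from range.fromOffset (inclusive) up to
>         // range.untilOffset (exclusive), and iterate over them; on any
>         // failure just throw - no state, Spark retries the partition
>         ???
>       }
>     }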
>
> The client code does any transformation it wants, then stores the data and
> offsets.  There are two ways of doing this, based on either an idempotent
> or a transactional data store.
>
> For idempotent stores:
>
> 1. manipulate data
> 2. save data to store
> 3. save ending offsets to the same store
>
> If you fail between 2 and 3, the offsets haven't been stored; you start
> again at the same beginning offsets, do the same calculations in the same
> order, overwrite the same data, and all is good.
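>
> A minimal sketch of that pattern (IdempotentStore is a hypothetical
> stand-in for any store with overwrite/upsert semantics - Cassandra, HBase,
> a keyed relational table, etc.):
>
>     import org.apache.spark.SparkContext._
>     import org.apache.spark.rdd.RDD
>
>     // hypothetical upsert-capable store: writing the same key twice
>     // just overwrites, so replays are harmless
>     trait IdempotentStore {
>       def upsert(key: String, value: String): Unit
>     }
>
>     def saveBatch(rdd: RDD[(String, Int)],
>                   endingOffsets: Map[(String, Int), Long],
>                   store: IdempotentStore): Unit = {
>       // 1. manipulate data (deterministic, so a replay recomputes the
>       //    same results)
>       val results = rdd.reduceByKey(_ + _).collect()
>       // 2. save data to store - a replay overwrites the same keys
>       results.foreach { case (k, v) => store.upsert(k, v.toString) }
>       // 3. save ending offsets last: if we die between 2 and 3, the next
>       //    run starts from the same beginning offsets and overwrites
>       endingOffsets.foreach { case ((topic, part), off) =>
>         store.upsert(s"offset/$topic/$part", off.toString)
>       }
>     }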
>
>
> For transactional stores:
>
> 1. manipulate data
> 2. begin transaction
> 3. save data to the store
> 4. save offsets
> 5. commit transaction
>
> If you fail before 5, the transaction rolls back.  To make this less
> heavyweight, you can write the data outside the transaction and then update
> a pointer to the current data inside the transaction.
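>
> A sketch of the basic transactional variant using plain JDBC (table names
> and schemas here are made up for illustration):
>
>     import java.sql.Connection
>
>     def saveTransactionally(results: Seq[(String, Int)],
>                             endingOffsets: Map[(String, Int), Long],
>                             conn: Connection): Unit = {
>       conn.setAutoCommit(false)                    // 2. begin transaction
>       try {
>         val data = conn.prepareStatement(
>           "INSERT INTO results (k, v) VALUES (?, ?)")
>         results.foreach { case (k, v) =>           // 3. save data
>           data.setString(1, k); data.setInt(2, v); data.executeUpdate()
>         }
>         val offs = conn.prepareStatement(
>           "UPDATE offsets SET until_offset = ? WHERE topic = ? AND part = ?")
>         endingOffsets.foreach { case ((t, p), o) => // 4. save offsets
>           offs.setLong(1, o); offs.setString(2, t); offs.setInt(3, p)
>           offs.executeUpdate()
>         }
>         conn.commit()                              // 5. commit transaction
>       } catch {
>         case e: Exception =>
>           conn.rollback()                          // failure before 5: roll back
>           throw e
>       }
>     }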
>
>
> Again, Spark has nothing much to do with guaranteeing exactly-once.  In
> fact, the current streaming API actively impedes my ability to do the
> above.  I'm just suggesting providing an API that doesn't get in the way of
> exactly-once.
>
>
>
>
>
> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
> hshreedha...@cloudera.com> wrote:
>
> > Can you explain your basic algorithm for the once-only delivery? It is
> > quite a bit of very Kafka-specific code, and would take more time to
> > read than I can currently afford. If you can explain your algorithm a
> > bit, it might help.
> >
> > Thanks,
> > Hari
> >
> >
> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org>
> > wrote:
> >
> >>
> >> The problems you guys are discussing come from trying to store state
> >> in Spark, so don't do that.  Spark isn't a distributed database.
> >>
> >> Just map Kafka partitions directly to RDDs, let user code specify the
> >> range of offsets explicitly, and let them be in charge of committing
> >> offsets.
> >>
> >> Using the simple consumer isn't that bad; I'm already using it in
> >> production with the code I linked to, and Tresata apparently has been
> >> as well.  Again, for everyone saying this is impossible, have you read
> >> either of those implementations and looked at the approach?
> >>
> >>
> >>
> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
> >> sean.mcnam...@webtrends.com> wrote:
> >>
> >>> Please feel free to correct me if I’m wrong, but I think the
> >>> exactly-once Spark Streaming semantics can easily be solved using
> >>> updateStateByKey.  Make the key going into updateStateByKey a hash of
> >>> the event, or pluck off some uuid from the message.  The updateFunc
> >>> would only emit the message if the key did not exist, and the user has
> >>> complete control over the window of time / state lifecycle for
> >>> detecting duplicates.  It also makes it really easy to detect and take
> >>> action (alert?) when you DO see a duplicate, or to make memory
> >>> tradeoffs within an error bound using a sketch algorithm.  The Kafka
> >>> simple consumer is insanely complex; if possible I think it would be
> >>> better (and vastly more flexible) to get reliability using the
> >>> primitives that Spark so elegantly provides.
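> >>>
> >>> Something like this, as a rough sketch (the (String, String) event
> >>> shape and all names are just for illustration; a real version would
> >>> also age keys out of the state to bound memory):
> >>>
> >>>     import org.apache.spark.streaming.StreamingContext._
> >>>     import org.apache.spark.streaming.dstream.DStream
> >>>
> >>>     // key = hash/uuid of the event, value = the message itself;
> >>>     // per-key state = (message, isNew)
> >>>     // (requires ssc.checkpoint(...) since updateStateByKey is stateful)
> >>>     def emitOnce(events: DStream[(String, String)])
> >>>         : DStream[(String, String)] = {
> >>>       val state = events.updateStateByKey[(String, Boolean)] {
> >>>         (newMsgs: Seq[String], old: Option[(String, Boolean)]) =>
> >>>           old match {
> >>>             // key seen before: keep state, mark as duplicate
> >>>             case Some((msg, _))           => Some((msg, false))
> >>>             // first sighting of this key: emit it exactly once
> >>>             case None if newMsgs.nonEmpty => Some((newMsgs.head, true))
> >>>             case None                     => None
> >>>           }
> >>>       }
> >>>       // pass through only the first sighting of each key
> >>>       state.filter { case (_, (_, isNew)) => isNew }
> >>>            .map { case (k, (msg, _)) => (k, msg) }
> >>>     }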
> >>>
> >>> Cheers,
> >>>
> >>> Sean
> >>>
> >>>
> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
> >>> > hshreedha...@cloudera.com> wrote:
> >>> >
> >>> > Hi Dibyendu,
> >>> >
> >>> > Thanks for the details on the implementation. But I still do not
> >>> > believe that it gives no duplicates - what they achieve is that the
> >>> > same batch is processed exactly the same way every time (but note it
> >>> > may be processed more than once) - so it depends on the operation
> >>> > being idempotent. I believe Trident uses ZK to keep track of the
> >>> > transactions - a batch can be processed multiple times in failure
> >>> > scenarios (for example, the transaction is processed but before ZK
> >>> > is updated the machine fails, causing a "new" node to process it
> >>> > again).
> >>> >
> >>> > I don't think it is impossible to do this in Spark Streaming as
> >>> > well, and I'd be really interested in working on it at some point in
> >>> > the near future.
> >>> >
> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
> >>> > dibyendu.bhattach...@gmail.com> wrote:
> >>> >
> >>> >> Hi,
> >>> >>
> >>> >> Thanks to Jerry for mentioning the Kafka spout for Trident. Storm
> >>> >> Trident provides the exactly-once guarantee by processing tuples in
> >>> >> a batch and assigning the same transaction-id to a given batch. The
> >>> >> replay of a given batch with a transaction-id will have the exact
> >>> >> same set of tuples, and replay of batches happens in the exact same
> >>> >> order as before the failure.
> >>> >>
> >>> >> Having this paradigm, if a downstream system processes data for a
> >>> >> batch with a given transaction-id, and if during failure the same
> >>> >> batch is emitted again, you can check whether that transaction-id
> >>> >> has already been processed and hence can guarantee exactly-once
> >>> >> semantics.
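> >>> >>
> >>> >> A rough sketch of that check (store access is abstracted as plain
> >>> >> functions here; all names are illustrative):
> >>> >>
> >>> >>     // the store remembers the last transaction-id applied per
> >>> >>     // partition; data and txId must be written atomically together
> >>> >>     def applyBatch(partition: Int, txId: Long,
> >>> >>                    batch: Seq[Array[Byte]],
> >>> >>                    lastApplied: Int => Option[Long],
> >>> >>                    write: (Int, Long, Seq[Array[Byte]]) => Unit): Unit = {
> >>> >>       if (lastApplied(partition).exists(_ >= txId)) {
> >>> >>         // same batch re-emitted after a failure: already processed,
> >>> >>         // skip it to preserve exactly-once semantics
> >>> >>       } else {
> >>> >>         write(partition, txId, batch)
> >>> >>       }
> >>> >>     }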
> >>> >>
> >>> >> And this can only be achieved in Spark if we use the low-level
> >>> >> Kafka consumer API to process the offsets. This low-level Kafka
> >>> >> consumer (https://github.com/dibbhatt/kafka-spark-consumer)
> >>> >> implements a Spark Kafka consumer using the Kafka low-level APIs.
> >>> >> All of the Kafka-related logic has been taken from the Storm-Kafka
> >>> >> spout, which manages all the Kafka re-balance and fault-tolerance
> >>> >> aspects and Kafka metadata management.
> >>> >>
> >>> >> Presently this consumer guarantees that during receiver failure, it
> >>> >> will re-emit the exact same block with the same set of messages.
> >>> >> Every message carries its partition, offset, and topic details,
> >>> >> which can tackle SPARK-3146.
> >>> >>
> >>> >> As this low-level consumer has complete control over the Kafka
> >>> >> offsets, we can implement a Trident-like feature on top of it, such
> >>> >> as a transaction-id for a given block, and re-emit the same block
> >>> >> with the same set of messages during driver failure.
> >>> >>
> >>> >> Regards,
> >>> >> Dibyendu
> >>> >>
> >>> >>
> >>> >>> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
> >>> >>> saisai.s...@intel.com> wrote:
> >>> >>>
> >>> >>> Hi all,
> >>> >>>
> >>> >>> I agree with Hari that strong exactly-once semantics are very hard
> >>> >>> to guarantee, especially in failure situations. From my
> >>> >>> understanding, even the current implementation of
> >>> >>> ReliableKafkaReceiver cannot fully guarantee exactly-once semantics
> >>> >>> after a failure. First is the ordering of data replayed from the
> >>> >>> last checkpoint; this is hard to guarantee when multiple partitions
> >>> >>> are ingested. Second is the design complexity of achieving this:
> >>> >>> you can refer to the Kafka spout in Trident, where we had to dig
> >>> >>> into the very details of Kafka's metadata management system, not to
> >>> >>> mention rebalance and fault tolerance.
> >>> >>>
> >>> >>> Thanks
> >>> >>> Jerry
> >>> >>>
> >>> >>> -----Original Message-----
> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
> >>> >>> To: Cody Koeninger
> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
> >>> >>> Subject: Re: Which committers care about Kafka?
> >>> >>>
> >>> >>> But idempotency is not that easy to achieve sometimes. A strong
> >>> >>> exactly-once semantic through a proper API would be super useful;
> >>> >>> but I'm not implying this is easy to achieve.
> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org> wrote:
> >>> >>>
> >>> >>>> If the downstream store for the output data is idempotent or
> >>> >>>> transactional, and that downstream store also is the system of
> >>> >>>> record for Kafka offsets, then you have exactly-once semantics.
> >>> >>>> Commit offsets with / after the data is stored.  On any failure,
> >>> >>>> restart from the last committed offsets.
> >>> >>>>
> >>> >>>> Yes, this approach is biased towards the ETL-like use cases
> >>> >>>> rather than near-realtime-analytics use cases.
> >>> >>>>
> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
> >>> >>>> hshreedha...@cloudera.com> wrote:
> >>> >>>>>
> >>> >>>>> I get what you are saying. But getting exactly-once right is an
> >>> >>>>> extremely hard problem - especially in the presence of failure.
> >>> >>>>> The issue is failures can happen in a bunch of places. For
> >>> >>>>> example, before the notification of the downstream store being
> >>> >>>>> successful reaches the receiver that updates the offsets, the
> >>> >>>>> node fails. The store was successful, but duplicates came in
> >>> >>>>> anyway. This is something worth discussing by itself - but
> >>> >>>>> without UUIDs etc. this might not really be solved even when you
> >>> >>>>> think it is.
> >>> >>>>>
> >>> >>>>> Anyway, I will look at the links. I am also interested in all of
> >>> >>>>> the features you mentioned - no HDFS WAL for Kafka and once-only
> >>> >>>>> delivery - but I doubt the latter is really possible to
> >>> >>>>> guarantee, though I really would love to have that!
> >>> >>>>>
> >>> >>>>> Thanks,
> >>> >>>>> Hari
> >>> >>>>>
> >>> >>>>>
> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
> >>> >>>>> <c...@koeninger.org>
> >>> >>>>> wrote:
> >>> >>>>>
> >>> >>>>>> Thanks for the replies.
> >>> >>>>>>
> >>> >>>>>> Regarding skipping the WAL, it's not just about optimization.
> >>> >>>>>> If you actually want exactly-once semantics, you need control of
> >>> >>>>>> Kafka offsets as well, including the ability to not use
> >>> >>>>>> ZooKeeper as the system of record for offsets.  Kafka already is
> >>> >>>>>> a reliable system that has strong ordering guarantees (within a
> >>> >>>>>> partition) and does not mandate the use of ZooKeeper to store
> >>> >>>>>> offsets.  I think there should be a Spark API that acts as a
> >>> >>>>>> very simple intermediary between Kafka and the user's choice of
> >>> >>>>>> downstream store.
> >>> >>>>>>
> >>> >>>>>> Take a look at the links I posted - if there have already been
> >>> >>>>>> two independent implementations of the idea, chances are it's
> >>> >>>>>> something people need.
> >>> >>>>>>
> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
> >>> >>>>>> hshreedha...@cloudera.com> wrote:
> >>> >>>>>>>
> >>> >>>>>>> Hi Cody,
> >>> >>>>>>>
> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
> >>> >>>>>>> something pretty simple and lightweight for that one.
> >>> >>>>>>>
> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this
> >>> >>>>>>> is something I discussed with TD a few weeks ago. Though it is
> >>> >>>>>>> a good idea to implement this to avoid unnecessary HDFS
> >>> >>>>>>> writes, it is an optimization. For that reason, we must be
> >>> >>>>>>> careful in the implementation. There are a couple of issues
> >>> >>>>>>> that we need to ensure work properly - specifically ordering.
> >>> >>>>>>> To ensure we pull messages from different topics and
> >>> >>>>>>> partitions in the same order after failure, we’d still have to
> >>> >>>>>>> persist the metadata to HDFS (or some other system) - this
> >>> >>>>>>> metadata must contain the order of messages consumed, so we
> >>> >>>>>>> know how to re-read the messages. I am planning to explore
> >>> >>>>>>> this once I have some time (probably in Jan). In addition, we
> >>> >>>>>>> must also ensure bucketing functions work fine as well. I will
> >>> >>>>>>> file a placeholder JIRA for this one.
> >>> >>>>>>>
> >>> >>>>>>> I also wrote an API to write data back to Kafka a while back
> >>> >>>>>>> (https://github.com/apache/spark/pull/2994). I am hoping that
> >>> >>>>>>> this will get pulled in soon, as it is something I know people
> >>> >>>>>>> want. I am open to feedback on that - anything that I can do
> >>> >>>>>>> to make it better.
> >>> >>>>>>>
> >>> >>>>>>> Thanks,
> >>> >>>>>>> Hari
> >>> >>>>>>>
> >>> >>>>>>>
> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
> >>> >>>>>>> <pwend...@gmail.com>
> >>> >>>>>>> wrote:
> >>> >>>>>>>
> >>> >>>>>>>> Hey Cody,
> >>> >>>>>>>>
> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is
> >>> >>>>>>>> TD - he is traveling this week though, so I can respond a
> >>> >>>>>>>> bit. To the high-level point of whether Kafka is important -
> >>> >>>>>>>> it definitely is. Something like 80% of Spark Streaming
> >>> >>>>>>>> deployments (anecdotally) ingest data from Kafka. Also, good
> >>> >>>>>>>> support for Kafka is something we generally want in Spark and
> >>> >>>>>>>> not a library. In some cases IIRC there were user libraries
> >>> >>>>>>>> that used unstable Kafka APIs and we were somewhat waiting on
> >>> >>>>>>>> Kafka to stabilize them to merge things upstream. Otherwise
> >>> >>>>>>>> users wouldn't be able to use newer Kafka versions. This is a
> >>> >>>>>>>> high-level impression only though; I haven't talked to TD
> >>> >>>>>>>> about this recently, so it's worth revisiting given the
> >>> >>>>>>>> developments in Kafka.
> >>> >>>>>>>>
> >>> >>>>>>>> Please do bring things up like this on the dev list if there
> >>> >>>>>>>> are blockers for your usage - thanks for pinging it.
> >>> >>>>>>>>
> >>> >>>>>>>> - Patrick
> >>> >>>>>>>>
> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
> >>> >>>>>>>> <c...@koeninger.org>
> >>> >>>>>>>> wrote:
> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
> >>> >>>>>>>>> some long-standing Kafka-related issues resolved?
> >>> >>>>>>>>>
> >>> >>>>>>>>> The existing API is not sufficiently safe nor flexible for
> >>> >>>>>>>>> our production use. I don't think we're alone in this
> >>> >>>>>>>>> viewpoint, because I've seen several different patches and
> >>> >>>>>>>>> libraries to fix the same things we've been running into.
> >>> >>>>>>>>>
> >>> >>>>>>>>> Regarding flexibility,
> >>> >>>>>>>>>
> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
> >>> >>>>>>>>>
> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
> >>> >>>>>>>>> ourselves, then found that PR and have been running it in
> >>> >>>>>>>>> production. We wouldn't be able to get our jobs done without
> >>> >>>>>>>>> it. It also allows users to solve a whole class of problems
> >>> >>>>>>>>> for themselves (e.g. SPARK-2388, arbitrary delay of
> >>> >>>>>>>>> messages, etc.).
> >>> >>>>>>>>>
> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
> >>> >>>>>>>>> WriteAheadLog as a general solution for streaming unreliable
> >>> >>>>>>>>> sources, but Kafka already is a reliable source. I think
> >>> >>>>>>>>> there's a need for an API that treats it as such. Even aside
> >>> >>>>>>>>> from the performance issues of duplicating the write-ahead
> >>> >>>>>>>>> log in Kafka into another write-ahead log in HDFS, I need
> >>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
> >>> >>>>>>>>> failures that prevented reloading a Spark Streaming
> >>> >>>>>>>>> checkpoint, for instance).
> >>> >>>>>>>>>
> >>> >>>>>>>>> I've got an implementation I've been using:
> >>> >>>>>>>>>
> >>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
> >>> >>>>>>>>>
> >>> >>>>>>>>> Tresata has something similar at
> >>> >>>>>>>>> https://github.com/tresata/spark-kafka, and I know there
> >>> >>>>>>>>> were earlier attempts based on Storm code.
> >>> >>>>>>>>>
> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries
> >>> >>>>>>>>> rather than patches to Spark is problematic, because large
> >>> >>>>>>>>> portions of the implementation are private[spark].
> >>> >>>>>>>>> I'd like to help, but I need to know whose attention to get.
> >>> >>>>>>>>
> >>> >>>>>>>>
> >>> >>>>>>>>
> >>> >>>>>>>
> >>> >>>>>
> >>> >>>>
> >>> >>>
> >>> >>
> >>>
> >>>
> >>
> >
>
