The conversation was mostly getting TD up to speed on this thread since he had just gotten back from his trip and hadn't seen it.
The jira has a summary of the requirements we discussed; I'm sure TD or Patrick can add to the ticket if I missed something.

On Dec 25, 2014 1:54 AM, "Hari Shreedharan" <hshreedha...@cloudera.com> wrote:

> In general such discussions happen on, or are posted to, the dev list.
> Could you please post a summary? Thanks.
>
> Thanks,
> Hari
>
> On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> After a long talk with Patrick and TD (thanks guys), I opened the
>> following jira:
>>
>> https://issues.apache.org/jira/browse/SPARK-4964
>>
>> The sample PR has an implementation for both the batch and the dstream
>> case, and a link to a project with example usage.
>>
>> On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>>> Yup, we at Tresata do the idempotent store the same way. Very simple
>>> approach.
>>>
>>> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>>
>>>> That KafkaRDD code is dead simple.
>>>>
>>>> Given a user-specified map
>>>>
>>>>   (topic1, partition0) -> (startingOffset, endingOffset)
>>>>   (topic1, partition1) -> (startingOffset, endingOffset)
>>>>   ...
>>>>
>>>> turn each one of those entries into a partition of an RDD, using the
>>>> simple consumer. That's it. No recovery logic, no state, nothing -
>>>> on any failure, bail on the RDD and let it retry. Spark stays out of
>>>> the business of being a distributed database.
>>>>
>>>> The client code does any transformation it wants, then stores the
>>>> data and offsets. There are two ways of doing this, based on either
>>>> an idempotent or a transactional data store.
>>>>
>>>> For idempotent stores:
>>>>
>>>> 1. manipulate data
>>>> 2. save data to the store
>>>> 3. save ending offsets to the same store
>>>>
>>>> If you fail between 2 and 3, the offsets haven't been stored; you
>>>> start again at the same beginning offsets, do the same calculations
>>>> in the same order, overwrite the same data, and all is good.
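[Editor's note: the idempotent-store steps quoted above can be sketched abstractly, with no Spark or Kafka involved. The store, the key scheme, and the `process` function below are all illustrative assumptions, not code from the thread.]

```python
# Sketch of the idempotent-store pattern: output rows are keyed
# deterministically by (topic, partition, offset), so replaying the same
# offset range simply overwrites identical data. All names illustrative.
store = {}            # data keyed by (topic, partition, offset)
committed_offset = 0  # kept in the SAME store as the data

def process(messages, fail_before_offset_commit):
    global committed_offset
    start = committed_offset
    # 1. manipulate data; 2. save to the store (offset key => idempotent)
    for i in range(start, len(messages)):
        store[("topic1", 0, i)] = messages[i] * 2
    # 3. save the ending offset to the same store
    if not fail_before_offset_commit:
        committed_offset = len(messages)

batch = [1, 2, 3]
process(batch, fail_before_offset_commit=True)   # crash between 2 and 3
process(batch, fail_before_offset_commit=False)  # replay overwrites, no dupes
```

The second run re-reads from the same beginning offsets and writes to the same keys, which is exactly why the store must be idempotent.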
>>>> For transactional stores:
>>>>
>>>> 1. manipulate data
>>>> 2. begin transaction
>>>> 3. save data to the store
>>>> 4. save offsets
>>>> 5. commit transaction
>>>>
>>>> If you fail before 5, the transaction rolls back. To make this less
>>>> heavyweight, you can write the data outside the transaction and then
>>>> update a pointer to the current data inside the transaction.
>>>>
>>>> Again, Spark has nothing much to do with guaranteeing exactly-once.
>>>> In fact, the current streaming api actively impedes my ability to do
>>>> the above. I'm just suggesting providing an api that doesn't get in
>>>> the way of exactly-once.
>>>>
>>>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan
>>>> <hshreedha...@cloudera.com> wrote:
>>>>
>>>> > Can you explain your basic algorithm for the once-only delivery?
>>>> > It is quite a bit of very Kafka-specific code that would take more
>>>> > time to read than I can currently afford. If you can explain your
>>>> > algorithm a bit, it might help.
>>>> >
>>>> > Thanks,
>>>> > Hari
>>>> >
>>>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org>
>>>> > wrote:
>>>> >
>>>> >> The problems you guys are discussing come from trying to store
>>>> >> state in Spark, so don't do that. Spark isn't a distributed
>>>> >> database.
>>>> >>
>>>> >> Just map Kafka partitions directly to RDDs, let user code specify
>>>> >> the range of offsets explicitly, and let them be in charge of
>>>> >> committing offsets.
>>>> >>
>>>> >> Using the simple consumer isn't that bad. I'm already using this
>>>> >> in production with the code I linked to, and Tresata apparently
>>>> >> has been as well. Again, for everyone saying this is impossible,
>>>> >> have you read either of those implementations and looked at the
>>>> >> approach?
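[Editor's note: the five transactional steps quoted above can likewise be sketched without a real database. The dict swap below stands in for an atomic commit; the state shape and function names are illustrative assumptions.]

```python
# Minimal sketch of the transactional pattern: data and offsets commit
# atomically, so a failure before step 5 changes nothing and the batch
# can simply be replayed from the last committed offsets.
state = {"data": {}, "offset": 0}

def process_batch(messages, fail_before_commit):
    global state
    start = state["offset"]
    # 1. manipulate data; 2-4. stage data and offsets in one "transaction"
    staged = dict(state["data"])
    for i in range(start, len(messages)):
        staged[i] = messages[i] * 2
    # 5. commit transaction (atomic swap); failing earlier leaves state
    # untouched, i.e. the rollback is implicit
    if not fail_before_commit:
        state = {"data": staged, "offset": len(messages)}

batch = [1, 2, 3]
process_batch(batch, fail_before_commit=True)   # rolls back: no change
process_batch(batch, fail_before_commit=False)  # replay lands exactly once
```

The "pointer update inside the transaction" variant from the thread is the same shape: the bulky write happens outside, and only the small atomic swap carries both the data pointer and the offsets.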
>>>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara
>>>> >> <sean.mcnam...@webtrends.com> wrote:
>>>> >>
>>>> >>> Please feel free to correct me if I'm wrong, but I think the
>>>> >>> exactly-once spark streaming semantics can easily be solved
>>>> >>> using updateStateByKey. Make the key going into updateStateByKey
>>>> >>> be a hash of the event, or pluck off some uuid from the message.
>>>> >>> The updateFunc would only emit the message if the key did not
>>>> >>> exist, and the user has complete control over the window of
>>>> >>> time / state lifecycle for detecting duplicates. It also makes
>>>> >>> it really easy to detect and take action (alert?) when you DO
>>>> >>> see a duplicate, or make memory tradeoffs within an error bound
>>>> >>> using a sketch algorithm. The kafka simple consumer is insanely
>>>> >>> complex; if possible I think it would be better (and vastly more
>>>> >>> flexible) to get reliability using the primitives that spark so
>>>> >>> elegantly provides.
>>>> >>>
>>>> >>> Cheers,
>>>> >>>
>>>> >>> Sean
>>>> >>>
>>>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan
>>>> >>> > <hshreedha...@cloudera.com> wrote:
>>>> >>> >
>>>> >>> > Hi Dibyendu,
>>>> >>> >
>>>> >>> > Thanks for the details on the implementation. But I still do
>>>> >>> > not believe that it gives no duplicates - what they achieve is
>>>> >>> > that the same batch is processed exactly the same way every
>>>> >>> > time (but note it may be processed more than once) - so it
>>>> >>> > depends on the operation being idempotent. I believe Trident
>>>> >>> > uses ZK to keep track of the transactions - a batch can be
>>>> >>> > processed multiple times in failure scenarios (for example,
>>>> >>> > the transaction is processed but before ZK is updated the
>>>> >>> > machine fails, causing a "new" node to process it again).
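[Editor's note: Sean's updateStateByKey dedup idea above can be sketched without Spark. In real code the `seen` set would live in updateStateByKey state with a bounded lifecycle; here it is a plain set, and all names are illustrative.]

```python
# Dedup by key presence: key each event by a uuid (or a hash of the
# event) and emit it only the first time that key is seen. Duplicates
# are dropped, and the drop branch is where you could alert or count.
def dedup_batch(seen, events):
    out = []
    for key, msg in events:
        if key in seen:
            continue          # duplicate: drop (or alert / count it)
        seen.add(key)         # first sighting: remember and emit
        out.append(msg)
    return out

seen = set()
first = dedup_batch(seen, [("a", "m1"), ("b", "m2")])
replay = dedup_batch(seen, [("a", "m1"), ("c", "m3")])  # "a" is replayed
```

As the thread notes, the memory cost of `seen` is the catch: a real deployment would bound the state window or swap the set for a sketch (e.g. a Bloom filter) at the cost of a small error bound.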
>>>> >>> > I don't think it is impossible to do this in Spark Streaming
>>>> >>> > as well, and I'd be really interested in working on it at some
>>>> >>> > point in the near future.
>>>> >>> >
>>>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya
>>>> >>> > <dibyendu.bhattach...@gmail.com> wrote:
>>>> >>> >
>>>> >>> >> Hi,
>>>> >>> >>
>>>> >>> >> Thanks to Jerry for mentioning the Kafka spout for Trident.
>>>> >>> >> Storm Trident achieves its exactly-once guarantee by
>>>> >>> >> processing tuples in a batch and assigning the same
>>>> >>> >> transaction-id to a given batch. The replay of a given batch
>>>> >>> >> with a transaction-id will have the exact same set of tuples,
>>>> >>> >> and replay of batches happens in the exact same order as
>>>> >>> >> before the failure.
>>>> >>> >>
>>>> >>> >> With this paradigm, if the downstream system processes data
>>>> >>> >> for a given batch with a given transaction-id, and during
>>>> >>> >> failure the same batch is emitted again, you can check
>>>> >>> >> whether that transaction-id has already been processed, and
>>>> >>> >> hence can guarantee exactly-once semantics.
>>>> >>> >>
>>>> >>> >> And this can only be achieved in Spark if we use the
>>>> >>> >> low-level Kafka consumer API to control the offsets. This
>>>> >>> >> low-level Kafka consumer
>>>> >>> >> (https://github.com/dibbhatt/kafka-spark-consumer) implements
>>>> >>> >> a Spark Kafka consumer using the Kafka low-level APIs. All of
>>>> >>> >> the Kafka-related logic has been taken from the Storm-Kafka
>>>> >>> >> spout, which manages all the Kafka rebalance and
>>>> >>> >> fault-tolerance aspects and Kafka metadata management.
>>>> >>> >>
>>>> >>> >> Presently this consumer guarantees that during receiver
>>>> >>> >> failure, it will re-emit the exact same block with the same
>>>> >>> >> set of messages.
>>>> >>> >> Every message has the details of its partition, offset and
>>>> >>> >> topic, which can tackle SPARK-3146.
>>>> >>> >>
>>>> >>> >> As this low-level consumer has complete control over the
>>>> >>> >> Kafka offsets, we can implement a Trident-like feature on top
>>>> >>> >> of it, e.g. implementing a transaction-id for a given block
>>>> >>> >> and re-emitting the same block with the same set of messages
>>>> >>> >> during driver failure.
>>>> >>> >>
>>>> >>> >> Regards,
>>>> >>> >> Dibyendu
>>>> >>> >>
>>>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai
>>>> >>> >> <saisai.s...@intel.com> wrote:
>>>> >>> >>>
>>>> >>> >>> Hi all,
>>>> >>> >>>
>>>> >>> >>> I agree with Hari that strong exactly-once semantics are
>>>> >>> >>> very hard to guarantee, especially in failure situations.
>>>> >>> >>> From my understanding, even the current implementation of
>>>> >>> >>> ReliableKafkaReceiver cannot fully guarantee exactly-once
>>>> >>> >>> semantics after a failure. First is the ordering of data
>>>> >>> >>> replayed from the last checkpoint; this is hard to guarantee
>>>> >>> >>> when multiple partitions are injected. Second is the design
>>>> >>> >>> complexity of achieving this: you can refer to the Kafka
>>>> >>> >>> spout in Trident - we would have to dig into the very
>>>> >>> >>> details of the Kafka metadata management system to achieve
>>>> >>> >>> this, not to mention rebalance and fault tolerance.
>>>> >>> >>>
>>>> >>> >>> Thanks
>>>> >>> >>> Jerry
>>>> >>> >>>
>>>> >>> >>> -----Original Message-----
>>>> >>> >>> From: Luis Ángel Vicente Sánchez
>>>> >>> >>> [mailto:langel.gro...@gmail.com]
>>>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>>>> >>> >>> To: Cody Koeninger
>>>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>>> >>> >>> Subject: Re: Which committers care about Kafka?
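[Editor's note: the Trident-style transaction-id check Dibyendu describes above can be sketched abstractly. The store shape and names below are illustrative assumptions; in Trident the last-applied transaction-id lives in the downstream state store itself.]

```python
# The downstream store remembers the last transaction-id it applied, so
# a replayed batch carrying the same id is a no-op. This only works
# because replay re-emits the exact same batch with the same id.
tx_store = {"last_tx_id": 0, "total": 0}

def apply_batch(tx_id, batch_sum):
    if tx_id <= tx_store["last_tx_id"]:
        return                          # batch already applied: skip replay
    tx_store["total"] += batch_sum      # apply the batch's effect...
    tx_store["last_tx_id"] = tx_id      # ...and record its id with it

apply_batch(tx_id=1, batch_sum=10)
apply_batch(tx_id=1, batch_sum=10)      # replay after failure: no double count
apply_batch(tx_id=2, batch_sum=5)
```

This is Hari's point in the same exchange: the batch may still be *processed* more than once; it is the id check at the store that makes the *effect* exactly-once.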
>>>> >>> >>> But idempotency is not that easy to achieve sometimes. A
>>>> >>> >>> strong exactly-once semantic through a proper API would be
>>>> >>> >>> super useful; but I'm not implying this is easy to achieve.
>>>> >>> >>>
>>>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org>
>>>> >>> >>> wrote:
>>>> >>> >>>
>>>> >>> >>>> If the downstream store for the output data is idempotent
>>>> >>> >>>> or transactional, and that downstream store is also the
>>>> >>> >>>> system of record for Kafka offsets, then you have
>>>> >>> >>>> exactly-once semantics. Commit offsets with / after the
>>>> >>> >>>> data is stored. On any failure, restart from the last
>>>> >>> >>>> committed offsets.
>>>> >>> >>>>
>>>> >>> >>>> Yes, this approach is biased towards the etl-like use cases
>>>> >>> >>>> rather than near-realtime-analytics use cases.
>>>> >>> >>>>
>>>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan
>>>> >>> >>>> <hshreedha...@cloudera.com> wrote:
>>>> >>> >>>>>
>>>> >>> >>>>> I get what you are saying. But getting exactly-once right
>>>> >>> >>>>> is an extremely hard problem - especially in the presence
>>>> >>> >>>>> of failure. The issue is failures can happen in a bunch of
>>>> >>> >>>>> places. For example, before the notification of the
>>>> >>> >>>>> downstream store being successful reaches the receiver
>>>> >>> >>>>> that updates the offsets, the node fails. The store was
>>>> >>> >>>>> successful, but duplicates came in either way. This is
>>>> >>> >>>>> something worth discussing by itself - but without uuids
>>>> >>> >>>>> etc. this might not really be solved even when you think
>>>> >>> >>>>> it is.
>>>> >>> >>>>>
>>>> >>> >>>>> Anyway, I will look at the links.
>>>> >>> >>>>> Even I am interested in all of the features you mentioned
>>>> >>> >>>>> - no HDFS WAL for Kafka and once-only delivery - but I
>>>> >>> >>>>> doubt the latter is really possible to guarantee, though I
>>>> >>> >>>>> really would love to have that!
>>>> >>> >>>>>
>>>> >>> >>>>> Thanks,
>>>> >>> >>>>> Hari
>>>> >>> >>>>>
>>>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>>>> >>> >>>>> <c...@koeninger.org> wrote:
>>>> >>> >>>>>
>>>> >>> >>>>>> Thanks for the replies.
>>>> >>> >>>>>>
>>>> >>> >>>>>> Regarding skipping the WAL, it's not just about
>>>> >>> >>>>>> optimization. If you actually want exactly-once
>>>> >>> >>>>>> semantics, you need control of Kafka offsets as well,
>>>> >>> >>>>>> including the ability to not use zookeeper as the system
>>>> >>> >>>>>> of record for offsets. Kafka already is a reliable system
>>>> >>> >>>>>> that has strong ordering guarantees (within a partition)
>>>> >>> >>>>>> and does not mandate the use of zookeeper to store
>>>> >>> >>>>>> offsets. I think there should be a spark api that acts as
>>>> >>> >>>>>> a very simple intermediary between Kafka and the user's
>>>> >>> >>>>>> choice of downstream store.
>>>> >>> >>>>>>
>>>> >>> >>>>>> Take a look at the links I posted - if there have already
>>>> >>> >>>>>> been 2 independent implementations of the idea, chances
>>>> >>> >>>>>> are it's something people need.
>>>> >>> >>>>>>
>>>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan
>>>> >>> >>>>>> <hshreedha...@cloudera.com> wrote:
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> Hi Cody,
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can
>>>> >>> >>>>>>> implement something pretty simple and lightweight for
>>>> >>> >>>>>>> that one.
>>>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation -
>>>> >>> >>>>>>> this is something I discussed with TD a few weeks ago.
>>>> >>> >>>>>>> Though it is a good idea to implement this to avoid
>>>> >>> >>>>>>> unnecessary HDFS writes, it is an optimization. For that
>>>> >>> >>>>>>> reason, we must be careful in the implementation. There
>>>> >>> >>>>>>> are a couple of issues that we need to ensure work
>>>> >>> >>>>>>> properly - specifically ordering. To ensure we pull
>>>> >>> >>>>>>> messages from different topics and partitions in the
>>>> >>> >>>>>>> same order after failure, we'd still have to persist the
>>>> >>> >>>>>>> metadata to HDFS (or some other system) - this metadata
>>>> >>> >>>>>>> must contain the order of messages consumed, so we know
>>>> >>> >>>>>>> how to re-read the messages. I am planning to explore
>>>> >>> >>>>>>> this once I have some time (probably in Jan). In
>>>> >>> >>>>>>> addition, we must also ensure bucketing functions work
>>>> >>> >>>>>>> fine as well. I will file a placeholder jira for this
>>>> >>> >>>>>>> one.
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while
>>>> >>> >>>>>>> back - https://github.com/apache/spark/pull/2994 . I am
>>>> >>> >>>>>>> hoping that this will get pulled in soon, as this is
>>>> >>> >>>>>>> something I know people want. I am open to feedback on
>>>> >>> >>>>>>> that - anything that I can do to make it better.
>>>> >>> >>>>>>> Thanks,
>>>> >>> >>>>>>> Hari
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>>>> >>> >>>>>>> <pwend...@gmail.com> wrote:
>>>> >>> >>>>>>>
>>>> >>> >>>>>>>> Hey Cody,
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> Thanks for reaching out with this. The lead on
>>>> >>> >>>>>>>> streaming is TD - he is traveling this week though, so
>>>> >>> >>>>>>>> I can respond a bit. To the high-level point of whether
>>>> >>> >>>>>>>> Kafka is important - it definitely is. Something like
>>>> >>> >>>>>>>> 80% of Spark Streaming deployments (anecdotally) ingest
>>>> >>> >>>>>>>> data from Kafka. Also, good support for Kafka is
>>>> >>> >>>>>>>> something we generally want in Spark and not a library.
>>>> >>> >>>>>>>> In some cases IIRC there were user libraries that used
>>>> >>> >>>>>>>> unstable Kafka APIs, and we were somewhat waiting on
>>>> >>> >>>>>>>> Kafka to stabilize them before merging things upstream;
>>>> >>> >>>>>>>> otherwise users wouldn't be able to use newer Kafka
>>>> >>> >>>>>>>> versions. This is a high-level impression only though;
>>>> >>> >>>>>>>> I haven't talked to TD about this recently, so it's
>>>> >>> >>>>>>>> worth revisiting given the developments in Kafka.
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> Please do bring things up like this on the dev list if
>>>> >>> >>>>>>>> there are blockers for your usage - thanks for pinging
>>>> >>> >>>>>>>> it.
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> - Patrick
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>>>> >>> >>>>>>>> <c...@koeninger.org> wrote:
>>>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people
>>>> >>> >>>>>>>>> to get some long-standing Kafka-related issues
>>>> >>> >>>>>>>>> resolved?
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible
>>>> >>> >>>>>>>>> for our production use.
>>>> >>> >>>>>>>>> I don't think we're alone in this viewpoint, because
>>>> >>> >>>>>>>>> I've seen several different patches and libraries to
>>>> >>> >>>>>>>>> fix the same things we've been running into.
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> Regarding flexibility:
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> has been outstanding since August, and IMHO an
>>>> >>> >>>>>>>>> equivalent of this is absolutely necessary. We wrote a
>>>> >>> >>>>>>>>> similar patch ourselves, then found that PR and have
>>>> >>> >>>>>>>>> been running it in production. We wouldn't be able to
>>>> >>> >>>>>>>>> get our jobs done without it. It also allows users to
>>>> >>> >>>>>>>>> solve a whole class of problems for themselves (e.g.
>>>> >>> >>>>>>>>> SPARK-2388, arbitrary delay of messages, etc).
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
>>>> >>> >>>>>>>>> WriteAheadLog as a general solution for streaming
>>>> >>> >>>>>>>>> unreliable sources, but Kafka already is a reliable
>>>> >>> >>>>>>>>> source. I think there's a need for an api that treats
>>>> >>> >>>>>>>>> it as such. Even aside from the performance issues of
>>>> >>> >>>>>>>>> duplicating the write-ahead log in kafka into another
>>>> >>> >>>>>>>>> write-ahead log in hdfs, I need exactly-once semantics
>>>> >>> >>>>>>>>> in the face of failure (I've had failures that
>>>> >>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for
>>>> >>> >>>>>>>>> instance).
>>>> >>> >>>>>>>>> I've got an implementation I've been using:
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> Tresata has something similar at
>>>> >>> >>>>>>>>> https://github.com/tresata/spark-kafka, and I know
>>>> >>> >>>>>>>>> there were earlier attempts based on Storm code.
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries
>>>> >>> >>>>>>>>> rather than patches to Spark is problematic, because
>>>> >>> >>>>>>>>> large portions of the implementation are
>>>> >>> >>>>>>>>> private[spark].
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> I'd like to help, but I need to know whose attention
>>>> >>> >>>>>>>>> to get.
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> ---------------------------------------------------------------------
>>>> >>> >>>>>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>> >>> >>>>>>>> For additional commands, e-mail: dev-h...@spark.apache.org