On Sun, Apr 13, 2014 at 1:26 PM, Dibyendu Bhattacharya <[email protected]> wrote:
> Hi Aaron,
>
> Recently I have written a Kafka Blur consumer for indexing real-time
> streams into a Blur cluster. I just pushed it to git
> (https://github.com/dibbhatt/kafka-blur-consumer).
>
> This utility helps pull messages from a Kafka cluster and index them into
> a target Apache Blur cluster. The Kafka-Blur consumer detects the number
> of partitions for a Kafka topic and spawns that many threads to index the
> Kafka partitions in parallel into the target Blur table. It uses the Blur
> Thrift client's enqueueMutate, and it uses ZooKeeper to store the latest
> offset of the indexed messages, which helps it recover in case of
> failure. Let me know your view. Also, would it be possible to push this
> into Blur contrib?
>
The kafka-blur-consumer looks awesome! Thank you for sharing! I think we
would love to have it as a contrib in Blur. We should open a separate
thread on the mail list to discuss.

> Is there any progress on the persistent queue (HDFS backed) which you
> started working on some time back?
>
I have not started on the implementation, though I doubt it will be very
complicated. I have been letting the current version of Blur run for a few
weeks to make sure we have a good build before releasing. Also, we have
been waiting for another committer to get his keys signed so that he may
perform the release.

Thanks again!

Aaron

> Regards,
> Dibyendu
>
>
> On Fri, Mar 14, 2014 at 7:15 AM, Aaron McCurry <[email protected]> wrote:
>
>> On Thu, Mar 13, 2014 at 6:54 AM, Dibyendu Bhattacharya <
>> [email protected]> wrote:
>>
>>> Hi Aaron,
>>>
>>> Sorry for the late reply. Yes, the three-node AWS cluster I used for
>>> Blur also contains HDFS. I have not created a dedicated Blur cluster
>>> for the test.
>>>
>>> I used a 3-node Blur cluster, where 1 node is the controller (shared
>>> with the Hadoop NameNode and JobTracker) and 2 are shard servers
>>> (shared with the DataNodes and TaskTrackers). My Blur table has 3
>>> shards.
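[Editor's note: the one-thread-per-Kafka-partition design described above can
be sketched with plain java.util.concurrent primitives. This is a hedged
illustration only; PartitionSource and the enqueueMutate comment are
hypothetical stand-ins for the real Kafka partition handle and Blur Thrift
client call, which are not shown in the thread.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PerPartitionIndexer {

    // Stand-in for one Kafka partition; a real consumer would poll the broker
    // and checkpoint its offset to ZooKeeper after indexing, as described above.
    interface PartitionSource {
        String nextMessage(); // returns null when the partition is drained
    }

    // One worker thread per partition, mirroring the kafka-blur-consumer design.
    public static int indexAll(List<PartitionSource> partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        AtomicInteger indexed = new AtomicInteger();
        for (PartitionSource p : partitions) {
            pool.submit(() -> {
                String msg;
                while ((msg = p.nextMessage()) != null) {
                    // A real worker would call client.enqueueMutate(...) here
                    // with a RowMutation built from msg.
                    indexed.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return indexed.get();
    }

    public static void main(String[] args) {
        // Simulate a 3-partition topic with in-memory queues.
        List<PartitionSource> partitions = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            BlockingQueue<String> q = new LinkedBlockingQueue<>();
            for (int j = 0; j < 100; j++) q.add("msg-" + i + "-" + j);
            partitions.add(q::poll);
        }
        System.out.println(indexAll(partitions));
    }
}
```

The partition count drives the thread count, so throughput scales with the
topic's partitioning rather than with a fixed client-side setting.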
>>> I made a small modification in my client code. My Kafka topic has 3
>>> partitions; in the earlier test I created a single-threaded client to
>>> read from all Kafka partitions and index into Blur. I have modified the
>>> logic and now create three threads for the 3 Kafka partitions, indexing
>>> with enqueueMutate in parallel for all three partitions. With the
>>> multi-threaded client I am now able to achieve around 1200 records/sec
>>> throughput, which is much better than the earlier result. Obviously
>>> this number would be better if a dedicated Blur cluster and
>>> high-compute AWS nodes were used.
>>>
>>
>> That's good. Also, there is an AsyncClient pool in Blur that could
>> provide some more throughput if needed.
>>
>>> For future work on this, do you have any plan to change the in-memory
>>> queue to an HDFS-backed queue?
>>>
>>
>> Yes. I have a very basic start to one in a library on github called
>> mele: github.com/amccurry/mele
>>
>> I am currently working on getting it from prototype to production
>> quality.
>>
>>> Also, as you mentioned, the shard queues are an in-memory-only data
>>> structure, so data can be lost at this point if a shard fails, and
>>> because they have a finite length they can block under heavy load. Do
>>> you think this might be taken care of in 0.2.2?
>>>
>>
>> I am planning on releasing 0.2.2 this week, so it will go as is.
>> However, if you need the persistent queue (for no data loss) I would be
>> happy to work on that for a 0.2.3 release. I'm hoping that now that I
>> have Blur in a stable state, releases will flow about once a month with
>> a single focus for each release (and bug fixes as they show up).
>>
>> Hope this helps.
>> Aaron
>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Wed, Mar 12, 2014 at 7:49 AM, Aaron McCurry <[email protected]> wrote:
>>>
>>>> On Tue, Mar 11, 2014 at 4:08 AM, Dibyendu Bhattacharya <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Aaron,
>>>>>
>>>>> I am finally able to test the new queue capabilities. The results are
>>>>> promising compared to the traditional Thrift client. Here are the
>>>>> details.
>>>>>
>>>>
>>>> That's great!
>>>>
>>>>> I have done the test on an AWS EMR cluster with 3 m1.large nodes. For
>>>>> all the tests, environment and memory were left at defaults.
>>>>>
>>>>> *Test 1*: Tested without the queue feature (doing client.mutate);
>>>>> 10,000 records took almost 300 seconds, a rate of *33 records/sec*.
>>>>>
>>>>> *Test 2*: Tested with the queue feature (client.enqueueMutate) for
>>>>> the same 10,000 records on the same cluster, and now it took just 23
>>>>> seconds, a rate of *435 records/sec. This is a jump of 13 times.*
>>>>>
>>>>> *Test 3*: Wanted to index around 90,000 documents on the same
>>>>> cluster, but this time using client.enqueueMutate I got an error in
>>>>> the log and all shard servers hung. Below are the logs showing the
>>>>> error; after that everything hung and the shards became unresponsive.
>>>>>
>>>>> org.apache.blur.thrift.BadConnectionException: Could not connect to
>>>>> controller/shard server. All connections are bad.
>>>>>     at org.apache.blur.thrift.BlurClientManager.execute(BlurClientManager.java:235)
>>>>>     at org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler.invoke(BlurClient.java:56)
>>>>>     at com.sun.proxy.$Proxy0.enqueueMutate(Unknown Source)
>>>>>
>>>>> In the log I found this code giving the error:
>>>>> stackTraceStr:null, errorType:UNKNOWN)
>>>>>     at org.apache.blur.manager.writer.MutatableAction.merge(MutatableAction.java:460)
>>>>>     at org.apache.blur.manager.writer.MutatableAction.reduceMutates(MutatableAction.java:439)
>>>>>     at org.apache.blur.manager.writer.BaseQueueReader$1.run(BaseQueueReader.java:65)
>>>>>     at java.lang.Thread.run(Thread.java:724)
>>>>>
>>>>> Just to test whether MutatableAction.reduceMutates is the culprit, I
>>>>> modified BaseQueueReader.java to only perform doMutate(mutations);
>>>>>
>>>>> I commented out mutations = MutatableAction.reduceMutates(mutations);
>>>>>
>>>>
>>>> Hmm, ok. I suppose there is a bug in there. The queue reader code is
>>>> in need of some better error handling. I am trying to clean it up a
>>>> bit now.
>>>>
>>>>> With these changes, when I ran the test again for 90,000 documents,
>>>>> this time all documents got indexed properly, and it took around 157
>>>>> seconds, an indexing rate of *575 records/sec, a 17-times jump.*
>>>>>
>>>>> Just to give an idea of index size, 90K documents show a table size
>>>>> of 270MB in Blur.
>>>>>
>>>>
>>>> Just out of curiosity, if you are using 3 nodes in AWS I assume that
>>>> you are running HDFS on those 3 nodes? Also, how many shards are you
>>>> testing with? NRT updates require more CPU than MapReduce indexes, so
>>>> the shard count will play a part in the performance.
>>>>
>>>> Also, there is room for improvement with a smarter client. Currently
>>>> all the data is being routed through the controller, which means the
>>>> data is serialized and deserialized in the controller before it ever
>>>> reaches the shard servers.
>>>>
>>>> Thank you very much for the feedback! I am working to improve the
>>>> queue code a bit before the release. I believe at this point it's the
>>>> final thing to work on.
>>>> Aaron
>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>>
>>>>> On Fri, Mar 7, 2014 at 9:51 PM, Dibyendu Bhattacharya <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thanks Aaron for the detailed explanation. I just browsed through
>>>>>> the changes. I think we do not need the TableQueueReader now. I will
>>>>>> try out the Thrift enqueue method to see how it performs. We will be
>>>>>> using a Kafka client to populate the queue. Will let you know how
>>>>>> that goes.
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 7, 2014 at 9:12 PM, Aaron McCurry <[email protected]> wrote:
>>>>>>
>>>>>>> I have pushed most of the feature needed for the queue, and here's
>>>>>>> a rundown on how it works. I have left the original QueueReader in
>>>>>>> place at the shards but renamed it to ShardQueueReader, which
>>>>>>> requires the data to be partitioned correctly. It also makes use of
>>>>>>> an in-memory blocking queue that I will be changing to an
>>>>>>> HDFS-backed queue so that memory resources won't be affected under
>>>>>>> heavy write load. Then I have created another set of Thrift calls,
>>>>>>> enqueueMutate and enqueueMutateBatch, that feed the internal queue.
>>>>>>> Both of these methods are implemented on the controller and the
>>>>>>> shard server. There will be a TableQueueReader that can run inside
>>>>>>> the controller to read from a queue, dealing with the partitioning
>>>>>>> inside the controller. The class is written and committed, but the
>>>>>>> logic to instantiate and run it has not been written.
>>>>>>>
>>>>>>> However, using the controller API (standard Thrift client) to write
>>>>>>> RowMutations via the enqueue method from Storm could be an option
>>>>>>> right now, without needing to implement anything that runs inside
>>>>>>> of Blur. The only issue now is the blocking nature of the in-memory
>>>>>>> queue.
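[Editor's note: the "blocking nature" of the bounded in-memory shard queue
mentioned above is standard BlockingQueue back-pressure. A minimal sketch,
using only the JDK; the queue, reader thread, and mutation strings here are
illustrative stand-ins for Blur's internal ShardQueueReader and RowMutations,
not the actual implementation.]

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {

    // Pushes `total` mutations through a queue of finite `capacity` while a
    // reader thread drains it; returns how many the reader consumed.
    // put() blocks whenever the queue is full -- the back-pressure a writer
    // would feel on the in-memory shard queue under heavy write load.
    public static int pushThrough(int total, int capacity) {
        BlockingQueue<String> shardQueue = new ArrayBlockingQueue<>(capacity);
        final int[] drained = {0};
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    shardQueue.take();   // stand-in for the shard queue reader
                    drained[0]++;
                    Thread.sleep(1);     // simulate indexing work
                }
            } catch (InterruptedException ignored) {
            }
        });
        reader.start();
        try {
            for (int i = 0; i < total; i++) {
                shardQueue.put("mutation-" + i); // blocks once capacity is hit
            }
            reader.join(); // join() makes drained[0] safely visible here
        } catch (InterruptedException ignored) {
        }
        return drained[0];
    }

    public static void main(String[] args) {
        System.out.println(pushThrough(50, 4));
    }
}
```

An HDFS-backed queue would replace the fixed capacity with durable storage,
which is why the slowdown at max queue length would become less apparent.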
>>>>>>> I will be working to finish this feature before the release, but I
>>>>>>> believe that it is mostly in a state to evaluate. The only issue
>>>>>>> that I can see is that writing data in via the enqueueMutate method
>>>>>>> could have some performance slowdowns once it hits the max queue
>>>>>>> length; once the HDFS-backed version is in place, that slowdown
>>>>>>> will be less apparent.
>>>>>>>
>>>>>>> So here's a rundown on where the feature lacks:
>>>>>>>
>>>>>>> 1. The shard queues are an in-memory-only data structure, so data
>>>>>>> can be lost at this point if a shard fails, and because they have a
>>>>>>> finite length they can block under heavy load. This one I see as a
>>>>>>> must before the release.
>>>>>>> 2. A way to run the table queue reader in the controllers, but with
>>>>>>> the rework of the API I'm not sure you all would really need this
>>>>>>> anymore.
>>>>>>>
>>>>>>> Let me know if you all need any help getting started with this
>>>>>>> updated code.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Aaron
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 7, 2014 at 10:00 AM, Dibyendu Bhattacharya <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Aaron,
>>>>>>>>
>>>>>>>> Do you still plan to have this real-time queue-based indexing
>>>>>>>> feature for Blur 0.2.2? I know you are very busy with the 0.2.2
>>>>>>>> release; I just wanted to know if this will be coming soon.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Dibyendu
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Mar 4, 2014 at 8:50 AM, Jonathan Hodges <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Nothing to add; I agree the Kafka partitions don't need to match
>>>>>>>>> the Blur partitions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Mar 3, 2014 at 7:17 PM, Dibyendu Bhattacharya <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Aaron,
>>>>>>>>>>
>>>>>>>>>> No, I do not see that we need to match Kafka partitions with
>>>>>>>>>> Blur partitions.
>>>>>>>>>> In fact, the number of partitions in Kafka and the number of
>>>>>>>>>> shards in Blur may not match either.
>>>>>>>>>>
>>>>>>>>>> Jonathan, do you have anything to add here?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dibyendu
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 3, 2014 at 11:35 PM, Aaron McCurry <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok, I do have a question. Do you see a use for the current use
>>>>>>>>>>> case, where Kafka partitions match Blur partitions and the
>>>>>>>>>>> clients pushing messages into Kafka partition the data into the
>>>>>>>>>>> Kafka partitions to match the Blur partitions? The reason I ask
>>>>>>>>>>> is I want to know if I should keep the current low-level API
>>>>>>>>>>> pluggable or not.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 3, 2014 at 10:29 AM, Dibyendu Bhattacharya <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Aaron; yes, I figured out how Blur uses this API, so
>>>>>>>>>>>> you do not need to take a look at this.
>>>>>>>>>>>>
>>>>>>>>>>>> Once you are done with the new design of the queue feature, do
>>>>>>>>>>>> let me know; I will try to integrate Kafka into it and test
>>>>>>>>>>>> it.
>>>>>>>>>>>>
>>>>>>>>>>>> Dibyendu
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 3, 2014 at 8:07 PM, Aaron McCurry <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Based on your post on the mail list, I assume that you got
>>>>>>>>>>>>> what you needed working, or at least figured out how Blur was
>>>>>>>>>>>>> using the API. Let me know if you need me to take a look at
>>>>>>>>>>>>> this or not. Also, I'm planning on spending some time this
>>>>>>>>>>>>> afternoon working through making this feature easier to use.
>>>>>>>>>>>>> Aaron
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Feb 28, 2014 at 10:03 AM, Aaron McCurry <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey Dibyendu,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It will take me a little while to digest this. I will try to
>>>>>>>>>>>>>> get back to you later this afternoon. Thanks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Aaron
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Feb 28, 2014 at 8:29 AM, Dibyendu Bhattacharya <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Just forwarding this to you; please guide me on whether the
>>>>>>>>>>>>>>> approach is correct for a Kafka consumer with multiple
>>>>>>>>>>>>>>> Kafka partitions indexed into multiple Blur table shards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sorry that the code is not that clean.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dibyendu
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>>>>>> From: Dibyendu Bhattacharya <[email protected]>
>>>>>>>>>>>>>>> Date: Fri, Feb 28, 2014 at 5:32 PM
>>>>>>>>>>>>>>> Subject: Re: new queue capability
>>>>>>>>>>>>>>> To: [email protected], Jonathan Hodges <[email protected]>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I was just playing with the new QueueReader API, and as Tim
>>>>>>>>>>>>>>> pointed out, it's very low level. I still tried to
>>>>>>>>>>>>>>> implement a KafkaConsumer.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here is my use case. Let me know if I have approached it
>>>>>>>>>>>>>>> correctly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a given topic in Kafka, which has 3 partitions. And
>>>>>>>>>>>>>>> in Blur I have a table with 2 shards. I need to index all
>>>>>>>>>>>>>>> messages from the Kafka topic into the Blur table.
>>>>>>>>>>>>>>> I have used the Kafka ConsumerGroup API to consume in
>>>>>>>>>>>>>>> parallel in 2 streams (from 3 partitions) for indexing into
>>>>>>>>>>>>>>> 2 Blur shards. As the ConsumerGroup API allows me to split
>>>>>>>>>>>>>>> any Kafka topic into N streams, I can choose N to be my
>>>>>>>>>>>>>>> target shard count, here 2.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For the two shards I created two ShardContexts and two
>>>>>>>>>>>>>>> BlurIndexSimpleWriters. (Is this okay?)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now, I modified BlurIndexSimpleWriter to get a handle to
>>>>>>>>>>>>>>> the _queueReader object. I used this _queueReader to
>>>>>>>>>>>>>>> populate the respective shard's queue, taking messages from
>>>>>>>>>>>>>>> the KafkaStreams.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here is the test case (KafkaReaderTest), KafkaStreamReader
>>>>>>>>>>>>>>> (which reads the Kafka stream), and KafkaQueueReader (the
>>>>>>>>>>>>>>> queue interface for Blur).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also attached is the modified BlurIndexSimpleWriter. I just
>>>>>>>>>>>>>>> added:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public QueueReader getQueueReader() {
>>>>>>>>>>>>>>>   return _queueReader;
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> With these changes, I am able to read Kafka messages in
>>>>>>>>>>>>>>> parallel streams and index them into 2 shards. All
>>>>>>>>>>>>>>> documents from Kafka are getting indexed properly, and
>>>>>>>>>>>>>>> after the test cases run I can see two index directories
>>>>>>>>>>>>>>> for the two paths I created.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let me know if this approach is correct. In this code I
>>>>>>>>>>>>>>> have not taken care of the shard-failure logic, and as Tim
>>>>>>>>>>>>>>> pointed out, if that can be abstracted from the client,
>>>>>>>>>>>>>>> that would be great.
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Dibyendu
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Feb 27, 2014 at 9:40 PM, Aaron McCurry <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What if we provide an implementation of the QueueReader
>>>>>>>>>>>>>>>> concept that does what you are discussing? That way, in
>>>>>>>>>>>>>>>> more extreme cases when the user is forced into
>>>>>>>>>>>>>>>> implementing the lower-level API (perhaps for
>>>>>>>>>>>>>>>> performance), they can still do it, but for the normal
>>>>>>>>>>>>>>>> case the partitioning (and other difficult issues) is
>>>>>>>>>>>>>>>> handled by the controllers.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I could see adding an enqueueMutate call to the
>>>>>>>>>>>>>>>> controllers that pushes the mutates to the correct buckets
>>>>>>>>>>>>>>>> for the user. At the same time, we could allow each of the
>>>>>>>>>>>>>>>> controllers to pull from an external queue and push the
>>>>>>>>>>>>>>>> mutates to the correct buckets for the shards. I could see
>>>>>>>>>>>>>>>> a couple of different ways of handling this.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However, I do agree that right now there is too much
>>>>>>>>>>>>>>>> burden on the user for the 95% case. We should make this
>>>>>>>>>>>>>>>> simpler.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Aaron
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Feb 27, 2014 at 10:07 AM, Tim Williams <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > I've been playing around with the new QueueReader stuff
>>>>>>>>>>>>>>>> > and I'm starting to believe it's at the wrong level of
>>>>>>>>>>>>>>>> > abstraction - in the shard context - for a user.
>>>>>>>>>>>>>>>> > Between having to know about the BlurPartitioner and
>>>>>>>>>>>>>>>> > handling all the failure nuances, I'm thinking a much
>>>>>>>>>>>>>>>> > friendlier approach would be to have the client
>>>>>>>>>>>>>>>> > implement a single message pump that Blur takes from and
>>>>>>>>>>>>>>>> > handles.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > Maybe on startup the controllers compete for the lead
>>>>>>>>>>>>>>>> > QueueReader position, create it from the TableContext,
>>>>>>>>>>>>>>>> > and run with it? The user would still need to deal with
>>>>>>>>>>>>>>>> > controller failures, but that seems easier to reason
>>>>>>>>>>>>>>>> > about than shard failures.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > The way it's crafted right now, the user seems burdened
>>>>>>>>>>>>>>>> > with a lot of the hard problems that Blur otherwise
>>>>>>>>>>>>>>>> > solves. Obviously, it trades off a high burden for one
>>>>>>>>>>>>>>>> > of the controllers.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > Thoughts?
>>>>>>>>>>>>>>>> > --tim
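[Editor's note: the partitioning burden Tim describes comes down to every
writer agreeing on how a row id maps to a shard. A minimal hash-mod sketch
follows; Blur's actual BlurPartitioner is not shown in this thread and may
hash differently, so ShardPartitioner here is a hypothetical illustration of
the idea, not Blur's implementation.]

```java
public class ShardPartitioner {

    // Deterministically route a row id to a shard index in [0, numShards).
    // Masking with Integer.MAX_VALUE keeps the hash non-negative so the
    // modulo never yields a negative shard.
    public static int shardFor(String rowId, int numShards) {
        return (rowId.hashCode() & Integer.MAX_VALUE) % numShards;
    }

    public static void main(String[] args) {
        int numShards = 2; // e.g. the 2-shard table from earlier in the thread
        for (String row : new String[] {"row-1", "row-2", "row-3"}) {
            System.out.println(row + " -> shard-" + shardFor(row, numShards));
        }
    }
}
```

Because the mapping is a pure function of the row id, a client-side message
pump and a controller-side queue reader will route the same row to the same
shard, which is exactly the invariant the low-level QueueReader API forces
the user to maintain by hand.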
