Hi Aaron,

I recently wrote a Kafka-Blur consumer for indexing real-time
streams into a Blur cluster, and have just pushed it to GitHub
(https://github.com/dibbhatt/kafka-blur-consumer).

This utility pulls messages from a Kafka cluster and indexes them into a
target Apache Blur cluster. The Kafka-Blur consumer detects the number of
partitions for a Kafka topic and spawns that many threads to index the
partitions in parallel into the target Blur table. It uses the Blur Thrift
client's enqueueMutate call, and it stores the latest offset of the indexed
messages in ZooKeeper, which helps it recover after a failure. Let me know
your views, and whether it would be possible to push this into Blur
contrib.
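
For anyone who wants the gist without reading the repository, here is a
minimal, self-contained sketch of the thread-per-partition idea with offset
checkpointing. It is not the code from the repository: the Indexer interface
stands in for the Blur Thrift client's enqueueMutate call, the in-memory
OffsetStore stands in for ZooKeeper, and the partitions are plain lists
rather than a real Kafka topic. All of those names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionIndexerSketch {

    /** Hypothetical stand-in for the Blur client call (enqueueMutate in the real consumer). */
    public interface Indexer {
        void index(String message) throws Exception;
    }

    /** Hypothetical in-memory stand-in for the offsets kept in ZooKeeper. */
    public static final class OffsetStore {
        private final ConcurrentMap<Integer, Long> committed = new ConcurrentHashMap<>();

        /** Returns the last committed offset, or -1 if nothing was committed yet. */
        public long lastCommitted(int partition) {
            return committed.getOrDefault(partition, -1L);
        }

        public void commit(int partition, long offset) {
            committed.put(partition, offset);
        }
    }

    /** Starts one worker per partition; each resumes just after its last committed offset. */
    public static void indexAll(Map<Integer, List<String>> partitions,
                                Indexer indexer,
                                OffsetStore offsets) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        for (Map.Entry<Integer, List<String>> entry : partitions.entrySet()) {
            final int partition = entry.getKey();
            final List<String> messages = entry.getValue();
            pool.execute(() -> {
                long next = offsets.lastCommitted(partition) + 1;
                for (long offset = next; offset < messages.size(); offset++) {
                    try {
                        indexer.index(messages.get((int) offset));
                    } catch (Exception e) {
                        // Stop this partition; a later run retries from the last commit.
                        return;
                    }
                    // Commit only after the message was handed to the indexer.
                    offsets.commit(partition, offset);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws Exception {
        Map<Integer, List<String>> topic = new HashMap<>();
        topic.put(0, Arrays.asList("a", "b"));
        topic.put(1, Arrays.asList("c"));
        topic.put(2, Arrays.asList("d", "e", "f"));

        OffsetStore offsets = new OffsetStore();
        AtomicInteger indexed = new AtomicInteger();
        indexAll(topic, message -> indexed.incrementAndGet(), offsets);
        System.out.println("indexed=" + indexed.get());
    }
}
```

Calling indexAll a second time with the same OffsetStore indexes nothing
new, which is the recovery behaviour the stored offsets are meant to give.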

Is there any progress on the persistent (HDFS-backed) queue that you
started working on some time back?

Regards,
Dibyendu



On Fri, Mar 14, 2014 at 7:15 AM, Aaron McCurry <[email protected]> wrote:

>
>
>
> On Thu, Mar 13, 2014 at 6:54 AM, Dibyendu Bhattacharya <
> [email protected]> wrote:
>
>> Hi Aaron,
>>
>> Sorry for the late reply. Yes, the three-node AWS cluster I used for Blur
>> also runs HDFS. I have not created a dedicated Blur cluster for the test.
>>
>> I used a 3-node Blur cluster, where 1 node is the controller (shared with
>> the Hadoop NameNode and JobTracker) and 2 are shard servers (shared with
>> the DataNodes and TaskTrackers). My Blur table has 3 shards.
>>
>> I made a small modification to my client code. My Kafka topic has 3
>> partitions. In the earlier test I used a single-threaded client to read
>> from all Kafka partitions and index into Blur. I have now changed the logic
>> to create three threads, one per Kafka partition, each indexing with
>> enqueueMutate in parallel. With the multi-threaded client,
>> I am now able to achieve a throughput of around 1200 records/sec, which is
>> much better than the earlier result. This number should improve further
>> with a dedicated Blur cluster and high-compute AWS nodes.
>>
>
> That's good, also there is an AsyncClient pool in Blur that could provide
> some more throughput if needed.
>
>
>>
>> For future work on this, do you have any plans to change the in-memory
>> queue to an HDFS-backed queue?
>>
>
> Yes.  I have a very basic start to one in a library on GitHub called mele:
> github.com/amccurry/mele
>
> I am currently working on getting it from prototype to production quality.
>
>
>> Also, as you mentioned, the shard queues are an in-memory only data
>> structure, so data can be lost at this point if a shard fails, and because
>> they have a finite length they can block under heavy load.  Do you think
>> this might be taken care of in 0.2.2?
>>
>
> I am planning on releasing 0.2.2 this week, so it will go as is.  However,
> if you need the persistent queue (for no data loss) I would be happy to
> work on that for a 0.2.3 release.  I'm hoping that, now that I have Blur in
> a stable state, releases will flow about once a month with a single
> focus for each release (and bug fixes as they show up).
>
> Hope this helps.
>
> Aaron
>
>
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>>
>> On Wed, Mar 12, 2014 at 7:49 AM, Aaron McCurry <[email protected]> wrote:
>>
>>>
>>>
>>>
>>> On Tue, Mar 11, 2014 at 4:08 AM, Dibyendu Bhattacharya <
>>> [email protected]> wrote:
>>>
>>>> Hi Aaron,
>>>>
>>>> I am finally able to test the new queue capabilities. The results are
>>>> promising compared to the traditional Thrift client. Here are the details.
>>>>
>>>
>>> That's great!
>>>
>>>
>>>>
>>>> I ran the tests on an AWS EMR cluster of 3 m1.large nodes. For all
>>>> the tests, the environment and memory settings were left at their defaults.
>>>>
>>>> *Test 1*: Without the queue feature (using client.mutate), indexing
>>>> 10,000 records took almost 300 seconds, a rate of *33 records/sec*.
>>>>
>>>> *Test 2*: With the queue feature (client.enqueueMutate), the same
>>>> 10,000 records on the same cluster took just 23 seconds, a rate of
>>>> *435 records/sec. This is a jump of 13 times.*
>>>>
>>>> *Test 3*: I wanted to index around 90,000 documents on the same cluster,
>>>> but this time client.enqueueMutate produced an error in the log and all
>>>> shard servers hung. The log below shows the error; after it,
>>>> everything hung and the shards became unresponsive.
>>>>
>>>> org.apache.blur.thrift.BadConnectionException: Could not connect to
>>>> controller/shard server. All connections are bad.
>>>>         at
>>>> org.apache.blur.thrift.BlurClientManager.execute(BlurClientManager.java:235)
>>>>         at
>>>> org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler.invoke(BlurClient.java:56)
>>>>         at com.sun.proxy.$Proxy0.enqueueMutate(Unknown Source)
>>>>
>>>>
>>>> In the log I found this code giving the error:
>>>>
>>>>
>>>> stackTraceStr:null, errorType:UNKNOWN)
>>>>         at
>>>> org.apache.blur.manager.writer.MutatableAction.merge(MutatableAction.java:460)
>>>>         at
>>>> org.apache.blur.manager.writer.MutatableAction.reduceMutates(MutatableAction.java:439)
>>>>         at
>>>> org.apache.blur.manager.writer.BaseQueueReader$1.run(BaseQueueReader.java:65)
>>>>         at java.lang.Thread.run(Thread.java:724)
>>>>
>>>>
>>>> Just to test whether MutatableAction.reduceMutates is the culprit, I
>>>> modified BaseQueueReader.java to perform only doMutate(mutations);
>>>>
>>>> I commented out the line mutations =
>>>> MutatableAction.reduceMutates(mutations);
>>>>
>>>
>>> Hmm, ok.  I suppose there is a bug in there.  The queue reader code is
>>> in need of some better error handling.  I am trying to clean it up a bit
>>> now.
>>>
>>>
>>>>
>>>> With this change, when I ran the test again for 90,000 documents,
>>>> all documents were indexed properly. It took around 157 seconds,
>>>> an indexing rate of *575 records/sec, a 17 times jump.*
>>>>
>>>> To give an idea of the index size, 90K documents show a table size of
>>>> 270MB in Blur.
>>>>
>>>
>>> Just out of curiosity, if you are using 3 nodes in AWS I assume that you
>>> are running HDFS on those 3 nodes?  Also how many shards are you testing
>>> with?  NRT updates require more CPU than MapReduce indexes so the shard
>>> count will play a part in the performance.
>>>
>>> Also there is room for improvement with a smarter client.  Currently all
>>> the data is being routed through the controller, which means the data is
>>> serialized and deserialized in the controller before it ever reaches the
>>> shard servers.
>>>
>>> Thank you very much for the feedback!  I am working to improve the queue
>>> code a bit before the release.  I believe at this point it's the final
>>> thing to work on.
>>>
>>> Aaron
>>>
>>>
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Mar 7, 2014 at 9:51 PM, Dibyendu Bhattacharya <
>>>> [email protected]> wrote:
>>>>
>>>>> Thanks Aaron for the detailed explanation. I have just browsed through
>>>>> the changes. I think we do not need the TableQueueReader now. I will try
>>>>> out the Thrift enqueueMutate method to see how it performs. We will be
>>>>> using a Kafka client to populate the queue. Will let you know how that goes.
>>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>>
>>>>> On Fri, Mar 7, 2014 at 9:12 PM, Aaron McCurry <[email protected]> wrote:
>>>>>
>>>>>> I have pushed most of the features needed for the queue, and here's a
>>>>>> rundown of how it works.  I have left the original QueueReader in place
>>>>>> at the shards but renamed it to ShardQueueReader, which requires the data
>>>>>> to be partitioned correctly.  It also makes use of an in-memory blocking
>>>>>> queue that I will be changing to an HDFS-backed queue so the memory
>>>>>> resources won't be affected under heavy write load.  Then I have created
>>>>>> another set of Thrift calls, enqueueMutate and enqueueMutateBatch, that
>>>>>> feed the internal queue.  Both of these methods are implemented on the
>>>>>> controller and the shard server.  There will be a TableQueueReader that
>>>>>> can run inside the controller to read from a queue and deal with the
>>>>>> partitioning inside the controller.  The class is written and committed,
>>>>>> but the logic to instantiate and run it has not been written.
>>>>>>
>>>>>> However, using the controller API (standard Thrift client) to write
>>>>>> RowMutations via the enqueueMutate method from Storm could be an option
>>>>>> right now without needing to implement anything that runs inside of Blur.
>>>>>> The only issue now is the blocking nature of the in-memory queue.  I will
>>>>>> be working to finish this feature before the release, but I believe that
>>>>>> it is mostly in a state to evaluate.  The only issue that I can see is
>>>>>> that writing data in via the enqueueMutate method could have some
>>>>>> performance slowdowns once it hits the max queue length; once the
>>>>>> HDFS-backed version is in place, that slowdown will be less apparent.
>>>>>>
>>>>>> So here's a rundown of where the feature is lacking:
>>>>>>
>>>>>> 1.  The shard queues are an in-memory only data structure.  So data
>>>>>> can be lost at this point if a shard fails, and because they have a
>>>>>> finite length they can block under heavy load.  This is the one I see as
>>>>>> a must before the release.
>>>>>> 2.  A way to run the table queue reader in the controllers, but with
>>>>>> the rework of the API I'm not sure you all would really need this
>>>>>> anymore.
>>>>>>
>>>>>> Let me know if you all need any help getting started with this
>>>>>> updated code.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Aaron
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 7, 2014 at 10:00 AM, Dibyendu Bhattacharya <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Aaron,
>>>>>>>
>>>>>>> Do you still plan to have this real-time, queue-based indexing
>>>>>>> feature in Blur 0.2.2? I know you are very busy with the 0.2.2 release;
>>>>>>> I just wanted to know if this will be coming soon.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dibyendu
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Mar 4, 2014 at 8:50 AM, Jonathan Hodges <[email protected]> wrote:
>>>>>>>
>>>>>>>> Nothing to add, I agree the Kafka partitions don't need to match
>>>>>>>> the Blur partitions.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 3, 2014 at 7:17 PM, Dibyendu Bhattacharya <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Aaron,
>>>>>>>>>
>>>>>>>>> No, I do not see that we need to match Kafka partitions with Blur
>>>>>>>>> partitions. In fact, the number of partitions in Kafka and the number
>>>>>>>>> of shards in Blur may not even match.
>>>>>>>>>
>>>>>>>>> Jonathan, do you have anything to add here?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dibyendu
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Mar 3, 2014 at 11:35 PM, Aaron McCurry <[email protected]
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> Ok, I do have a question.  Do you see a use for the current use
>>>>>>>>>> case where Kafka partitions match Blur partitions and the clients
>>>>>>>>>> pushing messages into Kafka partition the data into the Kafka
>>>>>>>>>> partitions to match Blur partitions?  The reason I ask is I want to
>>>>>>>>>> know if I should keep the current low-level API pluggable or not.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 3, 2014 at 10:29 AM, Dibyendu Bhattacharya <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Aaron. Yes, I figured out how Blur uses this API, so you
>>>>>>>>>>> do not need to take a look at this.
>>>>>>>>>>>
>>>>>>>>>>> Once you are done with the new design of the queue feature, do
>>>>>>>>>>> let me know; I will try to integrate Kafka with it and test it.
>>>>>>>>>>>
>>>>>>>>>>> Dibyendu
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 3, 2014 at 8:07 PM, Aaron McCurry <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Based on your post on the mailing list I assume that you got what
>>>>>>>>>>>> you needed working, or at least figured out how Blur was using the
>>>>>>>>>>>> API.  Let me know if you need me to take a look at this or not.
>>>>>>>>>>>> Also I'm planning on spending some time this afternoon working
>>>>>>>>>>>> through making this feature easier to use.
>>>>>>>>>>>>
>>>>>>>>>>>> Aaron
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Feb 28, 2014 at 10:03 AM, Aaron McCurry <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Dibyendu,
>>>>>>>>>>>>>
>>>>>>>>>>>>> It will take me a little while to digest this.  I will try to
>>>>>>>>>>>>> get back to you later this afternoon.  Thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Aaron
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Feb 28, 2014 at 8:29 AM, Dibyendu Bhattacharya <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just forwarding this so you can guide me on whether the approach
>>>>>>>>>>>>>> is correct for a Kafka consumer with multiple Kafka partitions
>>>>>>>>>>>>>> indexed into multiple Blur table shards.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sorry that the code is not that clean.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dibyendu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>>>>> From: Dibyendu Bhattacharya <[email protected]>
>>>>>>>>>>>>>> Date: Fri, Feb 28, 2014 at 5:32 PM
>>>>>>>>>>>>>> Subject: Re: new queue capability
>>>>>>>>>>>>>> To: [email protected], Jonathan Hodges <
>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I was just playing with the new QueueReader API, and as Tim
>>>>>>>>>>>>>> pointed out, it's very low level. I still tried to implement a
>>>>>>>>>>>>>> KafkaConsumer.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here is my use case. Let me know if I have approached it
>>>>>>>>>>>>>> correctly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a given topic in Kafka, which has 3 partitions. In
>>>>>>>>>>>>>> Blur I have a table with 2 shards. I need to index all messages
>>>>>>>>>>>>>> from the Kafka topic into the Blur table.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have used the Kafka ConsumerGroup API to consume in parallel in
>>>>>>>>>>>>>> 2 streams (from 3 partitions) for indexing into 2 Blur shards.
>>>>>>>>>>>>>> As the ConsumerGroup API allows me to split any Kafka topic into
>>>>>>>>>>>>>> N streams, I can choose N to be my target shard count; here it
>>>>>>>>>>>>>> is 2.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the two shards I created two ShardContext objects and
>>>>>>>>>>>>>> two BlurIndexSimpleWriter objects. (Is this okay?)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Next, I modified BlurIndexSimpleWriter to get a handle to
>>>>>>>>>>>>>> the _queueReader object.  I used this _queueReader to populate
>>>>>>>>>>>>>> the respective shard's queue, taking messages from the Kafka
>>>>>>>>>>>>>> streams.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here are the test case (KafkaReaderTest), the KafkaStreamReader
>>>>>>>>>>>>>> (which reads the Kafka stream), and the KafkaQueueReader (the
>>>>>>>>>>>>>> queue interface for Blur).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also attached is the modified BlurIndexSimpleWriter. I just added:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   public QueueReader getQueueReader() {
>>>>>>>>>>>>>>     return _queueReader;
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With these changes, I am able to read Kafka messages in
>>>>>>>>>>>>>> parallel streams and index them into 2 shards. All documents
>>>>>>>>>>>>>> from Kafka are indexed properly. But after the test cases run, I
>>>>>>>>>>>>>> can see two index directories for the two paths I created.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know if this approach is correct. In this code, I
>>>>>>>>>>>>>> have not taken care of shard failure logic, and as Tim pointed
>>>>>>>>>>>>>> out, if that can be abstracted from the client, that will be
>>>>>>>>>>>>>> great.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Dibyendu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Feb 27, 2014 at 9:40 PM, Aaron McCurry <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What if we provide an implementation of the QueueReader concept
>>>>>>>>>>>>>>> that does what you are discussing?  That way, in more extreme
>>>>>>>>>>>>>>> cases when the user is forced into implementing the lower-level
>>>>>>>>>>>>>>> API (perhaps for performance) they can still do it, but for the
>>>>>>>>>>>>>>> normal case the partitioning (and other difficult issues) are
>>>>>>>>>>>>>>> handled by the controllers.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I could see adding an enqueueMutate call to the controllers that
>>>>>>>>>>>>>>> pushes the mutates to the correct buckets for the user.  At the
>>>>>>>>>>>>>>> same time we could allow each of the controllers to pull from an
>>>>>>>>>>>>>>> external source and push the mutates to the correct buckets for
>>>>>>>>>>>>>>> the shards.  I could see a couple of different ways of handling
>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However, I do agree that right now there is too much burden on
>>>>>>>>>>>>>>> the user for the 95% case.  We should make this simpler.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Aaron
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Feb 27, 2014 at 10:07 AM, Tim Williams <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > I've been playing around with the new QueueReader stuff and
>>>>>>>>>>>>>>> > I'm starting to believe it's at the wrong level of
>>>>>>>>>>>>>>> > abstraction - in the shard context - for a user.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Between having to know about the BlurPartitioner and handling
>>>>>>>>>>>>>>> > all the failure nuances, I'm thinking a much friendlier
>>>>>>>>>>>>>>> > approach would be to have the client implement a single
>>>>>>>>>>>>>>> > message pump that Blur takes from and handles.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Maybe on startup the Controllers compete for the lead
>>>>>>>>>>>>>>> > QueueReader position, create it from the TableContext and run
>>>>>>>>>>>>>>> > with it?  The user would still need to deal with Controller
>>>>>>>>>>>>>>> > failures but that seems easier to reason about than shard
>>>>>>>>>>>>>>> > failures.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > The way it's crafted right now, the user seems burdened with
>>>>>>>>>>>>>>> > a lot of the hard problems that Blur otherwise solves.
>>>>>>>>>>>>>>> > Obviously, it trades off a high burden for one of the
>>>>>>>>>>>>>>> > controllers.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thoughts?
>>>>>>>>>>>>>>> > --tim
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
