Yes, that is exactly what I was thinking.  I can help you build a Kafka
reader for Blur.  This should be very interesting.  :-)

Aaron


On Sun, Feb 23, 2014 at 12:07 AM, Dibyendu Bhattacharya <
[email protected]> wrote:

> Hi Aaron,
>
> In our real-time stream processing system we use Kafka as our distributed
> message infrastructure, and Storm pulls from Kafka and does various stream
> processing. So if we can pull messages from Kafka directly, that could work
> for us as well. A similar concept exists in Elasticsearch, called a
> "River", which is basically a pluggable service for pulling data from
> various sources. If such a service exists in Blur, we can implement
> different connectors for Kafka and other sources.
>
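To make the pluggable-connector idea above a bit more concrete, here is a minimal sketch in plain Java. It assumes nothing about the real Blur or Elasticsearch APIs: `SourceConnector`, `InMemoryConnector`, and `ConnectorPump` are hypothetical names made up for illustration, and the in-memory source only stands in for a real Kafka consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical plug-in point: any source that can hand over messages in batches. */
interface SourceConnector<T> {
    /** Returns up to maxMessages pending messages, or an empty list if none are pending. */
    List<T> poll(int maxMessages);
}

/** In-memory stand-in for a real source such as a Kafka consumer (assumption). */
final class InMemoryConnector<T> implements SourceConnector<T> {
    private final Deque<T> pending = new ArrayDeque<>();

    InMemoryConnector(Collection<T> messages) {
        pending.addAll(messages);
    }

    @Override
    public List<T> poll(int maxMessages) {
        List<T> out = new ArrayList<>();
        while (out.size() < maxMessages && !pending.isEmpty()) {
            out.add(pending.poll());
        }
        return out;
    }
}

final class ConnectorPump {
    /**
     * Pulls batches from the source until it reports nothing pending and
     * forwards each batch to the indexer. Returns the number of messages forwarded.
     */
    static <T> int drain(SourceConnector<T> source, int batchSize, Consumer<List<T>> indexer) {
        int total = 0;
        List<T> batch;
        while (!(batch = source.poll(batchSize)).isEmpty()) {
            indexer.accept(batch);
            total += batch.size();
        }
        return total;
    }
}
```

The point of the interface is that the indexing side only sees `poll`, so a Kafka-backed connector and any other source could be swapped in without touching the pump.
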
> Regards,
> Dibyendu
>
>
>
> On Sun, Feb 23, 2014 at 8:54 AM, Aaron McCurry <[email protected]> wrote:
>
>>
>> On Sat, Feb 22, 2014 at 1:20 PM, Dibyendu Bhattacharya <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> I have taken the latest 0.2.2 updates and the issue I found earlier is
>>> fixed. I did need to clear the ZooKeeper path (/blur) before indexing,
>>> though. I used the Thrift client to index around 90K rows (JSON), about
>>> 50 MB in total. The indexing went through fine, and the final index size
>>> is 280 MB. I have not yet evaluated the indexing rate, which is our main
>>> concern. We need to do that on a bigger AWS machine with more RAM. We
>>> will be indexing a near-real-time stream from Storm, and that comes at a
>>> very high rate. Do you have any benchmarks on Blur indexing throughput
>>> for a given configuration?
>>>
>>
>> Great, glad things are working better for you now.  As for your
>> benchmarking question, I don't have any hard numbers that I can share;
>> however, I can give you a breakdown of the various ways you could get data
>> into the system and the performance profile of each.  Once 0.2.2 is
>> released I want to run some benchmarks against it, perhaps in EC2,
>> perhaps on bare metal.  It will depend on what I have available to me.
>> What kind of benchmarks do you think would be useful?  What data should I
>> use?  I would like to hear your thoughts on this.
>>
>> Here's the breakdown for updating data.
>>
>> Mutates via Thrift - No Batch
>>
>> This option will be the slowest.  Blur commits all data to disk (HDFS)
>> with every mutate.  It also refreshes the index after each commit.  With
>> this option every mutate is atomic.  However, this comes at a cost: I would
>> say that with a single thread mutating on a medium-sized cluster you will
>> likely see <100 mutates / second.
>>
>> Mutates via Thrift - Batch
>>
>> This option will be faster than the "No Batch" option and has similar
>> behavior.  If the call succeeds, all the data is immediately visible.
>> However, if the call fails, there is a good chance that some of the data
>> has been updated and some has not (this is because of the multiple shards
>> in the table and the lack of global transactions).  The mutate speeds here
>> can reach 10,000-100,000 / second, depending greatly on the table size,
>> cluster size, and record size.
>>
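As a rough illustration of the batch option above, the sketch below buffers mutations on the client side and hands them over in fixed-size groups, so that each call carries many rows instead of one. It is plain Java and does not touch the Blur client: `MutationBatcher` is a hypothetical helper, and the `Consumer` passed in is only a stand-in for whatever batch mutate call is actually used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers items and hands them to a sink in fixed-size batches. */
final class MutationBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // stand-in for a batch mutate call (assumption)
    private final List<T> buffer = new ArrayList<>();

    MutationBatcher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Adds one mutation and sends a batch as soon as the buffer is full. */
    void add(T mutation) {
        buffer.add(mutation);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /**
     * Sends whatever is buffered. The buffer is cleared only after the sink
     * returns, so a failed call leaves the batch in place for a retry.
     */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Because a failed batch may have been partially applied across shards, retrying the whole batch is only safe when the mutations are idempotent, for example full row replacements.
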
>> Mutates via MapReduce
>>
>> This option will produce the highest indexing rates of all the options.
>> I have seen indexing speeds on medium to large Hadoop clusters reach as
>> high as 30 million records / second.  This comes at high latency, so my
>> guess is that it won't easily fit into your Storm architecture.
>>
>> Mutates via Queue (New!!!)
>>
>> I gave your email some thought.  You said that you were planning on
>> using Storm to process your data.  That usually means that you are
>> processing things in real time.  My guess is that batch processing via
>> MapReduce, as well as batch mutates, would be difficult to implement (and
>> not desired) in a real-time architecture.  I want Blur to fit more easily
>> into this type of architecture, so I wrote a simple new feature.  It
>> allows pulling mutates off a queue in batches.
>>
>> So Storm could write into the queue and Blur could read from it.  The
>> indexing speeds seem to be better than the batch mutates, but again I don't
>> have any hard numbers.  As for queues, I would recommend a scalable
>> distributed queue because there will need to be one logical queue per shard
>> per table.  Of course this can be modified once I have some real-world use
>> cases, but there is at least an API to implement now.
>>
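The "one logical queue per shard per table" arrangement described above can be sketched in a single JVM as follows. This is not the queue API from the commit, which is not reproduced in this thread: `ShardQueues` and its methods are hypothetical, the in-memory `LinkedBlockingQueue` stands in for a distributed queue, and `shardFor` is a placeholder hash that should not be taken as Blur's real row-to-shard mapping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: one in-memory queue per (table, shard) pair. */
final class ShardQueues<T> {
    private final Map<String, BlockingQueue<T>> queues = new ConcurrentHashMap<>();

    /** Placeholder partitioner (assumption); not Blur's actual row-to-shard mapping. */
    static int shardFor(String rowId, int shardCount) {
        return Math.floorMod(rowId.hashCode(), shardCount);
    }

    private BlockingQueue<T> queueFor(String table, int shard) {
        return queues.computeIfAbsent(table + "/" + shard, k -> new LinkedBlockingQueue<>());
    }

    /** Producer side, e.g. called from a stream-processing task. */
    void offer(String table, int shard, T mutation) {
        queueFor(table, shard).add(mutation);
    }

    /** Consumer side: removes and returns up to max queued mutations without blocking. */
    List<T> take(String table, int shard, int max) {
        List<T> batch = new ArrayList<>();
        queueFor(table, shard).drainTo(batch, max);
        return batch;
    }
}
```

A consumer for a given shard would call `take` in a loop and index each returned batch; keeping the queues separate per shard means the consumers never contend with each other.
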
>> Hope this helps frame your expectations of what is possible.  Let us know
>> if we can answer any more questions.
>>
>> Here's the commit for the queue feature:
>>
>>
>> https://git-wip-us.apache.org/repos/asf?p=incubator-blur.git;a=commit;h=052c131e92e0caefb0c513fe52098ad6c6e04d3a
>>
>> Aaron
>>
>>
>>
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Fri, Feb 21, 2014 at 9:36 PM, Dibyendu Bhattacharya <
>>> [email protected]> wrote:
>>>
>>>> Thanks for the prompt reply. Sure, I will pull updates and rebuild. I
>>>> will let you know if I still see the issue.
>>>>
>>>> Dibyendu
>>>>
>>>>
>>>> On Fri, Feb 21, 2014 at 9:32 PM, Aaron McCurry <[email protected]> wrote:
>>>>
>>>>> Dibyendu,
>>>>>
>>>>> Thanks for going through the pain of pulling and building 0.2.2.  I
>>>>> believe this was a bug that was resolved a few days ago.  Would you mind
>>>>> pulling updates and rebuilding?  If you are up-to-date then this is a
>>>>> new bug and I will create an issue and work on it.  We have been working
>>>>> on a few bugs over the past week, fixing tests and getting close to a
>>>>> release.  Thanks!
>>>>>
>>>>> Aaron
>>>>>
>>>>>
>>>>> On Fri, Feb 21, 2014 at 10:34 AM, Dibyendu Bhattacharya <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Aaron,
>>>>>>
>>>>>> As you suggested last time, I used Blur 0.2.2. I am trying to
>>>>>> perform real-time inserts using the Thrift API, but I am getting an
>>>>>> error. I created the table from the Blur shell with 5 shards. The table
>>>>>> I am trying to index will have a single family and many columns, and
>>>>>> every rowid has one record.id. There are many such rows I want to index.
>>>>>>
>>>>>> Is there anything I am doing wrong? I am using the Blur 0.2.2 Thrift
>>>>>> mutation API.
>>>>>>
>>>>>> Connection connection = new Connection("10.254.71.171:40010");
>>>>>> Iface client = BlurClient.getClient(connection);
>>>>>>
>>>>>> // Build the record: one family, columns added as key/value pairs.
>>>>>> Record record = new Record();
>>>>>> record.setRecordId(id);
>>>>>> record.setFamily(family);
>>>>>> record.addToColumns(new Column(key, value));
>>>>>>
>>>>>> List<RecordMutation> recordMutations = new ArrayList<RecordMutation>();
>>>>>> recordMutations.add(
>>>>>>     new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
>>>>>>
>>>>>> // The constructor already receives the record mutations, so a separate
>>>>>> // setRecordMutations(...) call is redundant.
>>>>>> RowMutation mutation = new RowMutation(
>>>>>>     TABLE_NAME, rowid, RowMutationType.REPLACE_ROW, recordMutations);
>>>>>> client.mutate(mutation);
>>>>>>
>>>>>> Below is the exception:
>>>>>>
>>>>>> BlurException(message:Unknown error during mutation of
>>>>>> [RowMutation(table:mytable, rowId:1db5793577dd8595dd483a5ec87aad1,
>>>>>> rowMutationType:REPLACE_ROW,
>>>>>> recordMutations:[RecordMutation(recordMutationType:REPLACE_ENTIRE_RECORD,
>>>>>> record:Record(recordId:1391378450807, family:my_family,
>>>>>> columns:[Column(name:actor.location.ip, value:10.201.1.195),
>>>>>> Column(name:context.campusId, value:24754975),
>>>>>> Column(name:generator.market, value:digitalvellum),
>>>>>> Column(name:context.questiondata,
>>>>>> value:{"numericsubmissions":[{"choicenumber":2}]}), Column(name:
>>>>>> object.id, value:52458965), Column(name:target.id, value:9431547),
>>>>>> Column(name:verb, value:assessments.question.save),
>>>>>> Column(name:generator.source, value:js_base_client),
>>>>>> Column(name:generator.channel, value:seer-activity),
>>>>>> Column(name:target.objectType, value:eclg.courseId), Column(name:
>>>>>> actor.id, value:24754975), Column(name:generator.userAgent,
>>>>>> value:Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64;
>>>>>> Trident/6.0)), Column(name:object.objectType,
>>>>>> value:assessments.questionsave), Column(name:serverDate,
>>>>>> value:2014-02-02T22:00:52.068Z), Column(name:published,
>>>>>> value:2014-02-02T22:00:50.807Z), Column(name:generator.appId,
>>>>>> value:dvcourse)]))])], stackTraceStr:java.lang.NullPointerException
>>>>>>         at
>>>>>> org.apache.blur.thrift.BlurControllerServer.mutate(BlurControllerServer.java:1027)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>         at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>         at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>         at org.apache.blur.utils.BlurUtil$2.invoke(BlurUtil.java:255)
>>>>>>         at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>         at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>         at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>         at org.apache.blur.utils.BlurUtil$7.invoke(BlurUtil.java:1216)
>>>>>>         at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>         at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>         at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>         at org.apache.blur.utils.BlurUtil$8.invoke(BlurUtil.java:1258)
>>>>>>         at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>         at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>         at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>         at org.apache.blur.utils.BlurUtil$1.invoke(BlurUtil.java:178)
>>>>>>         at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.Blur$Processor$mutate.getResult(Blur.java:3369)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.Blur$Processor$mutate.getResult(Blur.java:3353)
>>>>>>         at
>>>>>> org.apache.blur.thirdparty.thrift_0_9_0.ProcessFunction.process(ProcessFunction.java:54)
>>>>>>         at
>>>>>> org.apache.blur.thirdparty.thrift_0_9_0.TBaseProcessor.process(TBaseProcessor.java:57)
>>>>>>         at
>>>>>> org.apache.blur.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:555)
>>>>>>         at
>>>>>> org.apache.blur.thrift.server.Invocation.run(Invocation.java:34)
>>>>>>         at
>>>>>> org.apache.blur.concurrent.ThreadWatcher$ThreadWatcherExecutorService$1.run(ThreadWatcher.java:127)
>>>>>>         at
>>>>>> org.apache.blur.concurrent.BlurThreadPoolExecutor$1.run(BlurThreadPoolExecutor.java:83)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>>> , errorType:UNKNOWN)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.Blur$mutate_result$mutate_resultStandardScheme.read(Blur.java:20163)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.Blur$mutate_result$mutate_resultStandardScheme.read(Blur.java:20149)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.Blur$mutate_result.read(Blur.java:20099)
>>>>>>         at
>>>>>> org.apache.blur.thirdparty.thrift_0_9_0.TServiceClient.receiveBase(TServiceClient.java:78)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.Blur$Client.recv_mutate(Blur.java:1003)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.SafeClientGen.recv_mutate(SafeClientGen.java:326)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.Blur$Client.mutate(Blur.java:990)
>>>>>>         at
>>>>>> org.apache.blur.thrift.generated.SafeClientGen.mutate(SafeClientGen.java:165)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>         at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>         at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>         at
>>>>>> org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler$1.call(BlurClient.java:60)
>>>>>>         at
>>>>>> org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler$1.call(BlurClient.java:56)
>>>>>>         at
>>>>>> org.apache.blur.thrift.AbstractCommand.call(AbstractCommand.java:62)
>>>>>>         at
>>>>>> org.apache.blur.thrift.BlurClientManager.execute(BlurClientManager.java:192)
>>>>>>         at
>>>>>> org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler.invoke(BlurClient.java:56)
>>>>>>         at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>>         at
>>>>>> com.pearson.blur.thrift.client.BlurLoader.load(BlurLoader.java:85)
>>>>>>         at
>>>>>> com.pearson.blur.thrift.client.BlurLoader.main(BlurLoader.java:39)
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
