Yes, that is exactly what I was thinking. I can help you build a Kafka reader for Blur. This should be very interesting. :-)
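
To make the idea of a queue reader a little more concrete, here is a rough sketch of pulling items off a queue in batches. It is plain Java and only an illustration: `QueueBatchDrain`, `BatchSink`, and `drainOnce` are made-up names, not Blur's or Kafka's API, and an in-memory `BlockingQueue` stands in for the real distributed queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical names throughout; this is not Blur's queue API.
class QueueBatchDrain {

    // Stand-in for whatever applies a batch of mutates to one shard.
    interface BatchSink<T> {
        void write(List<T> batch);
    }

    // Waits up to timeoutMs for one item, then grabs whatever else is already
    // queued (at most maxBatch items in total) and hands the batch to the sink.
    // Returns the number of items written, or 0 if nothing arrived in time.
    static <T> int drainOnce(BlockingQueue<T> queue, int maxBatch, long timeoutMs,
                             BatchSink<T> sink) throws InterruptedException {
        T first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first == null) {
            return 0;
        }
        List<T> batch = new ArrayList<>();
        batch.add(first);
        queue.drainTo(batch, maxBatch - 1); // non-blocking: takes only what is already there
        sink.write(batch);
        return batch.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // In-memory queue standing in for the logical queue of a single shard.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.put("mutate-" + i);
        }
        BatchSink<String> sink =
                batch -> System.out.println(batch.size() + " mutates: " + batch);
        while (drainOnce(queue, 3, 100, sink) > 0) {
            // keep draining until the queue stays empty for 100 ms
        }
    }
}
```

Blocking only for the first item keeps latency low when traffic is light, while the non-blocking drain lets batches grow on their own when the writer side is busy. A real reader would of course also need to deal with offsets, retries, and failures, which this sketch ignores.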
Aaron

On Sun, Feb 23, 2014 at 12:07 AM, Dibyendu Bhattacharya <[email protected]> wrote:

> Hi Aaron,
>
> In our real-time stream processing system we use Kafka as our distributed
> message infrastructure, and Storm pulls from Kafka and does various stream
> processing. So if we can pull messages from Kafka directly, that could work
> for us as well. A similar concept exists in Elasticsearch, called
> "River", which is basically a pluggable service for pulling data
> from various sources. If such a service exists in Blur, we can implement
> different connectors for Kafka and other sources.
>
> Regards,
> Dibyendu
>
> On Sun, Feb 23, 2014 at 8:54 AM, Aaron McCurry <[email protected]> wrote:
>
>> On Sat, Feb 22, 2014 at 1:20 PM, Dibyendu Bhattacharya <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I have taken the latest 0.2.2 updates and the issue I found earlier is
>>> fixed. I need to clear the ZooKeeper path (/blur) before indexing, though.
>>> I used the Thrift client to index around 90K rows (JSON), about 50 MB
>>> in total. The indexing went through fine, and the final index size is 280 MB.
>>> I have not evaluated the indexing rate yet, although it is our main concern.
>>> We need to do that on a bigger AWS machine with more RAM. We will be indexing
>>> a near real-time stream from Storm, which arrives at a very high rate. Do you
>>> have any benchmarks on Blur indexing throughput for a given configuration?
>>>
>>
>> Great, glad things are working better for you now. As for your
>> benchmarking question, I don't have any hard numbers that I can share;
>> however, I can give you a breakdown of the various ways you could get data
>> into the system and the performance profile of each. Once 0.2.2 is
>> released I want to run some sort of benchmarks against it, perhaps in EC2,
>> perhaps on bare metal. It will depend on what I have available to me.
>> What kind of benchmarks do you think would be useful? What data should I
>> use?
>> Would like to hear your thoughts on this.
>>
>> Here's the breakdown for updating data.
>>
>> Mutates via Thrift - No Batch
>>
>> This option will be the slowest. Blur commits all data to disk (HDFS)
>> with every mutate. It also refreshes the index after each commit. With this
>> option every mutate is atomic. However, this comes at a cost: I would say
>> that with a single thread mutating on a medium-sized cluster you will
>> likely see <100 mutates / second.
>>
>> Mutates via Thrift - Batch
>>
>> This option will be faster than the "No Batch" option and has similar
>> behavior. If the call succeeds, all the data is immediately visible.
>> However, if the call fails, there is a good chance that some of the data
>> has been updated and some has not (this is because of the multiple shards
>> in the table and the lack of global transactions). The mutate speeds here can
>> reach 10,000-100,000 / second, depending greatly on the table size, cluster
>> size, and record size.
>>
>> Mutates via MapReduce
>>
>> This option will produce the highest indexing rates of all the options.
>> I have seen indexing speeds on medium to large Hadoop clusters reach as
>> high as 30 million records / second. This comes at high latency, so my
>> guess is that it won't easily fit into your Storm architecture.
>>
>> Mutates via Queue (New!!!)
>>
>> After giving your email some thought: you said that you were planning on
>> using Storm for processing your data. That usually means that you are
>> processing things in real time. My guess is that batch processing via
>> MapReduce as well as batch mutates would be difficult to implement (and not
>> desired) in a real-time architecture. I want Blur to fit more easily into
>> this type of architecture, so I wrote a new, simple feature. It will allow
>> for pulling mutates off a queue in batches.
>>
>> So Storm could write into the queue and Blur could read from it.
>> The indexing speeds seem to be better than the batch mutates, but again I don't
>> have any hard numbers. As for queues, I would recommend a scalable
>> distributed queue because there will need to be one logical queue per shard
>> per table. Of course this can be modified once I have some real-world use
>> cases, but there is at least an API to implement now.
>>
>> Hope this helps frame your expectations of what is possible. Let us know
>> if we can answer any more questions.
>>
>> Here's the commit for the queue feature:
>>
>> https://git-wip-us.apache.org/repos/asf?p=incubator-blur.git;a=commit;h=052c131e92e0caefb0c513fe52098ad6c6e04d3a
>>
>> Aaron
>>
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Fri, Feb 21, 2014 at 9:36 PM, Dibyendu Bhattacharya <[email protected]> wrote:
>>>
>>>> Thanks for the prompt reply. Sure, I will pull the updates and rebuild. I will
>>>> let you know if I still see the issue.
>>>>
>>>> Dibyendu
>>>>
>>>> On Fri, Feb 21, 2014 at 9:32 PM, Aaron McCurry <[email protected]> wrote:
>>>>
>>>>> Dibyendu,
>>>>>
>>>>> Thanks for going through the pain of pulling and building 0.2.2. I
>>>>> believe this was a bug that was resolved a few days ago. Would you mind
>>>>> pulling updates and rebuilding? If you are up to date, then this is a
>>>>> new bug and I will create an issue and work on it. We have been working on
>>>>> a few bugs over the past week, fixing tests and getting close to a release.
>>>>> Thanks!
>>>>>
>>>>> Aaron
>>>>>
>>>>> On Fri, Feb 21, 2014 at 10:34 AM, Dibyendu Bhattacharya <[email protected]> wrote:
>>>>>
>>>>>> Hi Aaron,
>>>>>>
>>>>>> As you suggested last time, I used Blur 0.2.2. I am trying to
>>>>>> perform real-time inserts using the Thrift API, but I am facing an error.
>>>>>> I created the table from the Blur shell with 5 shards. The table I am trying
>>>>>> to index will have a single family and many columns, and every rowid has one
>>>>>> record.id. There are many such rows I want to index.
>>>>>>
>>>>>> Is there anything wrong with what I am doing? I am using the Blur 0.2.2
>>>>>> Thrift mutation API.
>>>>>>
>>>>>> Connection connection = new Connection("10.254.71.171:40010");
>>>>>> Iface client = BlurClient.getClient(connection);
>>>>>> Record record = new Record();
>>>>>> record.setRecordId(id);
>>>>>> record.setFamily(family);
>>>>>> record.addToColumns(new Column(key, value));
>>>>>> List<RecordMutation> recordMutations = new ArrayList<RecordMutation>();
>>>>>> recordMutations.add(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
>>>>>> RowMutation mutation = new RowMutation(TABLE_NAME, rowid, RowMutationType.REPLACE_ROW, recordMutations);
>>>>>> mutation.setRecordMutations(recordMutations);
>>>>>> client.mutate(mutation);
>>>>>>
>>>>>> and below is the exception:
>>>>>>
>>>>>> BlurException(message:Unknown error during mutation of
>>>>>> [RowMutation(table:mytable, rowId:1db5793577dd8595dd483a5ec87aad1,
>>>>>> rowMutationType:REPLACE_ROW,
>>>>>> recordMutations:[RecordMutation(recordMutationType:REPLACE_ENTIRE_RECORD,
>>>>>> record:Record(recordId:1391378450807, family:my_family,
>>>>>> columns:[Column(name:actor.location.ip, value:10.201.1.195),
>>>>>> Column(name:context.campusId, value:24754975),
>>>>>> Column(name:generator.market, value:digitalvellum),
>>>>>> Column(name:context.questiondata, value:{"numericsubmissions":[{"choicenumber":2}]}),
>>>>>> Column(name:object.id, value:52458965),
>>>>>> Column(name:target.id, value:9431547),
>>>>>> Column(name:verb, value:assessments.question.save),
>>>>>> Column(name:generator.source, value:js_base_client),
>>>>>> Column(name:generator.channel, value:seer-activity),
>>>>>> Column(name:target.objectType, value:eclg.courseId),
>>>>>> Column(name:actor.id, value:24754975),
>>>>>> Column(name:generator.userAgent, value:Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)),
>>>>>> Column(name:object.objectType, value:assessments.questionsave),
>>>>>> Column(name:serverDate, value:2014-02-02T22:00:52.068Z),
>>>>>> Column(name:published, value:2014-02-02T22:00:50.807Z),
>>>>>> Column(name:generator.appId, value:dvcourse)]))])],
>>>>>> stackTraceStr:java.lang.NullPointerException
>>>>>> at org.apache.blur.thrift.BlurControllerServer.mutate(BlurControllerServer.java:1027)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at org.apache.blur.utils.BlurUtil$2.invoke(BlurUtil.java:255)
>>>>>> at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at org.apache.blur.utils.BlurUtil$7.invoke(BlurUtil.java:1216)
>>>>>> at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at org.apache.blur.utils.BlurUtil$8.invoke(BlurUtil.java:1258)
>>>>>> at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at org.apache.blur.utils.BlurUtil$1.invoke(BlurUtil.java:178)
>>>>>> at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>> at org.apache.blur.thrift.generated.Blur$Processor$mutate.getResult(Blur.java:3369)
>>>>>> at org.apache.blur.thrift.generated.Blur$Processor$mutate.getResult(Blur.java:3353)
>>>>>> at org.apache.blur.thirdparty.thrift_0_9_0.ProcessFunction.process(ProcessFunction.java:54)
>>>>>> at org.apache.blur.thirdparty.thrift_0_9_0.TBaseProcessor.process(TBaseProcessor.java:57)
>>>>>> at org.apache.blur.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:555)
>>>>>> at org.apache.blur.thrift.server.Invocation.run(Invocation.java:34)
>>>>>> at org.apache.blur.concurrent.ThreadWatcher$ThreadWatcherExecutorService$1.run(ThreadWatcher.java:127)
>>>>>> at org.apache.blur.concurrent.BlurThreadPoolExecutor$1.run(BlurThreadPoolExecutor.java:83)
>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>> , errorType:UNKNOWN)
>>>>>> at org.apache.blur.thrift.generated.Blur$mutate_result$mutate_resultStandardScheme.read(Blur.java:20163)
>>>>>> at org.apache.blur.thrift.generated.Blur$mutate_result$mutate_resultStandardScheme.read(Blur.java:20149)
>>>>>> at org.apache.blur.thrift.generated.Blur$mutate_result.read(Blur.java:20099)
>>>>>> at org.apache.blur.thirdparty.thrift_0_9_0.TServiceClient.receiveBase(TServiceClient.java:78)
>>>>>> at org.apache.blur.thrift.generated.Blur$Client.recv_mutate(Blur.java:1003)
>>>>>> at org.apache.blur.thrift.generated.SafeClientGen.recv_mutate(SafeClientGen.java:326)
>>>>>> at org.apache.blur.thrift.generated.Blur$Client.mutate(Blur.java:990)
>>>>>> at org.apache.blur.thrift.generated.SafeClientGen.mutate(SafeClientGen.java:165)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler$1.call(BlurClient.java:60)
>>>>>> at org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler$1.call(BlurClient.java:56)
>>>>>> at org.apache.blur.thrift.AbstractCommand.call(AbstractCommand.java:62)
>>>>>> at org.apache.blur.thrift.BlurClientManager.execute(BlurClientManager.java:192)
>>>>>> at org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler.invoke(BlurClient.java:56)
>>>>>> at com.sun.proxy.$Proxy0.mutate(Unknown Source)
>>>>>> at com.pearson.blur.thrift.client.BlurLoader.load(BlurLoader.java:85)
>>>>>> at com.pearson.blur.thrift.client.BlurLoader.main(BlurLoader.java:39)
