Hi Shigeru,

I believe you can adjust the parallelism you are asking about by modifying the -p parameter of the VHT algorithm, e.g.:

-l (classifiers.trees.VerticalHoeffdingTree -p 2)

will run with 2 parallel local statistics processors.
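For intuition on what that flag controls: vertical parallelism routes each attribute's sufficient statistics to one of the p local statistics processors, e.g. by attribute index modulo p. The sketch below is a generic illustration of that routing (illustration only, with hypothetical names; not SAMOA's actual code):

```java
// Generic sketch of vertical partitioning (hypothetical, not SAMOA code):
// each attribute's statistics are owned by exactly one of p parallel
// local-statistics processors, chosen here by attribute index modulo p.
public class VerticalRouter {
    private final int parallelism;

    public VerticalRouter(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Processor that owns the statistics for the given attribute. */
    public int processorFor(int attributeIndex) {
        return attributeIndex % parallelism;
    }

    /** How many attributes of a dataset land on each processor. */
    public int[] load(int numAttributes) {
        int[] counts = new int[parallelism];
        for (int a = 0; a < numAttributes; a++) {
            counts[processorFor(a)]++;
        }
        return counts;
    }
}
```

With a 54-attribute dataset like covertype and -p 2, each processor only owns 27 attributes' worth of counting work, which is one way to see why small attribute sets limit the benefit of adding resources.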
There is another option, in the file bin/samoa-storm.properties, where you declare that you are running Storm in cluster mode and also define the number of worker processes allocated to the cluster:

samoa.storm.numworker=2

You should adjust that one as well. In my setup I found that the two parameters (i.e., -p and samoa.storm.numworker) needed to be aligned, but I don't know if yours is different.

Hope this helps,
Nicolas

On Tue, May 2, 2017 at 7:45 PM, Shigeru Imai <[email protected]> wrote:

> Hi Nicolas,
>
> Thank you for your reply.
>
> I will wait for SAMOA-65 to be available.
>
> I tried a dataset with 100 numerical and 100 nominal attributes generated
> with RandomTreeGenerator, but that did not scale either. Again, the
> throughput remained at 50 Mbytes/sec up to 32 VMs.
>
> By the way, does the following scaling policy look good to you? Can I
> assume that changing the parallelism of LocalStatisticsProcessor is the
> only way to scale VHT, or is there another processor whose parallelism I
> should change?
>
> * Scaling policy: assign one core per LocalStatisticsProcessor
>
> Regards,
> Shigeru
>
>
> On 5/2/2017 10:25 AM, Nicolas Kourtellis wrote:
> > Hi Shigeru,
> >
> > Thank you for your interest in the VHT algorithm and SAMOA. A couple of
> > brief comments at first glance:
> >
> > - The particular connector with Kafka was not thoroughly tested, which
> > is why it has not been merged into the main branch yet.
> > Some teams we are aware of are currently working on a proposed new
> > connector, as you can see from this new open issue:
> > https://issues.apache.org/jira/browse/SAMOA-65
> >
> > - Indeed, when we tested VHT with a small set of attributes, the benefit
> > of more resources was not obvious, especially in throughput. Only when
> > we scaled the problem out to thousands of attributes did scalability to
> > more resources make sense.
> >
> > Hope this helps,
> >
> > Nicolas
> >
> >
> > On Mon, May 1, 2017 at 10:35 PM, Shigeru Imai <[email protected]> wrote:
> >
> >> Hello,
> >>
> >> I am testing the scalability of the Vertical Hoeffding Tree on
> >> SAMOA-Storm, consuming streams from Kafka. So far, I have tested up to
> >> 32 VMs of m4.large on Amazon EC2; however, throughput hardly improves
> >> at all. Storm consumes streams at 30 Mbytes/sec from Kafka with 1 VM,
> >> and this throughput stays almost the same up to 32 VMs.
> >>
> >> Here are the experimental settings:
> >> * SAMOA: latest on GitHub as of April 2017
> >> * Storm: version 0.10.1
> >> * Dataset: forest covertype (54 attributes,
> >>   https://archive.ics.uci.edu/ml/datasets/Covertype)
> >> * Kafka connector: implementation proposed for SAMOA-40
> >>   (https://github.com/apache/incubator-samoa/pull/32)
> >> * Scaling policy: assign one core per LocalStatisticsProcessor
> >> * Tested with Prequential Evaluation
> >>
> >> I read the Vertical Hoeffding Tree paper from IEEE BigData 2016, but I
> >> could not find information on how the throughput of VHT scales when we
> >> add more resources (it only shows relative performance improvements
> >> compared to the standard Hoeffding tree).
> >>
> >> Has anyone scaled VHT successfully, with or without Kafka? Are there
> >> any tips for achieving high throughput with VHT?
> >> I believe using datasets with more attributes leads to better
> >> scalability for VHT, so I am thinking of trying that next, but I think
> >> 54 attributes should scale at least a little bit.
> >>
> >> Also, I found the following sleep of 1 second in
> >> StormEntranceProcessingItem.java. It looks to me like this hinders
> >> high-throughput processing. Can we get rid of this sleep?
> >> public void nextTuple() {
> >>   if (entranceProcessor.hasNext()) {
> >>     Values value = new Values(entranceProcessor.nextEvent());
> >>     collector.emit(outputStream.getOutputId(), value);
> >>   } else
> >>     Utils.sleep(1000);
> >>   // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50,
> >>   //     TimeUnit.MILLISECONDS);
> >>   // if (tupleInfo != null) {
> >>   //   Values value = new Values(tupleInfo.getContentEvent());
> >>   //   collector.emit(tupleInfo.getStormStream().getOutputId(), value);
> >>   // }
> >> }
> >>
> >> Any suggestions would be appreciated.
> >>
> >> Thank you,
> >> Shigeru
> >>
> >> --
> >> Shigeru Imai <[email protected]>
> >> Ph.D. candidate
> >> Worldwide Computing Laboratory
> >> Department of Computer Science
> >> Rensselaer Polytechnic Institute
> >> 110 8th Street, Troy, NY 12180, USA
> >> http://wcl.cs.rpi.edu/
> >>
> >

--
Nicolas Kourtellis
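Regarding the Utils.sleep(1000) quoted above: removing the sleep entirely would make the spout busy-spin whenever the source is idle, so a gentler option is to back off exponentially from a few milliseconds up to a cap, and reset after a successful emit. Below is a sketch of such a backoff helper (a hypothetical class, not part of SAMOA or Storm):

```java
// Sketch of an exponential idle backoff for a spout's nextTuple()
// (hypothetical helper, not part of SAMOA or Storm). The idea: when the
// entrance processor has no event, sleep briefly and double the delay up
// to a cap; when an event is emitted, reset to the minimum delay.
public class IdleBackoff {
    private final long minMillis;
    private final long maxMillis;
    private long current;

    public IdleBackoff(long minMillis, long maxMillis) {
        this.minMillis = minMillis;
        this.maxMillis = maxMillis;
        this.current = minMillis;
    }

    /** Delay to sleep for on an idle poll; doubles (capped) for next time. */
    public long nextDelay() {
        long delay = current;
        current = Math.min(current * 2, maxMillis);
        return delay;
    }

    /** Call after an event was emitted: go back to the minimum delay. */
    public void reset() {
        current = minMillis;
    }
}
```

In the quoted nextTuple(), the else branch would become Utils.sleep(backoff.nextDelay()) and the emit branch would call backoff.reset(), so a briefly empty queue costs milliseconds rather than a fixed one-second stall, while a long-idle source still sleeps near the cap.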
