Hi all,

Storm Trident has a partitionBy() method that partitions a stream according to the fields of a tuple. There are also methods such as shuffle() and global() for repartitioning. Can we use these to create distributed partitions for CEP?
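For readers less familiar with Trident, here is a rough, dependency-free Java sketch of what the three repartitioning operations do. The Trident documentation describes partitionBy() as hashing the selected fields and taking the result modulo the number of target partitions, and global() as sending all tuples to the same partition. The sketch follows that description but uses plain round-robin in place of Trident's random round-robin shuffle(). The class and method names (RepartitionSketch, byFields, shuffle, global) are hypothetical and are not Storm APIs.

```java
import java.util.Arrays;
import java.util.List;

// Conceptual, dependency-free sketch of three repartitioning strategies.
// It does not use Storm/Trident classes and is not their implementation.
final class RepartitionSketch {

    // partitionBy-style routing: hash the key fields, then take the result
    // modulo the number of target partitions. Equal keys -> same partition.
    static int byFields(List<?> keyValues, int numPartitions) {
        // floorMod keeps the index non-negative for negative hash codes
        return Math.floorMod(keyValues.hashCode(), numPartitions);
    }

    // Plain round-robin as a stand-in for shuffle(); Trident's shuffle is
    // documented as random round robin, which this does not reproduce exactly.
    static int shuffle(long tupleSeqNo, int numPartitions) {
        return (int) Math.floorMod(tupleSeqNo, (long) numPartitions);
    }

    // global-style routing: every tuple goes to the same partition.
    static int global(int numPartitions) {
        return 0;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // Hypothetical events keyed by cardProvider, the partition field used
        // in the execution plan example further down this thread.
        String[] providers = {"Visa", "Amex", "Visa", "Master", "Amex"};
        for (int i = 0; i < providers.length; i++) {
            System.out.printf("%-6s partitionBy->%d shuffle->%d global->%d%n",
                    providers[i],
                    byFields(Arrays.asList(providers[i]), numPartitions),
                    shuffle(i, numPartitions),
                    global(numPartitions));
        }
    }
}
```

The property that matters for CEP is that byFields sends every event for a given cardProvider to the same partition, which a partitioned Siddhi query relies on; shuffle and global give no such per-key guarantee (global trivially keeps everything together, but on a single partition).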
However, using Trident may introduce a performance hit, since it adds a layer of complexity on top of Storm to guarantee "exactly-once" semantics.

Thanks,
Lasantha

On 10 December 2013 14:05, Srinath Perera <[email protected]> wrote:
> Hi Suho,
>
> Basically, the proposal is to use @parallel=distributed{ queries} to group
> queries into distributed groups and then use Storm to distribute the groups
> across different machines. I think that should work.
>
> --Srinath
>
>
> On Mon, Dec 9, 2013 at 11:41 AM, Sriskandarajah Suhothayan <[email protected]> wrote:
>
>> I'm working on the Siddhi syntax for distributed processing.
>>
>> We can use the execution plan below for distributed processing.
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>> <executionPlan name="ATMStatsExecutionPlan" statistics="disable"
>>     trace="disable" xmlns="http://wso2.org/carbon/eventprocessor">
>>     <description>This execution plan is used to identify possible
>>     fraud transactions</description>
>>     <processMode>local|active-passive|distributed</processMode>
>>     <importedStreams>
>>         <stream as="atmStatsStream" name="atmStatsStream" version="1.0.0"/>
>>     </importedStreams>
>>     <queryExpressions>
>>         <![CDATA[
>>
>>         @parallel="full"
>>         from atmRowStream[cardType=="Credit"]
>>         insert into atmStatsStream
>>
>>         @parallel="partition"
>>         {
>>             partition by bankPartition atmStatsStream.cardProvider
>>
>>             from every a1 = atmStatsStream[amountWithdrawed < 100]
>>                 -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo]
>>             within 1 day
>>             select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName,
>>                 b1.amountWithdrawed as amountWithdrawed, b1.location as location,
>>                 b1.cardHolderMobile as cardHolderMobile
>>             insert into possibleFraudStream
>>             partition by bankPartition
>>
>>             from every a1 = atmStatsStream[amountWithdrawed < 100]
>>                 -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo]
>>             within 1 day
>>             select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName,
>>                 b1.amountWithdrawed as amountWithdrawed, b1.location as location,
>>                 b1.cardHolderMobile as cardHolderMobile
>>             insert into possibleFraudStream
>>             partition by bankPartition
>>         }
>>
>>         ]]>
>>     </queryExpressions>
>>     <exportedStreams>
>>         <stream name="possibleFraudStream" valueOf="possibleFraudStream" version="1.0.0"/>
>>     </exportedStreams>
>> </executionPlan>
>>
>> Here we'll have three modes of execution:
>> 1. local
>> 2. active-passive
>> 3. distributed
>>
>> *Local mode*
>> This is the one we have now.
>>
>> *Active-passive*
>> Here there will be two nodes, one active and the other passive. There will
>> be a handshake protocol between the active and passive nodes; it will be used
>> for state replication and for syncing when a node goes down and joins back.
>>
>> *Distributed*
>> Here we use annotations (these are ignored in the other modes). The
>> "parallel" annotation denotes the parallelism level, and it can be "full"
>> for fully distributed, "partition" to distribute according to the
>> partition, or "single" for no distribution.
>> In the "partition" case, all the queries need to be partitioned by the same
>> partition. We can also use curly braces {} to denote a group of
>> parallelism, thereby forcing all the queries in it onto the same Siddhi
>> instance.
>> We can combine Storm's reliable messaging and snapshot persistence to
>> achieve reliable messaging, but this still needs more investigation.
>>
>> Currently we'll mainly focus on the active-passive case, as it will
>> provide reliable and fault-tolerant message processing easily, and at the
>> same time we'll also work on the Storm integration for the distributed case.
>>
>> Thoughts?
>>
>> Suho
>>
>>
>> On Wed, Nov 27, 2013 at 11:07 AM, Sanjiva Weerawarana <[email protected]> wrote:
>>
>>> +1 .. excellent job getting this off the ground! I'd love to see the
>>> numbers in a real distributed setup :).
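Suho's proposal defines three @parallel levels (full, partition, single) plus the rule that every query in a "partition" group must be partitioned by the same partition. A minimal sketch of how such annotations might be modelled and checked before wiring a Storm topology follows. The mapping of full/partition/single onto shuffleGrouping/fieldsGrouping/globalGrouping is an assumption for illustration, not something decided in this thread, and ParallelAnnotationSketch, Parallelism and QueryGroup are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical model of the proposed @parallel levels; class, method and
// grouping choices are illustrative assumptions, not a Siddhi or Storm API.
final class ParallelAnnotationSketch {

    enum Parallelism {
        FULL, PARTITION, SINGLE;

        // Parses an annotation value such as "partition" (case-insensitive).
        static Parallelism fromAnnotation(String value) {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        }

        // One possible mapping onto Storm groupings for a query group's input.
        String suggestedGrouping() {
            switch (this) {
                case FULL:      return "shuffleGrouping"; // any instance can take any event
                case PARTITION: return "fieldsGrouping";  // same partition key -> same instance
                default:        return "globalGrouping";  // SINGLE: one instance, no distribution
            }
        }
    }

    // A {...} group of queries that must run on the same Siddhi instance.
    static final class QueryGroup {
        final Parallelism parallelism;
        final List<String> partitionIds; // partition id each query uses (null if none)

        QueryGroup(Parallelism parallelism, List<String> partitionIds) {
            this.parallelism = parallelism;
            this.partitionIds = partitionIds;
        }

        // Rule from the proposal: under "partition", every query in the
        // group must be partitioned by the same partition.
        boolean isValid() {
            if (parallelism != Parallelism.PARTITION) {
                return true;
            }
            if (partitionIds.isEmpty() || partitionIds.get(0) == null) {
                return false;
            }
            String first = partitionIds.get(0);
            return partitionIds.stream().allMatch(first::equals);
        }
    }

    public static void main(String[] args) {
        QueryGroup fraudGroup = new QueryGroup(
                Parallelism.fromAnnotation("partition"),
                Arrays.asList("bankPartition", "bankPartition"));
        // prints "fieldsGrouping valid=true"
        System.out.println(fraudGroup.parallelism.suggestedGrouping()
                + " valid=" + fraudGroup.isValid());
    }
}
```

Validating the group up front means a plan that mixes partitions inside one {} block is rejected before any bolts are deployed, rather than silently producing incorrect per-partition results.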
>>>
>>> On Wed, Nov 27, 2013 at 1:47 PM, Srinath Perera <[email protected]> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have written a Siddhi bolt that you can use to run Siddhi on Storm
>>>> in a distributed setup.
>>>>
>>>> You can create SiddhiBolt(s) for any Siddhi query as follows.
>>>>
>>>> SiddhiBolt siddhiBolt1 = new SiddhiBolt(
>>>>     new String[]{ "define stream PlayStream1 ( sid string, ts long, x double, y double, z double, a double, v double);" },
>>>>     new String[]{ "from PlayStream1#window.timeBatch(1sec) select sid, avg(v) as avgV insert into AvgRunPlay;" },
>>>>     new String[]{ "AvgRunPlay" });
>>>>
>>>> Those bolts can then be used within a Storm topology like any other bolt.
>>>> However, the names of components and streams used in CEP queries should
>>>> match.
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();
>>>> builder.setSpout("PlayStream1", new FootballDataSpout(), 1);
>>>> builder.setBolt("AvgRunPlay", siddhiBolt1, 1).shuffleGrouping("PlayStream1");
>>>> builder.setBolt("FastRunPlay", siddhiBolt2, 1).shuffleGrouping("AvgRunPlay");
>>>> builder.setBolt("LeafEacho", new EchoBolt(), 1).shuffleGrouping("FastRunPlay");
>>>>
>>>> I have done a quick performance test and got about 140K TPS in a local
>>>> cluster. We need to test using a distributed setup. Lasantha will integrate
>>>> this with the CEP code base.
>>>>
>>>> Some potential TODOs are:
>>>> 1) Write two new bolts for Siddhi that support reliable processing and
>>>> transaction processing using Storm constructs (for cases where we need
>>>> high reliability while processing).
>>>> 2) Integrate this with our data agent so we can send events into the Storm
>>>> setup as well.
>>>> 3) Extend the Siddhi language to support distributed processing, so the
>>>> above topology can be written in the Siddhi language itself.
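Srinath notes that component names in the topology must match the stream names used in the CEP queries: the spout "PlayStream1" feeds "from PlayStream1", and the bolt "AvgRunPlay" is the component for "insert into AvgRunPlay", which the next bolt subscribes to. A hypothetical pre-flight check for that convention could look like the following. NamingCheck is not part of SiddhiBolt, and its regular expressions are deliberately naive (for example, a pattern query such as `from every a1 = atmStatsStream[...]` would capture "every").

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical naming-convention check; not part of SiddhiBolt or Storm.
// A bolt's component id should equal the stream its query inserts into, and
// the component it subscribes to should equal the stream its query reads from.
final class NamingCheck {
    // Naive patterns: they only handle simple "from X..." / "insert into Y"
    // queries, not pattern queries such as "from every a1 = ...".
    static final Pattern FROM = Pattern.compile("\\bfrom\\s+(\\w+)");
    static final Pattern INSERT_INTO = Pattern.compile("\\binsert\\s+into\\s+(\\w+)");

    // Collects every stream name captured by the pattern in the query text.
    static List<String> captures(Pattern pattern, String query) {
        List<String> names = new ArrayList<>();
        Matcher m = pattern.matcher(query);
        while (m.find()) {
            names.add(m.group(1));
        }
        return names;
    }

    // True when boltId and upstreamId agree with the query's stream names.
    static boolean matches(String boltId, String upstreamId, String query) {
        return captures(INSERT_INTO, query).contains(boltId)
                && captures(FROM, query).contains(upstreamId);
    }

    public static void main(String[] args) {
        String query = "from PlayStream1#window.timeBatch(1sec) select sid, avg(v) as avgV insert into AvgRunPlay;";
        System.out.println(matches("AvgRunPlay", "PlayStream1", query));    // true
        System.out.println(matches("AvgRunPlay", "SomeOtherSpout", query)); // false
    }
}
```

Running a check like this when building the topology would surface a mismatched component id as an explicit error instead of a bolt that silently receives no events.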
>>>>
>>>> If performance is confirmed to be in the same range, given the stability of
>>>> Storm, I think we can go with Storm for the planned Siddhi distributed
>>>> processing.
>>>>
>>>> Thanks
>>>> Srinath
>>>>
>>>> The code for the bolt can be found at
>>>> https://svn.wso2.org/repos/wso2/people/srinath/projects/siddhiStormIntegration/src/org/wso2/siddhi/storm/SiddhiBolt.java
>>>>
>>>> The full project can be found at
>>>> https://svn.wso2.org/repos/wso2/people/srinath/projects/siddhiStormIntegration
>>>>
>>>> --
>>>> ============================
>>>> Srinath Perera, Ph.D.
>>>> Director, Research, WSO2 Inc.
>>>> Visiting Faculty, University of Moratuwa
>>>> Member, Apache Software Foundation
>>>> Research Scientist, Lanka Software Foundation
>>>> Blog: http://srinathsview.blogspot.com/
>>>> Photos: http://www.flickr.com/photos/hemapani/
>>>> Phone: 0772360902
>>>>
>>>> _______________________________________________
>>>> Architecture mailing list
>>>> [email protected]
>>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>
>>>
>>> --
>>> Sanjiva Weerawarana, Ph.D.
>>> Founder, Chairman & CEO; WSO2, Inc.; http://wso2.com/
>>> email: [email protected]; office: +1 650 745 4499 x5700; cell: +94 77 787 6880 | +1 650 265 8311
>>> blog: http://sanjiva.weerawarana.org/
>>> Lean . Enterprise . Middleware
>>
>>
>> --
>>
>> S. Suhothayan
>> Associate Technical Lead,
>> WSO2 Inc. http://wso2.com
>> lean . enterprise . middleware
>>
>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>> twitter: http://twitter.com/suhothayan | linked-in: http://lk.linkedin.com/in/suhothayan
>
>
> --
> ============================
> Srinath Perera, Ph.D.
> http://people.apache.org/~hemapani/
> http://srinathsview.blogspot.com/


--
Lasantha Fernando
Software Engineer - Data Technologies Team
WSO2 Inc. http://wso2.com
email: [email protected]
mobile: (+94) 71 5247551
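Suho's active-passive mode (two nodes, with a handshake used for state replication and for re-syncing a node that goes down and joins back) can be illustrated with a small in-process sketch. Everything here is an assumption: the state is a simple per-card event counter, "replication" is a direct snapshot copy rather than a network handshake, and ActivePassiveSketch and Node are hypothetical names. The sketch also shows why snapshots alone are not enough: events processed after the last snapshot are lost on failover, which is the gap that combining reliable messaging (replay) with snapshot persistence, as raised in the thread, would need to close.

```java
import java.util.HashMap;
import java.util.Map;

// In-process sketch of active-passive state replication. The state shape,
// snapshot timing and method names are assumptions for illustration only.
final class ActivePassiveSketch {

    static final class Node {
        final String name;
        boolean active;
        Map<String, Long> state = new HashMap<>(); // hypothetical per-card event counts

        Node(String name, boolean active) {
            this.name = name;
            this.active = active;
        }

        // Only the active node processes events.
        void process(String cardNo) {
            if (!active) {
                throw new IllegalStateException(name + " is passive");
            }
            state.merge(cardNo, 1L, Long::sum);
        }

        // Copy of the current state, standing in for a replicated snapshot.
        Map<String, Long> snapshot() {
            return new HashMap<>(state);
        }

        // Replace local state with a snapshot received from the peer.
        void restore(Map<String, Long> snap) {
            state = new HashMap<>(snap);
        }
    }

    public static void main(String[] args) {
        Node nodeA = new Node("node-a", true);
        Node nodeB = new Node("node-b", false);

        nodeA.process("card-1");
        nodeA.process("card-1");
        nodeB.restore(nodeA.snapshot()); // replication step (here: a direct copy)
        nodeA.process("card-2");         // processed but not yet replicated

        // node-a fails: promote node-b using its last snapshot.
        nodeA.active = false;
        nodeB.active = true;
        System.out.println(nodeB.state); // {card-1=2}: the card-2 event is lost

        // node-a joins back as passive and syncs from the current active node.
        nodeA.restore(nodeB.snapshot());
    }
}
```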
