I'm working on the Siddhi syntax for distributed processing.
We can use the execution plan below for distributed processing.
<?xml version="1.0" encoding="UTF-8"?>
<executionPlan name="ATMStatsExecutionPlan" statistics="disable"
trace="disable" xmlns="http://wso2.org/carbon/eventprocessor">
<description>This execution plan is used to identify possible fraudulent
transactions</description>
<processMode>local|active-passive|distributed</processMode>
<importedStreams>
<stream as="atmStatsStream" name="atmStatsStream" version="1.0.0"/>
</importedStreams>
<queryExpressions>
<![CDATA[
@parallel="full"
from atmRowStream[cardType=="Credit"]
insert into atmStatsStream
@parallel="partition"
{
partition by bankPartition atmStatsStream.cardProvider
from every a1 = atmStatsStream[amountWithdrawed < 100]
-> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo
== b1.cardNo]
within 1 day
select a1.cardNo as cardNo, a1.cardHolderName as
cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as
location, b1.cardHolderMobile as cardHolderMobile
insert into possibleFraudStream
partition by bankPartition
from every a1 = atmStatsStream[amountWithdrawed < 100]
-> b1 = atmStatsStream[amountWithdrawed > 10000 and
a1.cardNo == b1.cardNo]
within 1 day
select a1.cardNo as cardNo, a1.cardHolderName as
cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as
location, b1.cardHolderMobile as cardHolderMobile
insert into possibleFraudStream
partition by bankPartition
}
]]>
</queryExpressions>
<exportedStreams>
<stream name="possibleFraudStream" valueOf="possibleFraudStream"
version="1.0.0"/>
</exportedStreams>
</executionPlan>
Here we'll have three modes of execution:
1. local
2. active-passive
3. distributed
*Local mode*
This is the one we have now.
*Active-passive*
Here there will be 2 nodes, one active and the other passive. There will be
a handshake protocol between the active and passive nodes, which will be
used for state replication and for syncing when a node goes down and joins
back.
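
To make the rejoin step concrete, here is a minimal sketch of what the
handshake could do. It is only an illustration under assumptions: the class
and method names (ActivePassiveSketch, Node, handshake, failover) are
hypothetical, and the Map is just a stand-in for real Siddhi query state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of active-passive state syncing; not existing Siddhi/CEP code. */
public class ActivePassiveSketch {

    static class Node {
        final String name;
        boolean active;
        // Stand-in for Siddhi query state (windows, partial pattern matches, ...).
        Map<String, Long> state = new HashMap<>();

        Node(String name, boolean active) {
            this.name = name;
            this.active = active;
        }

        /** Counts one event per key, as a toy form of stateful processing. */
        void process(String key) {
            state.merge(key, 1L, Long::sum);
        }

        /** Active side of the handshake: hand out a copy of the current state. */
        Map<String, Long> snapshot() {
            return new HashMap<>(state);
        }

        /** Passive side of the handshake: replace local state with the snapshot. */
        void restore(Map<String, Long> snapshot) {
            state = new HashMap<>(snapshot);
        }
    }

    /** On (re)join, the joining node becomes passive and copies the active node's state. */
    static void handshake(Node joining, Node activePeer) {
        joining.active = false;
        joining.restore(activePeer.snapshot());
    }

    /** When the active node goes down, the passive node takes over. */
    static void failover(Node passive) {
        passive.active = true;
    }

    public static void main(String[] args) {
        Node a = new Node("node-a", true);
        Node b = new Node("node-b", false);
        a.process("card-1");
        handshake(b, a);   // b syncs its state from a
        failover(b);       // a goes down, b becomes active
        System.out.println(b.active + " " + b.state);
    }
}
```

A real implementation would also have to replicate state continuously between
handshakes; the sketch only covers the sync on join and the role switch.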
*Distributed*
Here we use annotations (these are ignored in the other modes). The
"parallel" annotation denotes the parallelism level: "full" for fully
distributed, "partition" to distribute according to the partition, or
"single" for no distribution.
In the "partition" case, all the queries need to be partitioned by the same
partition. We can also use curly braces {} to group queries, thereby
forcing all the queries in a group to fall on the same Siddhi instance.
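
As a rough illustration of these rules, the sketch below models the three
annotation values and the "same partition" check for a braced group. The
names (ParallelAnnotationSketch, Parallelism, sharesOnePartition) are
hypothetical and are not part of Siddhi; this only shows how the proposed
semantics might be validated.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;

/** Hypothetical model of the proposed @parallel annotation; not an existing Siddhi API. */
public class ParallelAnnotationSketch {

    /** The three parallelism levels proposed in this mail. */
    enum Parallelism { FULL, PARTITION, SINGLE }

    /** Maps the annotation value ("full", "partition", "single") to a level. */
    static Parallelism parse(String value) {
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "full":
                return Parallelism.FULL;
            case "partition":
                return Parallelism.PARTITION;
            case "single":
                return Parallelism.SINGLE;
            default:
                throw new IllegalArgumentException("Unknown parallel level: " + value);
        }
    }

    /** For a "partition" group, every query must be partitioned by the same partition. */
    static boolean sharesOnePartition(List<String> partitionNames) {
        return !partitionNames.isEmpty() && new HashSet<>(partitionNames).size() == 1;
    }

    public static void main(String[] args) {
        // The two queries in the braced group above both use bankPartition.
        List<String> group = Arrays.asList("bankPartition", "bankPartition");
        System.out.println(parse("partition") + " valid=" + sharesOnePartition(group));
    }
}
```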
We can combine Storm's reliable messaging and snapshot persistence to
achieve reliable messaging, but this still needs more investigation.
Currently we'll mainly focus on the active-passive case, as it will provide
reliable and fault-tolerant message processing easily, and at the same time
we'll also work on the Storm integration for the distributed case.
Thoughts?
Suho
On Wed, Nov 27, 2013 at 11:07 AM, Sanjiva Weerawarana <[email protected]> wrote:
> +1 .. excellent job getting this off the ground! I'd love to see the
> numbers in a real distributed set up :).
>
>
> On Wed, Nov 27, 2013 at 1:47 PM, Srinath Perera <[email protected]> wrote:
>
>> Hi All,
>>
>> I have written a Siddhi bolt that you can use to run Siddhi using Storm
>> in a distributed setup.
>>
>> You can create SiddhiBolts from any Siddhi query, as follows.
>>
>> SiddhiBolt siddhiBolt = new SiddhiBolt(
>> new String[]{ "define stream PlayStream1 ( sid string, ts long,
>> x double, y double, z double, a double, v double);"},
>> new String[]{ "from PlayStream1#window.timeBatch(1sec) select
>> sid, avg(v) as avgV insert into AvgRunPlay;" },
>> new String[]{"AvgRunPlay"});
>>
>> Then those bolts can be used within Storm topology like any other bolt.
>> However, the name of components and streams used in CEP queries should
>> match.
>>
>> TopologyBuilder builder = new TopologyBuilder();
>> builder.setSpout("PlayStream1", new FootballDataSpout(), 1);
>> builder.setBolt("AvgRunPlay", siddhiBolt1,
>> 1).shuffleGrouping("PlayStream1");
>>
>> builder.setBolt("FastRunPlay", siddhiBolt2,1).shuffleGrouping("AvgRunPlay");
>> builder.setBolt("LeafEacho", new EchoBolt(),
>> 1).shuffleGrouping("FastRunPlay");
>>
>> I have done a quick performance test and got about 140K TPS in a local
>> cluster. We need to test using a distributed setup. Lasantha will
>> integrate this with the CEP code base.
>>
>> Some potential TODOs are:
>> 1) Write two new bolts for Siddhi that support reliable processing and
>> transaction processing using Storm constructs (for cases where we need
>> high reliability while processing).
>> 2) Integrate this with our data agent so we can send events into the
>> Storm setup as well.
>> 3) Extend the Siddhi language to support distributed processing, so the
>> above topology can be written in the Siddhi language itself.
>>
>> If performance is confirmed to be in the same range, given the stability
>> of Storm, I think we can go with Storm for the planned Siddhi distributed
>> processing.
>>
>> Thanks
>> Srinath
>>
>> Code for the bolt can be found in
>> https://svn.wso2.org/repos/wso2/people/srinath/projects/siddhiStormIntegration/src/org/wso2/siddhi/storm/SiddhiBolt.java
>> .
>>
>> Code can be found from
>> https://svn.wso2.org/repos/wso2/people/srinath/projects/siddhiStormIntegration
>>
>> --
>> ============================
>> Srinath Perera, Ph.D.
>> Director, Research, WSO2 Inc.
>> Visiting Faculty, University of Moratuwa
>> Member, Apache Software Foundation
>> Research Scientist, Lanka Software Foundation
>> Blog: http://srinathsview.blogspot.com/
>> Photos: http://www.flickr.com/photos/hemapani/
>> Phone: 0772360902
>>
>> _______________________________________________
>> Architecture mailing list
>> [email protected]
>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>
>>
>
>
> --
> Sanjiva Weerawarana, Ph.D.
> Founder, Chairman & CEO; WSO2, Inc.; http://wso2.com/
> email: [email protected]; office: +1 650 745 4499 x5700; cell: +94 77 787
> 6880 | +1 650 265 8311
> blog: http://sanjiva.weerawarana.org/
> Lean . Enterprise . Middleware
>
>
>
--
*S. Suhothayan*
Associate Technical Lead,
*WSO2 Inc.* http://wso2.com
lean . enterprise . middleware
cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
twitter: http://twitter.com/suhothayan | linked-in: http://lk.linkedin.com/in/suhothayan