Introduction

This proposal addresses the distribution of CEP internal state for
Pattern/Sequence rules. Currently, sharing state across multiple Siddhi
instances requires SiddhiQL event tables, backed either by Hazelcast or by an
RDBMS. In both cases the performance penalty is large, so event tables are
rarely used for this purpose. This document proposes using Kafka as the
synchronization mechanism instead of event tables.

Our proposed alternative is independent of the data sources (Kafka, Thrift, …)
and of the underlying Siddhi clustering mechanism (Storm as used by WSO2, or
Spark as proposed by Stratio). The requirement to use Kafka only affects the
synchronization mechanism.

When two or more Siddhi instances consume events from a Kafka topic with two or
more partitions, each instance receives a different subset of the events. This
is the key to Kafka's scalability. Since the instances do not see the same
events, a rule that relates events received at different instances will never
match. A Kafka-based synchronization mechanism is proposed to solve this issue.
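The partitioning problem described above can be illustrated with a small,
self-contained sketch (plain Java, not the Kafka or Siddhi API; the key-based
hash is only an analogy for Kafka's partitioner):

```java
import java.util.*;

// Sketch of how key-hash partitioning splits an input stream, so that
// consecutive events can land on different CEP instances. Kafka's default
// partitioner uses murmur2 on the key; hashCode() stands in for it here.
public class PartitionSplit {
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 2;
        Map<Integer, List<String>> perInstance = new HashMap<>();
        for (String event : List.of("A", "B", "C", "D")) {
            perInstance.computeIfAbsent(partitionFor(event, partitions),
                    p -> new ArrayList<>()).add(event);
        }
        // Each CEP instance (one per partition) sees only a subset, so a
        // rule relating events across subsets cannot match locally.
        System.out.println(perInstance);
    }
}
```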

In the future we expect to extend this mechanism to other types of rules, such 
as those involving counting or averages.
Proposal

As an example (in this case fully Kafka based), the image below shows a Kafka
cluster with three topics (Input, Sync and Output) and two CEP instances. The
Input and Output topics each have two partitions; the Sync topic has a single
partition. Each Siddhi CEP instance embeds one Kafka consumer and one Kafka
producer, used to read input events and to write rule output respectively.
This document proposes embedding a Kafka consumer and a Kafka producer in the
Siddhi Core library, which allows a Kafka topic to be used as the distribution
mechanism between multiple instances.
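As a sketch of what the embedded clients might be configured with (the group
and topic names are assumptions, not part of Siddhi; the property keys and
serializer class names are standard Kafka client configuration):

```java
import java.util.Properties;

// Hypothetical configuration for the Kafka clients embedded in Siddhi Core.
public class EmbeddedKafkaConfig {
    public static Properties consumerProps(String bootstrap) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrap);
        // All CEP instances join one consumer group, so Kafka assigns each
        // instance a disjoint subset of the Input topic's partitions.
        p.put("group.id", "cep-input");
        p.put("key.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static Properties producerProps(String bootstrap) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrap);
        p.put("key.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }
}
```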

The rules are fully replicated in each Siddhi instance, without requiring
specific rule partitioning or distribution mechanisms (and thus removing the
need to design and implement complex cost-based distribution mechanisms).
 
In this example there is only one rule, identical in both Siddhi
instances:
from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
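The filter condition above, [e1.temp + 1 < temp], can be read as a plain
predicate over the two matched events (a sketch, not Siddhi's execution model):

```java
// The rule's filter condition: the second event's temperature must exceed
// the first event's temperature by more than one degree.
public class TempRule {
    static boolean matches(double e1Temp, double e2Temp) {
        return e1Temp + 1 < e2Temp;
    }

    public static void main(String[] args) {
        System.out.println(matches(20.0, 21.5)); // more than 1 degree up
        System.out.println(matches(20.0, 21.0)); // exactly 1 degree: no alert
    }
}
```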

This rule raises an alert when the temperature increases by more than one
degree between two consecutive temperature events. A and B are events that
match these conditions: both arrive on TempStream, and B is more than one
degree warmer than A. The processing steps are described below:

 


(0): A and B are produced to the Input Kafka topic.
(1): CEP1's KafkaConsumer consumes A; CEP2's KafkaConsumer consumes B.
(2): Each KafkaConsumer forwards its event to the Siddhi Core library.
(3): CEP1 and CEP2 both publish their internal state to the Sync Kafka topic.
(4): CEP1 and CEP2 consume the state shared by the other instance.
(5): CEP2 now knows: "I received B, and A was received by CEP1. The rule
matches, and I must alert because I received the last rule condition (see the
rule above; B matches e2=TempStream[e1.temp + 1 < temp])". Likewise, CEP1
knows: "I received A, and B was received by CEP2. CEP2 must alert because it
received the last rule condition".
(6): Siddhi Core at CEP2 sends the alert event to the KafkaProducer embedded
at CEP2.
(7): The KafkaProducer embedded at CEP2 produces the alert event to the Output
Kafka topic.
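Steps (1)-(5) can be sketched as a toy simulation. The sync message layout
(instance, matched slot, temperature) is an assumption for illustration, not
Siddhi's actual state format; the ordered list stands in for the
single-partition Sync topic:

```java
import java.util.*;

// Each instance appends its partial match to an ordered sync log; the
// instance holding the rule's last condition (e2) raises the alert.
public class SyncWalkthrough {
    record SyncMsg(String instance, String slot, double temp) {}

    // Replay the sync log and return the alert text, or null on no match.
    static String replay(List<SyncMsg> syncLog) {
        Double e1 = null;
        for (SyncMsg m : syncLog) {
            if (m.slot().equals("e1")) {
                e1 = m.temp();                       // partial match so far
            } else if (e1 != null && e1 + 1 < m.temp()) {
                return m.instance() + " alerts: initialTemp=" + e1
                        + " finalTemp=" + m.temp();  // rule completed
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<SyncMsg> syncTopic = new ArrayList<>();
        // (3): CEP1 consumed A (slot e1), CEP2 consumed B (slot e2);
        // both publish their partial state to the sync topic.
        syncTopic.add(new SyncMsg("CEP1", "e1", 20.0));
        syncTopic.add(new SyncMsg("CEP2", "e2", 21.5));
        // (4)-(5): both instances replay the same log; only CEP2 alerts.
        System.out.println(replay(syncTopic));
    }
}
```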

This mechanism allows a CEP cluster to scale across multiple instances without
using event tables.

Method characteristics
It is technology agnostic: it works with a Storm cluster or with any other
realtime processing technology.
All nodes hold all rules. Because every instance shares its internal state,
each of them can run the full rule set, so no distributed execution plan needs
to be computed.
Kafka Benefits

The benefits of using Kafka as the state distribution method are:

Kafka guarantees that events produced to a partition by a producer are
appended in the order they are sent. This makes the mechanism usable for
states that require time ordering.
Consumers see the states in the order they are stored in the log.
With a replication factor of N for the Sync topic, up to N-1 broker failures
can be tolerated.
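For illustration, a Sync topic with these properties could be created with the
standard Kafka CLI (a config sketch only; the path, host and replication
factor are assumptions for a local installation):

```shell
# One partition gives a total order over sync messages; replication
# factor 3 tolerates up to 2 broker failures.
bin/kafka-topics.sh --create --topic sync \
  --partitions 1 --replication-factor 3 \
  --bootstrap-server localhost:9092
```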

Implementation Plan
Stage 1) Embed Kafka Producer/Consumer in Siddhi Core
The first stage is to embed a Kafka producer and consumer in the Siddhi
library.
Stage 2) Implement state distribution algorithm
The second stage is to implement the distribution algorithm needed to share
the internal rule state machines. Whenever the internal machine state is
updated, a synchronization event should be produced to the Sync topic.
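A minimal sketch of what producing such a synchronization event could look
like (the pipe-delimited message layout is a hypothetical encoding, not
Siddhi's actual state format):

```java
// Encode a state-machine transition as a message for the Sync topic.
public class SyncEncoder {
    static String encode(String instanceId, String ruleId,
                         String slot, double temp) {
        // instance | rule | matched slot | attribute value
        return String.join("|", instanceId, ruleId, slot,
                Double.toString(temp));
    }

    public static void main(String[] args) {
        // Whenever the internal state machine advances, the same update
        // would be produced to the Sync topic as a message like this:
        System.out.println(encode("CEP1", "tempAlert", "e1", 20.0));
    }
}
```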
Stage 3) Implement state update algorithm
The third stage is to implement the state update algorithm that consumes the
states produced to the synchronization topic and updates the internal machine
state for each rule.
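The consuming side could be sketched as follows, assuming the same
hypothetical "instance|rule|slot|value" message layout (again an assumption,
not Siddhi's real state representation):

```java
import java.util.*;

// Apply consumed sync messages to a per-rule state map, so every
// instance converges on the same view of each rule's state machine.
public class SyncApplier {
    static Map<String, Map<String, Double>> ruleStates = new HashMap<>();

    static void apply(String message) {
        String[] f = message.split("\\|");
        // f[1] = rule id, f[2] = matched slot (e.g. "e1"), f[3] = value
        ruleStates.computeIfAbsent(f[1], r -> new HashMap<>())
                  .put(f[2], Double.parseDouble(f[3]));
    }

    public static void main(String[] args) {
        apply("CEP1|tempAlert|e1|20.0"); // state shared by CEP1
        System.out.println(ruleStates.get("tempAlert").get("e1"));
    }
}
```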

Known deficiencies

As the signaling mechanism is asynchronous, there is a potential race
condition if the events are read out of order by the two instances, or if they
arrive so close together in time that the sync message has not yet been
received.

This situation also exists with the current Hazelcast and RDBMS event tables,
and is therefore accepted by their users. Still, as Kafka is incorporating
precise event-time capabilities, we are evaluating the possibility of using
them without significantly affecting the current performance.
_______________________________________________
Architecture mailing list
[email protected]
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
