Introduction
This proposal addresses the distribution of CEP internal state for
Pattern/Sequence rules. Currently, you need to use SiddhiQL event tables to
share state across multiple Siddhi instances, backed either by Hazelcast or by
RDBMS event tables. In both cases there is a significant performance penalty,
and thus they are not generally used. This document proposes using Kafka as
the synchronization mechanism instead of event tables.
The proposed alternative is independent of the data sources (Kafka, Thrift, …)
and of the underlying Siddhi clustering mechanism (Storm as used in WSO2, or
Spark as proposed by Stratio). The requirement to use Kafka only affects the
synchronization mechanism.
When two or more Siddhi instances consume events from a Kafka topic with two or
more partitions, each one receives a different subset of the events. This is
the key to Kafka's scalability. Since the events received at each Siddhi
instance are not the same, a rule that relates events received at different
instances will not match. A Kafka-based synchronization mechanism is proposed
to solve this issue. In the future we expect to extend this mechanism to other
types of rules, such as those involving counts or averages.
Proposal
As an example (in this case fully Kafka based), the image below shows a Kafka
cluster with three topics (Input, Sync and Output) and two CEP instances. Both
the Input and Output topics have two partitions; the Sync topic has one
partition. Each Siddhi CEP instance embeds one Kafka consumer and one Kafka
producer, used to read input events and to write rule output, respectively.
This document proposes embedding a Kafka consumer and a Kafka producer in the
Siddhi Core library, which allows a Kafka topic to be used as the state
distribution mechanism between multiple instances.
The rules are fully replicated in each Siddhi instance, without requiring
specific rule partitioning or distribution mechanisms (and thus removing the
need to design and implement complex cost-based distribution mechanisms).
In this example there is only one rule, identical in both Siddhi instances:
from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp ]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
This rule alerts when there is more than a 1 degree temperature increase
between two consecutive temperature events. A and B are events that match
these conditions: both are events on TempStream, and B is more than 1 degree
warmer than A. The process steps are described below:
(0): A and B are produced to Input Kafka Topic.
(1): CEP1 KafkaConsumer consumes A. CEP2 KafkaConsumer consumes B.
(2): Each KafkaConsumer sends the event to Siddhi Core Library.
(3): Both CEP1 and CEP2 share their state to a Kafka topic to synchronize their
states.
(4): CEP1 and CEP2 consume the state shared by the others.
(5): CEP2 knows: “I received B, and A was received by CEP1. The rule matches.
I must alert because I received the last rule condition (see the rule above; B
is “e2=TempStream[e1.temp + 1 < temp ]”).” CEP1 knows: “I received A, and B
was received by CEP2. CEP2 must alert because it received the last rule
condition.”
(6): Siddhi Core at CEP2 sends the alert event to the Kafka producer embedded
in CEP2.
(7): The Kafka producer embedded in CEP2 produces the event to the Output
Kafka topic.
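The steps above can be walked through with a minimal in-memory sketch, using
plain Python lists in place of the Sync and Output Kafka topics (the class and
variable names are hypothetical, not Siddhi APIs):

```python
sync_topic = []    # stands in for the single-partition Sync topic
output_topic = []  # stands in for the Output topic

class CepInstance:
    def __init__(self, name):
        self.name = name
        self.local_events = []

    def consume(self, event):
        # Steps (1)-(2): the embedded consumer feeds the event to the core.
        self.local_events.append(event)
        # Step (3): share the state change on the sync topic.
        sync_topic.append((self.name, event))

    def evaluate(self):
        # Steps (4)-(5): rebuild the global state from the sync log.
        history = [e for _, e in sync_topic]
        for e1, e2 in zip(history, history[1:]):
            # Only the instance that received the *last* rule condition
            # (e2) produces the alert, as in step (5).
            if e2[1] > e1[1] + 1 and e2 in self.local_events:
                # Steps (6)-(7): produce the alert to the Output topic.
                output_topic.append(
                    {"initialTemp": e1[1], "finalTemp": e2[1], "by": self.name})

cep1, cep2 = CepInstance("CEP1"), CepInstance("CEP2")
cep1.consume(("A", 20.0))   # steps (0)-(1): CEP1 consumes A
cep2.consume(("B", 21.5))   # CEP2 consumes B
cep1.evaluate()             # CEP1 stays silent: B arrived elsewhere
cep2.evaluate()             # CEP2 holds the last condition and alerts
print(output_topic)  # [{'initialTemp': 20.0, 'finalTemp': 21.5, 'by': 'CEP2'}]
```

The "last condition" tie-break ensures exactly one instance emits each alert,
even though both reconstruct the same global state from the sync log.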
This mechanism allows a CEP cluster to scale across multiple instances without
using event tables.
Method characteristics
It is technology agnostic: it works with a Storm cluster or with any other
real-time processing technology.
All nodes must have all rules. Since all instances share their internal
states, every node can hold every rule, and no distributed execution plan
needs to be computed.
Kafka Benefits
The benefits of using Kafka as the state distribution method are:
Kafka guarantees that events produced to a partition by a producer are
appended in the order they are sent. This allows the mechanism to be used with
states that require time ordering.
Consumers see the states in the order they are stored in the log.
With a replication factor of N for the Sync topic, up to N-1 broker failures
can be tolerated.
Implementation Plan
Stage 1) Embed Kafka Producer/Consumer at Siddhi Core
The first stage is to embed a Kafka producer and consumer in the Siddhi Core
library.
Stage 2) Implement state distribution algorithm
The second stage is to implement the distribution algorithm needed to share
the internal rule state machines. Whenever the internal state machine is
updated, a synchronization event should be produced to the Sync topic.
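A minimal sketch of Stage 2, assuming a hypothetical wrapper around the rule
state machine (none of these names are Siddhi APIs): every internal state
transition is mirrored as a synchronization event, here onto a plain list
standing in for the Kafka producer and Sync topic.

```python
class SyncedStateMachine:
    """Hypothetical rule state machine that publishes every update."""

    def __init__(self, instance_id, sync_producer):
        self.instance_id = instance_id
        # In the real design this would be the embedded KafkaProducer;
        # here it is any callable that accepts a record.
        self.sync_producer = sync_producer
        self.partial_matches = []

    def on_event(self, event):
        # Update the internal rule state...
        self.partial_matches.append(event)
        # ...and, at the same time, produce the sync event (step 3).
        self.sync_producer({"from": self.instance_id, "event": event})

sync_log = []  # stands in for the Sync topic
sm = SyncedStateMachine("CEP1", sync_log.append)
sm.on_event({"temp": 20.0})
print(sync_log)  # [{'from': 'CEP1', 'event': {'temp': 20.0}}]
```

Coupling the produce call to the state transition keeps the sync log a
faithful, ordered record of every instance's internal state changes.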
Stage 3) Implement state update algorithm
The third stage is to implement the state update algorithm, which consumes the
states produced to the synchronization topic and updates the internal state
machine for each rule.
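Stage 3 can be sketched as a replay loop over the sync log (again with
illustrative names only): each instance consumes records from its last
position, merges remote partial matches into the local rule state, and skips
its own records, which are already reflected locally.

```python
def apply_sync_events(instance_id, sync_log, rule_state, cursor=0):
    """Replay sync records from `cursor`, merging states from other
    instances into `rule_state`. Returns the new cursor, analogous to
    a committed Kafka offset."""
    for record in sync_log[cursor:]:
        if record["from"] != instance_id:  # own state is already local
            rule_state.append(record["event"])
    return len(sync_log)

sync_log = [
    {"from": "CEP1", "event": {"temp": 20.0}},
    {"from": "CEP2", "event": {"temp": 21.5}},
]
cep2_state = [{"temp": 21.5}]  # CEP2 already saw its own event
offset = apply_sync_events("CEP2", sync_log, cep2_state)
print(cep2_state)  # [{'temp': 21.5}, {'temp': 20.0}]: remote state merged
print(offset)      # 2
```

Tracking the cursor per instance means recovery after a restart is just a
replay of the sync topic from the last committed offset.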
Known deficiencies
Since the signaling mechanism is asynchronous, there is a potential race
condition if the events are read out of order by the two instances, or if
they are close enough in time that the “sync” message may not have arrived
yet.
This situation also exists with the current Hazelcast and RDBMS event tables,
and is thus accepted by their users. Still, as Kafka is incorporating precise
event-time capabilities, we are evaluating the possibility of using them
without significantly affecting current performance.
_______________________________________________
Architecture mailing list
[email protected]
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture