Hi All,
We are in the process of introducing the incremental aggregation capability
to Siddhi, which was formerly achieved with Spark scripts in DAS. Although
aggregate calculations are already possible with Siddhi, they have certain
limitations, such as the inability to obtain aggregate values for several
durations with a single query and the inability to perform fail-safe
execution.
Incremental aggregation allows users to obtain aggregate values (e.g.
average, count, sum, min, max) for specified duration(s). Such an
aggregation can be specified as follows in the Siddhi language.
@store(type="rdbms")
define aggregation testAggregator
from tradesStream
select symbol, avg(price) as avgPrice, sum(price) as total
group by symbol
aggregate by timestamp every sec...year;
In the given example, a stream called tradesStream with "symbol", "price"
and "timestamp" attributes must be pre-defined. When the duration is given
as sec...year, the average and sum aggregations would be calculated per
second, minute, hour, day, month and year.
Since the "by timestamp" clause is given here, aggregation would be done
based on a timestamp attached to the event. Otherwise, the event arrival
time would be used. If execution is to be performed based on event arrival
time, the last line of the query should be changed to "aggregate every
sec...year".
This feature performs aggregation incrementally. For example, after
obtaining the sum for a second, several such "sums" would be summed up
again to obtain the sum for a minute. Since this approach cannot be applied
directly to aggregate functions such as average (the average of averages
is not, in general, the overall average), such aggregates are broken down
into base aggregates with which incremental processing is possible. For
average, the base aggregates are therefore the sum and count. This is
solely an implementation detail, invisible to the user.
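To illustrate the base-aggregate idea (a minimal sketch only, not the actual Siddhi implementation; all names here are hypothetical), each per-second bucket keeps a (sum, count) pair, and the minute-level average is derived by merging those base aggregates rather than averaging the per-second averages:

```python
# Sketch of base-aggregate merging (hypothetical example; not Siddhi code).
# Each per-second bucket stores base aggregates: (sum, count).
second_buckets = [(10.0, 2),   # e.g. prices 4.0 and 6.0  -> per-second avg 5.0
                  (30.0, 3)]   # e.g. prices 9.0, 10.0, 11.0 -> per-second avg 10.0

# Correct minute-level average: merge the base aggregates, then divide once.
total_sum = sum(s for s, _ in second_buckets)
total_count = sum(c for _, c in second_buckets)
minute_avg = total_sum / total_count           # 40.0 / 5 = 8.0

# The naive "average of averages" gives a different (wrong) result.
avg_of_avgs = sum(s / c for s, c in second_buckets) / len(second_buckets)
# (5.0 + 10.0) / 2 = 7.5, which is not the true average of all five prices.
```
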
For each duration, the timestamp, group by attribute(s) and base aggregate
values would be maintained. The following depicts how it's done for the
given example.
When a second elapses, the values maintained in the second incremental
executor would be forwarded to the minute incremental executor. Those
values would simultaneously be persisted to a table called
TestAggregate_Second, to ensure fail-safe execution. The values propagate
likewise from window to window. Rather than forwarding values as soon as a
second elapses, we can delay the forwarding by a few seconds, so that
out-of-order events are aggregated properly. Additionally, if a user
specifies a non-aggregate stream attribute in the select statement of the
query, its last value within each time window would be given as the
output. For example, if the user changes the select statement to "select
symbol, avg(price) as avgPrice, sum(price) as total, volume" (where the
volume attribute appears in "tradesStream"), the last volume value within
each second window, minute window, etc. would be given as output.
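The executor chain described above can be sketched roughly as follows (a conceptual illustration under assumed semantics; class and attribute names are hypothetical and do not reflect the actual Siddhi code):

```python
from collections import defaultdict

class IncrementalExecutor:
    """Conceptual sketch of one incremental executor level (e.g. second or
    minute). Hypothetical names; not the actual Siddhi implementation."""

    def __init__(self, bucket_size, next_executor=None, persist=None):
        self.bucket_size = bucket_size                # bucket length in seconds
        self.next_executor = next_executor            # executor for next duration
        self.persist = persist or (lambda row: None)  # stands in for store table
        self.current_bucket = None                    # start time of open bucket
        self.groups = defaultdict(lambda: {"sum": 0.0, "count": 0, "last": None})

    def process(self, key, timestamp, agg, last=None):
        """Merge base aggregates {"sum", "count"} for a group-by key."""
        bucket = timestamp - (timestamp % self.bucket_size)
        if self.current_bucket is not None and bucket > self.current_bucket:
            self.flush()                # the previous bucket has elapsed
        self.current_bucket = bucket
        g = self.groups[key]
        g["sum"] += agg["sum"]          # base aggregates: sum and count,
        g["count"] += agg["count"]      # from which avg(price) is derived
        g["last"] = last                # last value of a non-aggregate attribute

    def flush(self):
        """Persist the elapsed bucket and forward its base aggregates upward."""
        for key, g in self.groups.items():
            self.persist((self.current_bucket, key, g["sum"], g["count"]))
            if self.next_executor:
                self.next_executor.process(
                    key, self.current_bucket,
                    {"sum": g["sum"], "count": g["count"]}, last=g["last"])
        self.groups.clear()

second_table = []                       # stands in for TestAggregate_Second
minute = IncrementalExecutor(60)
second = IncrementalExecutor(1, next_executor=minute,
                             persist=second_table.append)

# Raw events enter the second-level executor as {"sum": price, "count": 1}.
second.process("wso2", 0, {"sum": 4.0, "count": 1}, last=100)
second.process("wso2", 0, {"sum": 6.0, "count": 1}, last=120)
second.process("wso2", 1, {"sum": 9.0, "count": 1}, last=90)  # second 0 elapses
second.flush()                          # flush second 1 for demonstration
```

After running this, the minute-level executor holds the merged base aggregates for "wso2" (sum 19.0, count 3) along with the last volume value (90), while both elapsed second buckets have been persisted to the stand-in table. The out-of-order buffering mentioned above is omitted here for brevity.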
Ultimately, a user can retrieve the aggregate values for a certain duration
as follows.
define stream barStream (symbol string, value int);
from barStream as b join testAggregator as a
on a.symbol == b.symbol
within "2014-02-15T00:00:00Z", "2014-03-16T00:00:00Z"
per "day"
select a.symbol, a.total, a.avgPrice
insert into fooBar;
In the given example, aggregate values would be retrieved at day
granularity. Aggregates for all the days within the given time period would
be returned. If the requested information concerns the running aggregations,
it would be retrieved from the incremental executors; otherwise, it would
be retrieved from the tables. If values must be retrieved for a period
within a certain year, month, etc., the duration can be given as a single
value. For example, changing the query to 'within
"2014-02-**T**:**:**Z"' would retrieve all the day aggregations within
February 2014. Furthermore, the "within" and "per" values can be assigned
from attributes of the incoming stream. An example query is provided
herewith.
define stream barStream (symbol string, value int, startTime string,
endTime string, type string);
from barStream as b join testAggregator as a
on a.symbol == b.symbol
within b.startTime, b.endTime
per b.type
select a.symbol, a.total, a.avgPrice
insert into fooBar;
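Conceptually, the table-side retrieval amounts to filtering the persisted buckets of the requested granularity by the "within" range and deriving the exposed aggregates from the stored base aggregates (a simplified sketch with hypothetical data and names; the real lookup runs against the store tables and running executors):

```python
from datetime import datetime, timezone

# Hypothetical stored day-level buckets: (bucket_start, symbol, sum, count).
day_table = [
    (datetime(2014, 2, 14, tzinfo=timezone.utc), "wso2", 100.0, 10),
    (datetime(2014, 2, 15, tzinfo=timezone.utc), "wso2", 240.0, 20),
    (datetime(2014, 3, 15, tzinfo=timezone.utc), "wso2", 330.0, 30),
    (datetime(2014, 3, 16, tzinfo=timezone.utc), "wso2", 400.0, 40),
]

def retrieve(table, symbol, start, end):
    """Return (bucket, total, avgPrice) for day buckets within [start, end)."""
    rows = []
    for bucket, sym, total, count in table:
        if sym == symbol and start <= bucket < end:
            rows.append((bucket, total, total / count))  # avg from base aggs
    return rows

start = datetime(2014, 2, 15, tzinfo=timezone.utc)
end = datetime(2014, 3, 16, tzinfo=timezone.utc)
result = retrieve(day_table, "wso2", start, end)
# The 2014-02-15 and 2014-03-15 buckets fall within the range;
# 2014-02-14 and 2014-03-16 do not.
```
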
This is the existing architecture for long-running aggregations.
Suggestions are most welcome.
Best Regards
--
*Charini Vimansha Nanayakkara*
Software Engineer at WSO2
Mobile: 0714126293
E-mail: [email protected]
Blog: http://www.charini.me/
_______________________________________________
Architecture mailing list
[email protected]
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture