Zhao, Qingwen created EAGLE-1041:
------------------------------------

             Summary: Support policy processing pipeline
                 Key: EAGLE-1041
                 URL: https://issues.apache.org/jira/browse/EAGLE-1041
             Project: Eagle
          Issue Type: Improvement
    Affects Versions: v0.5.0
            Reporter: Zhao, Qingwen
            Assignee: Zhao, Qingwen


In some cases, such as increment or decrement patterns, data needs to be 
processed in more than one stage. For example, to alert when a metric value 
increases by N, two steps are needed to get the right alert:
1. Sort and filter the events that meet the filter conditions.
2. Define the data change pattern.
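
The two steps above can be sketched outside of Siddhi to show why the ordering stage matters. This is only an illustration in plain Python, not Eagle or Siddhi code: the function names, the dictionary-based event shape, and the choice to compare each event only with the previous one for the same host, component and metric are all assumptions made for the example, and the 1-minute time window is not modelled.

```python
def sort_and_filter(events, metric_name):
    """Stage 1 (hypothetical helper): keep one metric, ordered by event time."""
    matching = [e for e in events if e["metric"] == metric_name]
    return sorted(matching, key=lambda e: e["timestamp"])


def detect_increases(ordered_events, threshold=100):
    """Stage 2 (hypothetical helper): report increases between consecutive events.

    An alert is produced when the previous value is above `threshold`
    and the new value is larger than the previous one.
    """
    previous = {}  # last event seen per (host, component, metric)
    alerts = []
    for event in ordered_events:
        key = (event["host"], event["component"], event["metric"])
        old = previous.get(key)
        if old is not None and old["value"] > threshold and event["value"] > old["value"]:
            alerts.append({
                "host": event["host"],
                "component": event["component"],
                "metric": event["metric"],
                "oldValue": old["value"],
                "newValue": event["value"],
                "increase": event["value"] - old["value"],
            })
        previous[key] = event
    return alerts


def run_pipeline(events, metric_name, threshold=100):
    """Chain the stages: the output of stage 1 is the input of stage 2."""
    return detect_increases(sort_and_filter(events, metric_name), threshold)
```

If events arrive out of order, feeding them to the second stage directly can miss increases that the chained version finds, which is the motivation for letting one query consume the output of another.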

Here is a sample policy:
{code}
from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric=="hadoop.namenode.dfs.missingblocks"]#window.externalTime(timestamp, 1 min)
select * group by site, host, component, metric insert into temp;
from every 
a=HADOOP_JMX_METRIC_STREAM_SANDBOX[metric=="hadoop.namenode.dfs.missingblocks"],
b=HADOOP_JMX_METRIC_STREAM_SANDBOX[b.component==a.component and b.metric==a.metric and b.host==a.host and b.value>a.value and a.value>100]
select b.site, b.host, b.component, b.metric, b.value as newNumOfMissingBlocks, 
a.value as oldNumOfMissingBlocks, (b.value-a.value) as increastedNumOfMissingBlocks 
insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_MISS_BLOCKS_LARGER_OUT;
{code}

There are two queries in this policy. The first one, with the time window 
condition, tells Eagle to sort the original events. The second one defines the 
data pattern. Because of a constraint in the Siddhi pattern syntax 
(https://docs.wso2.com/display/CEP420/SiddhiQL+Guide+3.1#SiddhiQLGuide3.1-Pattern),
the filtering of input events does not work. 

Fortunately, it does work if the output stream of the first query is used as 
the input stream of the second query. Supporting that kind of chaining is what 
this ticket aims to solve. 

Ideally, the correct policy could be written as:
{code}
from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric=="hadoop.namenode.dfs.missingblocks"]#window.externalTime(timestamp, 1 min)
select * group by site, host, component, metric insert into MISSING_BLOCK_OUT;
from every a=MISSING_BLOCK_OUT[metric=="hadoop.namenode.dfs.missingblocks"],
b=MISSING_BLOCK_OUT[b.component==a.component and b.metric==a.metric and b.host==a.host and b.value>a.value and a.value>100]
select b.site, b.host, b.component, b.metric, b.value as newNumOfMissingBlocks, 
a.value as oldNumOfMissingBlocks, (b.value-a.value) as increastedNumOfMissingBlocks 
insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_MISS_BLOCKS_LARGER_OUT;
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
