[
https://issues.apache.org/jira/browse/EAGLE-98?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hao Chen updated EAGLE-98:
--------------------------
Description:
h2. Features
* High Level Stream-Oriented
* Declarative Streaming
* Metadata Driven
* Native Scala internal DSL
* Support Scala Programming or Script/Configure in *.egl
* Support static policy definition / dynamic policy loader
* IDE-friendly features like sql-prefix and XML as email templates.
h2. Syntax
{code}
// Plug-able DSL Interface
import org.apache.eagle.stream.dsl.AggregateInterface._
import org.apache.eagle.stream.dsl.AlertInterface._
import org.apache.eagle.stream.dsl.DefineInterface._
import org.apache.eagle.stream.dsl.DruidInterface._
import org.apache.eagle.stream.dsl.KafkaInterface._
// Topology Definition API by extends or script
// #!/bin/bash
// exec scala "$0" "$@"
// !#
// # start
define ("metricStream_1") as ("name" -> 'string, "value"->'double,
"timestamp"->'long) from kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts")
define ("metricStream_2") as ("name" -> 'string, "value"->'double,
"timestamp"->'long) from kafka(topic="metricStream_2")
alert partitionBy "metricStream_1.metricType" parallism 1 by sql"""
from metricStream_1[component=='dn' and
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into alertStream;
"""
aggregate partitionBy "metricStream_1.metricType" parallism 2 by sql"""
from metricStream_1[component=='dn' and
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into
aggregatedMetricStream_1;
"""
"alertStream" ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
"alertStream" to mail(
from = "[email protected]",
to = "[email protected]",
smtp = "localhost:25",
template =
<html>
<head>
<title>Alert Notification</title>
</head>
<body>
<h1>Message</h1>
<p>$message</p>
</body>
</html>
)
// split stream by logic
"aggregatedMetricStream_1" to kafka("aggregated_stream_dn") where "component ==
'dn'" partitionBy "aggregatedMetricStream_1.metricType"
"aggregatedMetricStream_1" ~> druid("aggregated_stream_nn") where "component
== 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
// # end
{code}
was:
h2. Features
* High Level Stream-Oriented
* Declarative Streaming
* Metadata Driven
* Native Scala internal DSL
* Support Scala Programming or Script/Configure in *.egl
* Support static policy definition / dynamic policy loader
* IDE-friendly features like sql-prefix and XML as email templates.
h2. Syntax
{code}
// Plug-able DSL Interface
import org.apache.eagle.stream.dsl.AggregateInterface._
import org.apache.eagle.stream.dsl.AlertInterface._
import org.apache.eagle.stream.dsl.DefineInterface._
import org.apache.eagle.stream.dsl.DruidInterface._
import org.apache.eagle.stream.dsl.KafkaInterface._
// Topology Definition API by extends or script
// #!/bin/bash
// exec scala "$0" "$@"
// !#
// # start
define ("metricStream_1") as ("name" -> 'string, "value"->'double,
"timestamp"->'long) from kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts")
define ("metricStream_2") as ("name" -> 'string, "value"->'double,
"timestamp"->'long) from kafka(topic="metricStream_2")
alert partitionBy "metricStream_1.metricType" parallism 1 by sql"""
from metricStream_1[component=='dn' and
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into alertStream;
"""
aggregate partitionBy "metricStream_1.metricType" parallism 2 by sql"""
from metricStream_1[component=='dn' and
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into
aggregatedMetricStream_1;
"""
"alertStream" ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
"alertStream" to mail(
from = "[email protected]",
to = "[email protected]",
smtp = "localhost:25",
template =
<html>
<head>
<title>Alert Notification</title>
</head>
<body>
<h1>Message</h1>
<p>$message</p>
</body>
</html>
)
// split stream by logic
"aggregatedMetricStream_1" to kafka("aggregated_stream_dn") where "component
== 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
"aggregatedMetricStream_1" ~> druid("aggregated_stream_nn") where "component
== 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
// # end
{code}
> Eagle Declarative Topology Definition DSL
> -----------------------------------------
>
> Key: EAGLE-98
> URL: https://issues.apache.org/jira/browse/EAGLE-98
> Project: Eagle
> Issue Type: New Feature
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0
>
>
> h2. Features
> * High Level Stream-Oriented
> * Declarative Streaming
> * Metadata Driven
> * Native Scala internal DSL
> * Support Scala Programming or Script/Configure in *.egl
> * Support static policy definition / dynamic policy loader
> * IDE-friendly features like sql-prefix and XML as email templates.
> h2. Syntax
> {code}
> // Plug-able DSL Interface
> import org.apache.eagle.stream.dsl.AggregateInterface._
> import org.apache.eagle.stream.dsl.AlertInterface._
> import org.apache.eagle.stream.dsl.DefineInterface._
> import org.apache.eagle.stream.dsl.DruidInterface._
> import org.apache.eagle.stream.dsl.KafkaInterface._
> // Topology Definition API by extends or script
> // #!/bin/bash
> // exec scala "$0" "$@"
> // !#
> // # start
> define ("metricStream_1") as ("name" -> 'string, "value"->'double,
> "timestamp"->'long) from kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts")
> define ("metricStream_2") as ("name" -> 'string, "value"->'double,
> "timestamp"->'long) from kafka(topic="metricStream_2")
> alert partitionBy "metricStream_1.metricType" parallism 1 by sql"""
> from metricStream_1[component=='dn' and
> metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
> select sum(value) group by host output every 1 hour insert into alertStream;
> """
> aggregate partitionBy "metricStream_1.metricType" parallism 2 by sql"""
> from metricStream_1[component=='dn' and
> metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
> select sum(value) group by host output every 1 hour insert into
> aggregatedMetricStream_1;
> """
> "alertStream" ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
> "alertStream" to mail(
> from = "[email protected]",
> to = "[email protected]",
> smtp = "localhost:25",
> template =
> <html>
> <head>
> <title>Alert Notification</title>
> </head>
> <body>
> <h1>Message</h1>
> <p>$message</p>
> </body>
> </html>
> )
> // split stream by logic
> "aggregatedMetricStream_1" to kafka("aggregated_stream_dn") where "component
> == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
> "aggregatedMetricStream_1" ~> druid("aggregated_stream_nn") where "component
> == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
> // # end
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)