[
https://issues.apache.org/jira/browse/EAGLE-98?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hao Chen updated EAGLE-98:
--------------------------
Description:
h2. Features
* High Level Stream-Oriented
* Declarative Streaming
* Metadata Driven
* Native Scala internal DSL
* Support Scala Programming or Scripting/Configuration in *.egl
* Support static policy definition / dynamic policy loader
* IDE-friendly features like sql-prefix and xml as an email template.
h2. Syntax
<syntaxhighlight lang="scala">
// Topology Definition API by extends or script
import org.apache.eagle.stream.dsl.experimental.KafkaInterface._
import org.apache.eagle.stream.dsl.experimental.DruidInterface._
// #!/bin/bash
// exec scala "$0" "$@"
// !#
// # start
define ("metricStream_1") as ("name" -> 'string, "value"->'double,
"timestamp"->'long) from
kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="")
define ("metricStream_2") as ("name" -> 'string, "value"->'double,
"timestamp"->'long) from
kafka(topic="metricStream_2")
define ("logStream_3") from kafka(topic="logStream_3")
// filter by function
filter ("logStream_3") by {(line,collector) => collector.collect(line)} as
("name" -> 'string, "value"->'double, "timestamp"->'long)
// "logStream_3" as ("name" -> 'string, "value"->'double, "timestamp"->'long)
// filter by pattern and rename stream
filter("logStream_3"->"logStream_3_parsed") by
"""(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string,
"value"->'double, "timestamp"-> datetime("YYYY-MM-DD"))
alert partitionBy "metricStream_1.metricType" parallism 1 by {sql"""
from metricStream_1[component=='dn' and
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into alertStream;
"""}
aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql"""
from metricStream_1[component=='dn' and
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into
aggregatedMetricStream_1;
"""}
'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
"alertStream" to mail(
from = "[email protected]",
to = "[email protected]",
smtp = "localhost:25",
template =
<html>
<head>
<title>Alert Notification</title>
</head>
<body>
<h1>Message</h1>
<p>$message</p>
</body>
</html>
)
// split stream by logic
'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component ==
'dn'" partitionBy "aggregatedMetricStream_1.metricType"
'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn") where "component ==
'nn'" partitionBy "aggregatedMetricStream_1.metricType"
// # end
</syntaxhighlight>
was:
h2. Features
* High Level Stream-Oriented
* Declarative Streaming
* Metadata Driven
* Native Scala internal DSL
* Support Scala Programming or Scripting/Configuration in *.egl
* Support static policy definition / dynamic policy loader
* IDE-friendly features like sql-prefix and xml as an email template.
h2. Syntax
{code}
// Topology Definition API by extends or script
import org.apache.eagle.stream.dsl.experimental.KafkaInterface._
import org.apache.eagle.stream.dsl.experimental.DruidInterface._
// #!/bin/bash
// exec scala "$0" "$@"
// !#
// # start
define ("metricStream_1") as ("name" -> 'string, "value"->'double,
"timestamp"->'long) from
kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="")
define ("metricStream_2") as ("name" -> 'string, "value"->'double,
"timestamp"->'long) from
kafka(topic="metricStream_2")
define ("logStream_3") from kafka(topic="logStream_3")
// filter by function
filter ("logStream_3") by {(line,collector) => collector.collect(line)} as
("name" -> 'string, "value"->'double, "timestamp"->'long)
// "logStream_3" as ("name" -> 'string, "value"->'double, "timestamp"->'long)
// filter by pattern and rename stream
filter("logStream_3"->"logStream_3_parsed") by
"""(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string,
"value"->'double, "timestamp"-> datetime("YYYY-MM-DD"))
alert partitionBy "metricStream_1.metricType" parallism 1 by {sql"""
from metricStream_1[component=='dn' and
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into alertStream;
"""}
aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql"""
from metricStream_1[component=='dn' and
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into
aggregatedMetricStream_1;
"""}
'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
"alertStream" to mail(
from = "[email protected]",
to = "[email protected]",
smtp = "localhost:25",
template =
<html>
<head>
<title>Alert Notification</title>
</head>
<body>
<h1>Message</h1>
<p>$message</p>
</body>
</html>
)
// split stream by logic
'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component ==
'dn'" partitionBy "aggregatedMetricStream_1.metricType"
'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn") where "component ==
'nn'" partitionBy "aggregatedMetricStream_1.metricType"
// # end
{code}
> Eagle Declarative Topology Definition DSL
> -----------------------------------------
>
> Key: EAGLE-98
> URL: https://issues.apache.org/jira/browse/EAGLE-98
> Project: Eagle
> Issue Type: New Feature
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0
>
>
> h2. Features
> * High Level Stream-Oriented
> * Declarative Streaming
> * Metadata Driven
> * Native Scala internal DSL
> * Support Scala Programming or Scripting/Configuration in *.egl
> * Support static policy definition / dynamic policy loader
> * IDE-friendly features like sql-prefix and xml as an email template.
> h2. Syntax
> <syntaxhighlight lang="scala">
> // Topology Definition API by extends or script
> import org.apache.eagle.stream.dsl.experimental.KafkaInterface._
> import org.apache.eagle.stream.dsl.experimental.DruidInterface._
> // #!/bin/bash
> // exec scala "$0" "$@"
> // !#
> // # start
> define ("metricStream_1") as ("name" -> 'string, "value"->'double,
> "timestamp"->'long) from
> kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="")
> define ("metricStream_2") as ("name" -> 'string, "value"->'double,
> "timestamp"->'long) from
> kafka(topic="metricStream_2")
> define ("logStream_3") from kafka(topic="logStream_3")
> // filter by function
> filter ("logStream_3") by {(line,collector) => collector.collect(line)} as
> ("name" -> 'string, "value"->'double, "timestamp"->'long)
> // "logStream_3" as ("name" -> 'string, "value"->'double, "timestamp"->'long)
> // filter by pattern and rename stream
> filter("logStream_3"->"logStream_3_parsed") by
> """(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string,
> "value"->'double, "timestamp"-> datetime("YYYY-MM-DD"))
> alert partitionBy "metricStream_1.metricType" parallism 1 by {sql"""
> from metricStream_1[component=='dn' and
> metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
> select sum(value) group by host output every 1 hour insert into alertStream;
> """}
> aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql"""
> from metricStream_1[component=='dn' and
> metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
> select sum(value) group by host output every 1 hour insert into
> aggregatedMetricStream_1;
> """}
> 'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
> "alertStream" to mail(
> from = "[email protected]",
> to = "[email protected]",
> smtp = "localhost:25",
> template =
> <html>
> <head>
> <title>Alert Notification</title>
> </head>
> <body>
> <h1>Message</h1>
> <p>$message</p>
> </body>
> </html>
> )
> // split stream by logic
> 'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component
> == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
> 'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn") where "component
> == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
> // # end
> </syntaxhighlight>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)