[ https://issues.apache.org/jira/browse/EAGLE-98?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15072090#comment-15072090 ]
Hao Chen edited comment on EAGLE-98 at 12/27/15 8:25 AM: --------------------------------------------------------- Support Stream DSL Code Evaluation: https://github.com/haoch/incubator-eagle/commit/6bbf9a60c73e8bb14d8ccf165ac574a663c1335b {code} val code = """ | define("logStream") from Seq( | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043" | ) as ("line"->'string) parallism 1 | | filter("logStream") by grok { | pattern("line"->"(?<ip>[\\d+\\.]+)\\s+(?<method>\\w+)\\s+(?<path>[\\w/\\.]+)\\s+(?<bytes>\\d+)\\s+(?<time>[\\d+\\.]+)".r) | } parallism 1 | | 'logStream to stdout """.stripMargin val ret = StreamEvaluator(code).evaluate {code} was (Author: haoch): Support Stream DSL Code Evaluation: https://github.com/haoch/incubator-eagle/blob/77ba2f98a91fd62f1d2b7d3d9dc0d40c5da3d85d/eagle-core/eagle-data-process/eagle-stream-process-dsl/src/test/scala/org/apache/eagle/stream/dsl/execution/StreamEvaluatorSpec.scala#L51-L71 {code} val code = """ | define("logStream") from Seq( | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043", | "55.3.244.1 GET /index.html 15824 0.043" | ) as ("line"->'string) parallism 1 | | filter("logStream") by grok { | pattern("line"->"(?<ip>[\\d+\\.]+)\\s+(?<method>\\w+)\\s+(?<path>[\\w/\\.]+)\\s+(?<bytes>\\d+)\\s+(?<time>[\\d+\\.]+)".r) | } parallism 1 | | 'logStream to stdout """.stripMargin val ret = StreamEvaluator(code).evaluate {code} > Eagle Declarative Topology Definition DSL > ----------------------------------------- > > Key: EAGLE-98 > URL: https://issues.apache.org/jira/browse/EAGLE-98 > Project: Eagle > Issue Type: New Feature > Affects Versions: 0.3.0 > 
Reporter: Hao Chen > Assignee: Hao Chen > Fix For: 0.3.0 > > > h2. Features > * High-level, stream-oriented > * Declarative streaming > * Metadata-driven > * Native Scala internal DSL > * Support Scala programming or scripting/configuration in *.egl files > * Support static policy definition / dynamic policy loader > * IDE-friendly features like the sql string prefix and XML literals as email templates. > * Name reference > h2. Syntax > {code:language=scala} > // Topology Definition API by extends or script > import org.apache.eagle.stream.dsl.experimental.KafkaInterface._ > import org.apache.eagle.stream.dsl.experimental.DruidInterface._ > // #!/bin/bash > // exec scala "$0" "$@" > // !# > // # start > define ("metricStream_1") as ("name" -> 'string, "value"->'double, > "timestamp"->'long) from > kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="") > define ("metricStream_2") as ("name" -> 'string, "value"->'double, > "timestamp"->'long) from > kafka(topic="metricStream_2") > define ("logStream_3") from kafka(topic="logStream_3") > // filter by function > filter ("logStream_3") by {(line,collector) => collector.collect(line)} as > ("name" -> 'string, "value"->'double, "timestamp"->'long) > // "logStream_3" as ("name" -> 'string, "value"->'double, "timestamp"->'long) > // filter by pattern and rename stream > filter("logStream_3"->"logStream_3_parsed") by > """(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string, > "value"->'double, "timestamp"-> datetime("YYYY-MM-DD")) > alert partitionBy "metricStream_1.metricType" parallism 1 by {sql""" > from metricStream_1[component=='dn' and > metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600] > select sum(value) group by host output every 1 hour insert into alertStream; > """} > aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql""" > from metricStream_1[component=='dn' and > metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600] > select sum(value) group by host output every 1 hour 
insert into > aggregatedMetricStream_1; > """} > 'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts") > "alertStream" to mail( > from = "sen...@eagle.incubator.apache.org", > to = "recei...@eagle.incubator.apache.org", > smtp = "localhost:25", > template = > <html> > <head> > <title>Alert Notification</title> > </head> > <body> > <h1>Message</h1> > <p>$message</p> > </body> > </html> > ) > // split stream by logic > 'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component > == 'dn'" partitionBy "aggregatedMetricStream_1.metricType" > 'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn") where "component > == 'nn'" partitionBy "aggregatedMetricStream_1.metricType" > // # end > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)