[ 
https://issues.apache.org/jira/browse/EAGLE-98?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Chen updated EAGLE-98:
--------------------------
    Description: 
h2. Features
* High Level Stream-Oriented 
* Declarative Streaming
* Metadata Driven
* Native Scala internal DSL
* Support Scala Programming or Script/Configure in *.egl
* Support static policy definition / dynamic policy loader
* IDE-friendly features like sql-prefix and XML as email templates.

h2. Syntax 
{code}
// Plug-able DSL Interface
import org.apache.eagle.stream.dsl.AggregateInterface._
import org.apache.eagle.stream.dsl.AlertInterface._
import org.apache.eagle.stream.dsl.DefineInterface._
import org.apache.eagle.stream.dsl.DruidInterface._
import org.apache.eagle.stream.dsl.KafkaInterface._

// Topology Definition API by extends or script

// #!/bin/bash
// exec scala "$0" "$@"
// !#
// # start
define ("metricStream_1") as ("name" -> 'string, "value"->'double, 
"timestamp"->'long) from kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts")
define ("metricStream_2") as ("name" -> 'string, "value"->'double, 
"timestamp"->'long) from kafka(topic="metricStream_2")

alert partitionBy "metricStream_1.metricType" parallism 1 by sql"""
  from metricStream_1[component=='dn' and 
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
  select sum(value) group by host output every 1 hour insert into alertStream;
  """

aggregate partitionBy "metricStream_1.metricType" parallism 2 by sql"""
  from metricStream_1[component=='dn' and 
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
  select sum(value) group by host output every 1 hour insert into 
aggregatedMetricStream_1;
  """

"alertStream" ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
"alertStream" to mail(
  from = "[email protected]",
  to = "[email protected]",
  smtp = "localhost:25",
  template =
    <html>
      <head>
      <title>Alert Notification</title>
      </head>
      <body>
        <h1>Message</h1>
        <p>$message</p>
      </body>
    </html>
)

// split stream by logic
"aggregatedMetricStream_1" to kafka("aggregated_stream_dn") where "component == 
'dn'" partitionBy "aggregatedMetricStream_1.metricType"
"aggregatedMetricStream_1" ~> druid("aggregated_stream_nn")  where "component 
== 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
// # end
{code}

  was:
h2. Features
* High Level Stream-Oriented 
* Declarative Streaming
* Metadata Driven
* Native Scala internal DSL
* Support Scala Programming or Script/Configure in *.egl
* Support static policy definition / dynamic policy loader
* IDE-friendly features like sql-prefix and XML as email templates.

h2. Syntax 
{code}
// Plug-able DSL Interface
  import org.apache.eagle.stream.dsl.AggregateInterface._
  import org.apache.eagle.stream.dsl.AlertInterface._
  import org.apache.eagle.stream.dsl.DefineInterface._
  import org.apache.eagle.stream.dsl.DruidInterface._
  import org.apache.eagle.stream.dsl.KafkaInterface._

  // Topology Definition API by extends or script

  // #!/bin/bash
  // exec scala "$0" "$@"
  // !#
  // # start
  define ("metricStream_1") as ("name" -> 'string, "value"->'double, 
"timestamp"->'long) from kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts")
  define ("metricStream_2") as ("name" -> 'string, "value"->'double, 
"timestamp"->'long) from kafka(topic="metricStream_2")

  alert partitionBy "metricStream_1.metricType" parallism 1 by sql"""
    from metricStream_1[component=='dn' and 
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
    select sum(value) group by host output every 1 hour insert into alertStream;
    """

  aggregate partitionBy "metricStream_1.metricType" parallism 2 by sql"""
    from metricStream_1[component=='dn' and 
metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
    select sum(value) group by host output every 1 hour insert into 
aggregatedMetricStream_1;
    """

  "alertStream" ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
  "alertStream" to mail(
    from = "[email protected]",
    to = "[email protected]",
    smtp = "localhost:25",
    template =
      <html>
        <head>
        <title>Alert Notification</title>
        </head>
        <body>
          <h1>Message</h1>
          <p>$message</p>
        </body>
      </html>
  )

  // split stream by logic
  "aggregatedMetricStream_1" to kafka("aggregated_stream_dn") where "component 
== 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
  "aggregatedMetricStream_1" ~> druid("aggregated_stream_nn")  where "component 
== 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
  // # end
{code}


> Eagle Declarative Topology Definition DSL
> -----------------------------------------
>
>                 Key: EAGLE-98
>                 URL: https://issues.apache.org/jira/browse/EAGLE-98
>             Project: Eagle
>          Issue Type: New Feature
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0
>
>
> h2. Features
> * High Level Stream-Oriented 
> * Declarative Streaming
> * Metadata Driven
> * Native Scala internal DSL
> * Support Scala Programming or Script/Configure in *.egl
> * Support static policy definition / dynamic policy loader
> * IDE-friendly features like sql-prefix and XML as email templates.
> h2. Syntax 
> {code}
> // Plug-able DSL Interface
> import org.apache.eagle.stream.dsl.AggregateInterface._
> import org.apache.eagle.stream.dsl.AlertInterface._
> import org.apache.eagle.stream.dsl.DefineInterface._
> import org.apache.eagle.stream.dsl.DruidInterface._
> import org.apache.eagle.stream.dsl.KafkaInterface._
> // Topology Definition API by extends or script
> // #!/bin/bash
> // exec scala "$0" "$@"
> // !#
> // # start
> define ("metricStream_1") as ("name" -> 'string, "value"->'double, 
> "timestamp"->'long) from kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts")
> define ("metricStream_2") as ("name" -> 'string, "value"->'double, 
> "timestamp"->'long) from kafka(topic="metricStream_2")
> alert partitionBy "metricStream_1.metricType" parallism 1 by sql"""
>   from metricStream_1[component=='dn' and 
> metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
>   select sum(value) group by host output every 1 hour insert into alertStream;
>   """
> aggregate partitionBy "metricStream_1.metricType" parallism 2 by sql"""
>   from metricStream_1[component=='dn' and 
> metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
>   select sum(value) group by host output every 1 hour insert into 
> aggregatedMetricStream_1;
>   """
> "alertStream" ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
> "alertStream" to mail(
>   from = "[email protected]",
>   to = "[email protected]",
>   smtp = "localhost:25",
>   template =
>     <html>
>       <head>
>       <title>Alert Notification</title>
>       </head>
>       <body>
>         <h1>Message</h1>
>         <p>$message</p>
>       </body>
>     </html>
> )
> // split stream by logic
> "aggregatedMetricStream_1" to kafka("aggregated_stream_dn") where "component 
> == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
> "aggregatedMetricStream_1" ~> druid("aggregated_stream_nn")  where "component 
> == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
> // # end
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to