http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
deleted file mode 100644
index 4339e5a..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-object testStreamUnionExpansion extends App{
-  val config : Config = ConfigFactory.load
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = 
env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a 
=> ("key1",a))
-  val tail2 = 
env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a 
=> ("key1",a))
-  tail1.streamUnion(List(tail2)).map1(a => "xyz")
-}
-
-object testStreamGroupbyExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  
env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a
 => ("key1",a))
-  //env.execute
-}
-
-object testStreamUnionAndGroupbyExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = 
env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a 
=> ("key1",a)).groupBy(1)
-  val tail2 = 
env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a 
=> ("key1",a)).groupBy(0)
-  tail1.streamUnion(List(tail2)).map1(a => "xyz")
-//  env.execute()
-}
-
-/**
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' 
"http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService";
 -d 
'[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' 
"http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService";
 -d 
'[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test
 alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 
[(str:regexp(word,'\'.*test.*==true)] select * insert into outputStream 
;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object testAlertExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1")
-                  
.flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend")
-                  .alertWithConsumer("s1", "alert1")
-  //env.execute
-}
-
-/**
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' 
"http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService";
 -d 
'[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * curl -X POST -H 'Content-Type:application/json' 
"http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService";
 -d 
'[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s2","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' 
"http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService";
 -d 
'[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test
 alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 
[(str:regexp(word,'\'.*test.*\)==true)] select * insert into outputStream 
;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object testAlertExpansionWithUnion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = 
env.fromSpout(TestSpout()).nameAs("testSpout1").flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend")
 //.map2(a => ("key1",a))
-  val tail2 = 
env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) 
//.map2(a => ("key1",a))
-  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = 
true)
-  //env.execute
-}
-
-
-object testStreamUnionExpansionWithSharedSpout extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val source = env.fromSpout(TestSpout())
-  val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => 
("key1",a))
-  val tail2 = source.flatMap(WordAppendForAlertExecutor("test")).map2(a => 
("key1",a))
-  tail1.streamUnion(List(tail2)).map1(a  => {
-    println(a)
-    "xyz"
-  })
-//    env.execute
-}
-
-object testStreamUnionExpansionWithSharedSpout_2 extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val source = env.fromSpout(TestSpout())
-  val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => 
("key1",a))
-  source.streamUnion(List(tail1)).map1(a  => {
-    println(a)
-    "xyz"
-  })
-//  env.execute
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
deleted file mode 100644
index ccd3deb..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-/**
- * @since  12/5/15
- */
-object TestExecutionEnvironment extends App{
-  val env0 = ExecutionEnvironments.get[StormExecutionEnvironment]
-  println(env0)
-  val config = ConfigFactory.load()
-  val env1 = 
ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](config)
-  println(env1)
-  val env2 = 
ExecutionEnvironments.get[StormExecutionEnvironment](Array[String]("-D","key=value"))
-  println(env2)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
deleted file mode 100644
index d785689..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-trait A { val x: Int }
-case class B(val x: Int, y: Int) extends A
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
deleted file mode 100644
index 231ebab..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-
-case class TestSpout() extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(TestSpout.getClass)
-  var _collector : SpoutOutputCollector = null
-  override def nextTuple : Unit = {
-    _collector.emit(util.Arrays.asList("abc"))
-    LOG.info("send spout data abc")
-    Thread.sleep(1000)
-  }
-  override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={
-    declarer.declare(new Fields("value"))
-  }
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, 
collector: SpoutOutputCollector): Unit ={
-    _collector = collector
-  }
-}
-
-case class TestKeyValueSpout() extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(TestSpout.getClass)
-  var _collector : SpoutOutputCollector = null
-  var count : Int = 0
-  override def nextTuple : Unit = {
-    if(count%3 == 0) {
-      _collector.emit(util.Arrays.asList("abc", new Integer(1)))
-    }else{
-      _collector.emit(util.Arrays.asList("xyz", new Integer(1)))
-    }
-    count += 1;
-    LOG.info("send spout data abc/xyz")
-    Thread.sleep(1000)
-  }
-  override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={
-    declarer.declare(new Fields("word", "count"))
-  }
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, 
collector: SpoutOutputCollector): Unit ={
-    _collector = collector
-  }
-}
-
-case class EchoExecutor() extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(EchoExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : 
Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(input.head.asInstanceOf[String]))
-    LOG.info("echo " + input.head)
-  }
-}
-
-case class WordPrependExecutor(prefix : String) extends 
StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : 
Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(prefix + "_" + input.head))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordPrependForAlertExecutor(prefix : String) extends 
StormStreamExecutor2[String, util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : 
Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", prefix + "_" + input.head)
-    outputCollector.collect(Tuple2("key1",value))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordPrependForAlertExecutor2(prefix : String) extends 
StormStreamExecutor1[util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : 
Collector[Tuple1[util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", prefix + "_" + input.head)
-    outputCollector.collect(Tuple1(value))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordAppendExecutor(suffix : String) extends 
StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : 
Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(input.head + "_" + suffix))
-    LOG.info("append " + input.head + "_" + suffix)
-  }
-}
-
-case class WordAppendForAlertExecutor(suffix : String) extends 
StormStreamExecutor2[String, util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : 
Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", input.head + "_" + suffix)
-    outputCollector.collect(Tuple2("key1", value))
-    LOG.info("append " + input.head + "_" + suffix)
-  }
-}
-
-case class PatternAlertExecutor(pattern : String) extends 
StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(PatternAlertExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : 
Collector[Tuple1[String]]): Unit ={
-    LOG.info("send out " + input.head)
-    if(input.head.asInstanceOf[String].matches(pattern)){
-      LOG.info("Alert hadppens for input " + input.head + " and for pattern " 
+ pattern)
-    }
-  }
-}
-
-case class GroupedEchoExecutor() extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(GroupedEchoExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : 
Collector[Tuple1[String]]): Unit ={
-    LOG.info("get " + input(0))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
deleted file mode 100644
index 424bdef..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-/**
- * explicit union
- * a.union(b,c).alert() means (a,b,c)'s output is united into alert()
- * before running this testing, we should define in eagle service one policy 
and one stream schema
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService";
 -d 
'[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService";
 -d 
'[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test
 alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 
[(str:regexp(word,'\'.*test.*\)==true)] select * insert into outputStream 
;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object UnionForAlert extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val tail1 = 
env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a 
=> ("key1",a))
-  val tail2 = 
env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a 
=> ("key2",a))
-  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = 
false)
-  env.execute()
-}
-
-/**
- * test alert after flatMap
- */
-object TestAlertAfterFlatMap extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val tail1 = env.fromSpout(TestSpout())
-                  .flatMap(WordPrependForAlertExecutor("test"))
-                  .alert(Seq("s1"), "alert1", consume = false)
-  //env.execute
-}
-
-/**
- * test alert after Map
- */
-object TestAlertAfterMap extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val tail1 = env.fromSpout(TestSpout())
-    .flatMap(WordPrependForAlertExecutor2("test"))
-    .map2(a => ("key", a))
-    .alert(Seq("s1"), "alert1", false)
-  //env.execute
-}
-
-object StormRunnerWithoutSplitOrJoin extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  
env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
-    .flatMap(PatternAlertExecutor("test.*"))
-//  env.execute()
-}
-
-object StormRunnerWithSplit extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val toBeSplit = env.fromSpout(TestSpout()).flatMap(EchoExecutor())
-  
toBeSplit.flatMap(WordPrependExecutor("test")).flatMap(PatternAlertExecutor("test.*"))
-  toBeSplit.flatMap(WordAppendExecutor("test"))
-//  env.execute()
-}
-
-object StormRunnerWithUnion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependExecutor("test"))
-  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendExecutor("test"))
-  tail1.streamUnion(List(tail2)).flatMap(PatternAlertExecutor(".*test.*"))
-  env.execute()
-}
-
-object StormRunnerWithFilter extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  
env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")).
-    filter(_=>false).
-    flatMap(PatternAlertExecutor("test.*"))
-  //env.execute
-}
-
-object StormRunnerWithJavaExecutor extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  env.fromSpout(TestSpout()).flatMap(new 
JavaEchoExecutor()).flatMap(WordPrependExecutor("test")).
-    filter(_=>false).
-    flatMap(PatternAlertExecutor("test.*"))
-  //env.execute
-}
-
-object StormRunnerWithKeyValueSpout extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  env.fromSpout(TestKeyValueSpout()).groupBy(1).flatMap(new 
GroupedEchoExecutor()).parallelism(2)
-  //env.execute
-}
-
-object StormRunnerWithKeyValueSpoutRenameOutputFields extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  
env.fromSpout(TestKeyValueSpout()).withOutputFields(2).groupBy(0).flatMap(new 
GroupedEchoExecutor()).parallelism(2)
-  //env.execute
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
deleted file mode 100644
index 48dc7e5..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{FlatSpec, Matchers}
-
-class TestStreamDAGBuilder extends FlatSpec with Matchers{
-//  "a single source DAG with groupBy" should "be traversed without groupBy 
node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = 
env.newSource(null).flatMap(EchoExecutor()).groupBy(0).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG with groupBy from spout" should "be traversed without 
groupBy node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = 
env.newSource(null).groupBy(0).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG with groupBy from spout and then split" should "be 
traversed without groupBy node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val groupby = env.newSource(null).groupBy(0)
-//    groupby.flatMap(WordPrependExecutor("test"))
-//    groupby.flatMap(WordAppendExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG without stream join" should "be traversed 
sequentially like specified" in{
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = 
env.newSource(null).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source with split" should "has more than one tail producer" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val echo = env.newSource(null).flatMap(EchoExecutor())
-//    val tail1 = echo.flatMap(WordPrependExecutor("test"))
-//    val tail2 = echo.flatMap(WordAppendExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source with split and join" should "has join" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val echo = env.newSource(null).flatMap(EchoExecutor())
-//    val tail1 = echo.flatMap(WordPrependExecutor("test"))
-//    val tail2 = 
echo.flatMap(WordAppendExecutor("test")).filter(_=>true).streamUnion(List(tail1)).
-//      flatMap(PatternAlertExecutor("test*"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FilterProducer(fn) =>
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("test*"))
-//      case _ => assert(false)
-//    }
-//    assert(!iter.hasNext)
-//  }
-//
-//  "multiple sources with split and union" should "has union" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val source1 = env.newSource(TestSpout())
-//    val source2 = env.newSource(TestSpout())
-//    val source3 = env.newSource(TestSpout())
-//
-//    val tail1 = source1.flatMap(WordPrependExecutor("test"))
-//    val tail2 = source2.filter(_=>true)
-//    val tail3 = 
source3.flatMap(WordAppendExecutor("test")).streamUnion(List(tail1, tail2)).
-//      flatMap(PatternAlertExecutor("abc*"))
-//
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t.isInstanceOf[TestSpout])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t.isInstanceOf[String])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FilterProducer(fn) =>
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case SourceProducer(t) => assert(t.isInstanceOf[String])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => 
assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("abc*"))
-//      case _ => assert(false)
-//    }
-//    assert(!iter.hasNext)
-//  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
deleted file mode 100644
index 7754765..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream
-
-import org.apache.eagle.datastream.core.StreamContext
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-/**
- * @since  12/4/15
- */
-case class Entity(name:String,value:Double,var inc:Int=0)
-
-object TestIterableWithGroupBy extends App {
-
-  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
-
-  val tuples = Seq(
-    Entity("a", 1),
-    Entity("a", 2),
-    Entity("a", 3),
-    Entity("b", 2),
-    Entity("c", 3),
-    Entity("d", 3)
-  )
-
-  env.from(tuples)
-    .groupByKey(_.name)
-    .map(o => {o.inc += 2;o})
-    .filter(_.name != "b")
-    .filter(_.name != "c")
-    .groupByKey(o=>(o.name,o.value))
-    .map(o => (o.name,o))
-    .map(o => (o._1,o._2.value,o._2.inc))
-    .foreach(println)
-
-  env.execute()
-}
-
-object TestIterableWithGroupByWithStreamContext extends App {
-  val stream = StreamContext(args)
-
-  val tuples = Seq(
-    Entity("a", 1),
-    Entity("a", 2),
-    Entity("a", 3),
-    Entity("b", 2),
-    Entity("c", 3),
-    Entity("d", 3)
-  )
-
-  stream.from(tuples)
-    .groupByKey(_.name)
-    .map(o => {o.inc += 2;o})
-    .filter(_.name != "b")
-    .filter(_.name != "c")
-    .groupByKey(o=>(o.name,o.value))
-    .map(o => (o.name,o))
-    .map(o => (o._1,o._2.value,o._2.inc))
-    .foreach(println)
-
-  stream.submit[StormExecutionEnvironment]
-}
-
-object TestIterableWithGroupByCircularly extends App{
-  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
-
-  val tuples = Seq(
-    Entity("a", 1),
-    Entity("a", 2),
-    Entity("a", 3),
-    Entity("b", 2),
-    Entity("c", 3),
-    Entity("d", 3)
-  )
-
-  env.from(tuples,recycle = true)
-    .map(o => {o.inc += 2;o})
-    .groupByKey(_.name)
-    .foreach(println)
-  env.execute()
-}
-
-object TestGroupByKeyOnSpoutproxy extends App{
-  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
-
-  val tuples = Seq(
-    Entity("a", 1),
-    Entity("a", 2),
-    Entity("a", 3),
-    Entity("b", 2),
-    Entity("c", 3),
-    Entity("d", 3)
-  )
-
-  env.fromSpout[String](TestSpout())
-    .groupByKey(_.charAt(0))
-    .foreach(println)
-  env.execute()
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
 
b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
deleted file mode 100644
index fc2a016..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.core;
-
-public class StreamingProcessConstants {
-       public static final String EVENT_PARTITION_KEY = "eventPartitionKey";
-       public static final String EVENT_STREAM_NAME = "streamName";
-       public static final String EVENT_ATTRIBUTE_MAP = "value";
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
 
b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
deleted file mode 100644
index 7e66478..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.List;
-
-public interface JavaMapper {
-    List<Object> map(List<Object> input);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
 
b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
deleted file mode 100644
index 39ce6c2..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.eagle.datastream;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * @since 12/8/15
- */
-public interface JavaTypeCompatible {
-    Class<?> getType();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
deleted file mode 100644
index ddea46f..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-trait FlatMapper[T] extends Serializable {
-  def flatMap(input : Seq[AnyRef], collector : Collector[T])
-}
-
-case class FlatMapperWrapper[T](func:(Any,Collector[T]) => Unit) extends 
FlatMapper[T]{
-  override def flatMap(input: Seq[AnyRef], collector: Collector[T]): Unit = 
func(input,collector)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
deleted file mode 100644
index 836c7eb..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.Config
-import scala.collection.JavaConverters._
-
-trait StormStreamExecutor[R <: Any] extends FlatMapper[R] {
-  def prepareConfig(config : Config)
-  def init
-  def fields : Array[String]
-}
-
-trait JavaStormStreamExecutor[R <: AnyRef] extends FlatMapper[R] {
-  def prepareConfig(config : Config)
-  def init
-  def fields : Array[String]
-  override def toString() = this.getClass.getSimpleName
-
-  override def flatMap(input : Seq[AnyRef], collector : Collector[R]) = 
flatMap(input.asJava,collector)
-
-  def flatMap(input : java.util.List[AnyRef], collector : Collector[R])
-}
-
-abstract class StormStreamExecutor1[T0] extends 
StormStreamExecutor[Tuple1[T0]] {
-  override def fields = Array("f0")
-}
-
-abstract class JavaStormStreamExecutor1[T0] extends 
JavaStormStreamExecutor[Tuple1[T0]] {
-  override def fields = Array("f0")
-}
-
-abstract class  StormStreamExecutor2[T0, T1] extends 
StormStreamExecutor[Tuple2[T0, T1]] {
-  override def fields = Array("f0", "f1")
-}
-
-abstract class  JavaStormStreamExecutor2[T0, T1] extends 
JavaStormStreamExecutor[Tuple2[T0, T1]] {
-  override def fields = Array("f0", "f1")
-}
-
-abstract class  StormStreamExecutor3[T0, T1, T2] extends 
StormStreamExecutor[Tuple3[T0, T1, T2]] {
-  override def fields = Array("f0", "f1", "f2")
-}
-
-abstract class  JavaStormStreamExecutor3[T0, T1, T2] extends 
JavaStormStreamExecutor[Tuple3[T0, T1, T2]] {
-  override def fields = Array("f0", "f1", "f2")
-}
-
-abstract class  StormStreamExecutor4[T0, T1, T2, T3] extends 
StormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
-  override def fields = Array("f0", "f1", "f2", "f3")
-}
-
-abstract class  JavaStormStreamExecutor4[T0, T1, T2, T3] extends 
JavaStormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
-  override def fields = Array("f0", "f1", "f2", "f3")
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/pom.xml 
b/eagle-core/eagle-data-process/pom.xml
index e32be21..60bfced 100644
--- a/eagle-core/eagle-data-process/pom.xml
+++ b/eagle-core/eagle-data-process/pom.xml
@@ -31,6 +31,5 @@
        <module>eagle-storm-jobrunning-spout</module>
        <module>eagle-job-common</module>
         <module>eagle-stream-process-api</module>
-        <module>eagle-stream-pipeline</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
 
b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
deleted file mode 100644
index 1ff8b27..0000000
--- 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("alertdetail")
-@ColumnFamily("f")
-@Prefix("hadoop")
-@Service(Constants.ALERT_SERVICE_ENDPOINT_NAME)
-@TimeSeries(true)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@Tags({
-               "site",
-               "hostname",
-               "application",
-               "policyId",
-               "sourceStreams",
-               "alertSource",
-               "alertExecutorId"
-})
-public class AlertAPIEntity extends TaggedLogAPIEntity{
-       @Column("description")
-       private String description;
-       @Column("remediationID")
-       private String remediationID;
-       @Column("remediationCallback")
-       private String remediationCallback;
-       @Column("alertContext")
-       private String alertContext;
-       @Column("streamId")
-       private String streamId;
-
-       public String getDescription() {
-               return description;
-       }
-
-       public void setDescription(String description) {
-               this.description = description;
-               _pcs.firePropertyChange("description", null, null);
-       }
-
-       public String getRemediationID() {
-               return remediationID;
-       }
-
-       public void setRemediationID(String remediationID) {
-               this.remediationID = remediationID;
-               _pcs.firePropertyChange("remediationID", null, null);
-       }
-
-       public String getRemediationCallback() {
-               return remediationCallback;
-       }
-
-       public void setRemediationCallback(String remediationCallback) {
-               this.remediationCallback = remediationCallback;
-               _pcs.firePropertyChange("remediationCallback", null, null);
-       }
-
-    @JsonIgnore
-       public String getAlertContext() {
-               return alertContext;
-       }
-
-    @JsonProperty("alertContext")
-       public AlertContext getWrappedAlertContext() {
-               return AlertContext.fromJsonString(alertContext);
-       }
-
-    @JsonIgnore
-       public void setAlertContext(String alertContext) {
-               this.alertContext = alertContext;
-               _pcs.firePropertyChange("alertContext", null, null);
-       }
-    @JsonProperty("alertContext")
-       public void setDecodedAlertContext(AlertContext alertContext) {
-               if(alertContext != null) this.alertContext = 
alertContext.toJsonString();
-               _pcs.firePropertyChange("alertContext", null, null);
-       }
-
-       public String getStreamId() {
-               return streamId;
-       }
-
-       public void setStreamId(String streamId) {
-               this.streamId = streamId;
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
 
b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
deleted file mode 100644
index f001725..0000000
--- 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import org.apache.eagle.log.entity.meta.EntitySerDeser;
-
-public class AlertContextSerDeser implements EntitySerDeser<AlertContext> {
-
-       @Override
-       public AlertContext deserialize(byte[] bytes) {
-               AlertContext context = new AlertContext();
-               Map<String, String> properties = new HashMap<String, String>();
-               final int length = bytes.length;
-               if (length < 4) { return context; }
-               int size = Bytes.toInt(bytes, 0, 4);
-               
-               int offset = 4;
-               for (int i = 0; i < size; i++) {
-                       int keySize =  Bytes.toInt(bytes, offset, 4);
-                       offset += 4;
-                       int valueSize =  Bytes.toInt(bytes, offset, 4);
-                       offset += 4;
-                       String key = Bytes.toString(bytes, offset, keySize);
-                       offset += keySize;
-                       String value =Bytes.toString(bytes, offset, valueSize);
-                       offset += valueSize;
-                       properties.put(key, value);
-               }
-               context.addAll(properties);
-               return context;
-       }
-
-       @Override
-       public byte[] serialize(AlertContext context) {
-               
-               final Map<String, String> pair = context.getProperties();
-               int totalSize = 4;
-               for (Entry<String, String> entry : pair.entrySet()) {
-                       String key = entry.getKey();
-                       String value = entry.getValue();
-            int keySize = 0;
-            if(key!=null) keySize = key.getBytes().length;
-                       int valueSize = 0;
-            if(value!=null) valueSize = value.getBytes().length;
-                       totalSize += keySize + valueSize + 8;
-               }
-               byte[] buffer = new byte[totalSize];
-               
-               Bytes.putInt(buffer, 0, pair.size());
-               int offset = 4;
-               for (Entry<String, String> entry : pair.entrySet()) {
-                       String key = entry.getKey();
-                       String value = entry.getValue();
-
-                       int keySize = key !=null ? key.getBytes().length : 0;
-            int valueSize = value != null ? value.getBytes().length:0;
-
-            Bytes.putInt(buffer, offset, keySize);
-                       offset += 4;
-                       Bytes.putInt(buffer, offset, valueSize);
-                       offset += 4;
-
-
-            Bytes.putBytes(buffer, offset, key != null ? key.getBytes() : new 
byte[0], 0, keySize);
-                       offset += keySize;
-                       Bytes.putBytes(buffer, offset, value != null ? 
value.getBytes() : new byte[0], 0, valueSize);
-                       offset += valueSize;
-               }
-               return buffer;
-       }
-
-
-       @Override
-       public Class<AlertContext> type(){
-               return AlertContext.class;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
 
b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
deleted file mode 100644
index 7f8dbe3..0000000
--- 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("alertDataSource")
-@ColumnFamily("f")
-@Prefix("alertDataSource")
-@Service(Constants.ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site", "dataSource"})
-@Deprecated
-public class AlertDataSourceEntity extends TaggedLogAPIEntity{
-    @Column("a")
-    private boolean enabled;
-    @Column("b")
-    private String config;
-    @Column("c")
-    private String description;
-
-    public String getConfig() {
-        return config;
-    }
-
-    public void setConfig(String config) {
-        this.config = config;
-        valueChanged("config");
-    }
-
-    public boolean isEnabled() {
-        return enabled;
-    }
-
-    public void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-        valueChanged("enabled");
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String desc) {
-        this.description = desc;
-        valueChanged("desc");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
 
b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
deleted file mode 100644
index 036ec7a..0000000
--- 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Index;
-import org.apache.eagle.log.entity.meta.Indexes;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-/**
- * site: site name
- * dataSource: data source name
- *
- * alertExecutorId: Group Policy by alertExecutorId, the policy definition 
with the sample ["site", "dataSource", "alertExecutorId"] should run on the 
sample alert executor
- *
- * policyId: policy name, should be unique
- * policyType: policy engine implementation type
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("alertdef")
-@ColumnFamily("f")
-@Prefix("alertdef")
-@Service(Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site", "application", "alertExecutorId", "policyId", "policyType"})
-@Indexes({
-       @Index(name="Index_1_alertExecutorId", columns = { "alertExecutorID" }, 
unique = true),
-})
-public class AlertDefinitionAPIEntity extends AbstractPolicyDefinitionEntity {
-       @Column("a")
-       private String description;
-       @Column("b")
-       private String policyDef;
-       @Column("c")
-       private String dedupeDef;
-       @Column("d")
-       private String notificationDef;
-       @Column("e")
-       private String remediationDef;
-       @Column("f")
-       private boolean enabled;
-       @Column("g")
-       private String owner;
-       @Column("h")
-       private long lastModifiedDate;
-       @Column("i")
-       private long severity;
-       @Column("j")
-       private long createdTime;
-    @Column("k")
-    private boolean markdownEnabled;
-    @Column("l")
-    private String markdownReason;
-
-       public String getDescription() {
-               return description;
-       }
-       public void setDescription(String desc) {
-               this.description = desc;
-               valueChanged("description");
-       }
-
-       public String getPolicyDef() {
-               return policyDef;
-       }
-       public void setPolicyDef(String policyDef) {
-               this.policyDef = policyDef;
-               valueChanged("policyDef");
-       }
-       public String getDedupeDef() {
-               return dedupeDef;
-       }
-       public void setDedupeDef(String dedupeDef) {
-               this.dedupeDef = dedupeDef;
-               valueChanged("dedupeDef");
-       }
-       public String getNotificationDef() {
-               return notificationDef;
-       }
-       public void setNotificationDef(String notificationDef) {
-               this.notificationDef = notificationDef;
-               valueChanged("notificationDef");
-       }
-       public String getRemediationDef() {
-               return remediationDef;
-       }
-       public void setRemediationDef(String remediationDef) {
-               this.remediationDef = remediationDef;
-               valueChanged("remediationDef");
-       }
-       public boolean isEnabled() {
-               return enabled;
-       }
-       public void setEnabled(boolean enabled) {
-               this.enabled = enabled;
-               valueChanged("enabled");
-       }
-       public String getOwner() {
-               return owner;
-       }
-       public void setOwner(String owner) {
-               this.owner = owner;
-               valueChanged("owner");
-       }       
-       public long getLastModifiedDate() {
-               return lastModifiedDate;
-       }
-       public void setLastModifiedDate(long lastModifiedDate) {
-               this.lastModifiedDate = lastModifiedDate;
-               valueChanged("lastModifiedDate");
-       }       
-       public long getSeverity() {
-               return severity;
-       }
-       public void setSeverity(long severity) {
-               this.severity = severity;
-               valueChanged("severity");
-       }       
-       public long getCreatedTime() {
-               return createdTime;
-       }
-       public void setCreatedTime(long createdTime) {
-               this.createdTime = createdTime;
-               valueChanged("createdTime");
-       }
-    public boolean isMarkdownEnabled() { return markdownEnabled; }
-    public void setMarkdownEnabled(boolean markdownEnabled) {
-        this.markdownEnabled = markdownEnabled;
-        valueChanged("markdownEnabled");
-    }
-    public String getMarkdownReason() { return markdownReason; }
-    public void setMarkdownReason(String markdownReason) {
-        this.markdownReason = markdownReason;
-        valueChanged("markdownReason");
-    }
-
-       public boolean equals(Object o){
-               if(o == this)
-                       return true;
-               if(!(o instanceof AlertDefinitionAPIEntity))
-                       return false;
-               AlertDefinitionAPIEntity that = (AlertDefinitionAPIEntity)o;
-               if(that.enabled == this.enabled &&
-                               compare(that.policyDef, this.policyDef) &&
-                               compare(that.dedupeDef, this.dedupeDef) &&
-                               compare(that.notificationDef, 
this.notificationDef) &&
-                               compare(that.remediationDef, 
this.remediationDef))
-                       return true;
-               return false;
-       }
-       
-       private boolean compare(String a, String b){
-               if(a == b)
-                       return true;
-               if(a == null || b == null)
-                       return false;
-               if(a.equals(b))
-                       return true;
-               return false;
-       }
-       
-       public int hashCode(){
-               HashCodeBuilder builder = new HashCodeBuilder();
-               builder.append(enabled);
-               builder.append(policyDef);
-               builder.append(dedupeDef);
-               builder.append(notificationDef);
-               builder.append(remediationDef);
-               return builder.toHashCode();
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
 
b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
deleted file mode 100644
index 507c482..0000000
--- 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class AlertEntityRepository extends EntityRepository {
-       public AlertEntityRepository() {
-               serDeserMap.put(AlertContext.class, new AlertContextSerDeser());
-               entitySet.add(AlertAPIEntity.class);
-               entitySet.add(AlertDefinitionAPIEntity.class);
-               entitySet.add(AlertStreamSchemaEntity.class);
-               entitySet.add(AlertStreamEntity.class);
-               //entitySet.add(AlertDataSourceEntity.class);
-        entitySet.add(AlertExecutorEntity.class);
-               entitySet.add(ApplicationDescServiceEntity.class);
-               entitySet.add(FeatureDescServiceEntity.class);
-               entitySet.add(SiteDescServiceEntity.class);
-               entitySet.add(SiteApplicationServiceEntity.class);
-               this.registerEntity(GenericResourceEntity.class);
-               entitySet.add(AlertNotificationEntity.class);
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
deleted file mode 100644
index 7e023e6..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-/**
- * Stream and Alert executor Id mapping should be automatically created by topology definition
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("alertExecutor")
-@ColumnFamily("f")
-@Prefix("alertExecutor")
-@Service(Constants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site","application", "alertExecutorId", "streamName"})
-public class AlertExecutorEntity extends TaggedLogAPIEntity{
-    @Column("a")
-    private String description;
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String desc) {
-        this.description = desc;
-        valueChanged("description");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
deleted file mode 100644
index 2782be2..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import java.util.List;
-import java.util.Map;
-
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("alertNotifications")
-@ColumnFamily("f")
-@Prefix("alertNotifications")
-@Service(Constants.ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"notificationType"})
-public class AlertNotificationEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String className;
-    public String getClassName(){
-        return className;
-    }
-    public void setClassName(String className){
-        this.className = className;
-        valueChanged("className");
-    }
-
-    @Column("b")
-    private String description;
-    public String getDescription(){
-        return description;
-    }
-    public void setDescription(String description){
-        this.description = description;
-        valueChanged("description");
-    }
-
-    @Column("c")
-    private boolean enabled;
-    public boolean isEnabled() {
-        return enabled;
-    }
-
-    public void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-        valueChanged("enabled");
-    }
-
-    @Column("d")
-    private List<Map<String, String>> fields;
-    public List<Map<String, String>> getFields() {
-        return fields;
-    }
-
-    public void setFields(List<Map<String, String>> fields) {
-        this.fields = fields;
-        valueChanged("fields");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
deleted file mode 100644
index 685da4f..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("alertStream")
-@ColumnFamily("f")
-@Prefix("alertStream")
-@Service(Constants.ALERT_STREAM_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site","application", "streamName"})
-public class AlertStreamEntity extends TaggedLogAPIEntity{
-       @Column("a")
-       private String description;
-
-       public String getDescription() {
-               return description;
-       }
-       public void setDescription(String description) {
-               this.description = description;
-               valueChanged("description");
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/ApplicationDescServiceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/ApplicationDescServiceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/ApplicationDescServiceEntity.java
deleted file mode 100644
index 54ff203..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/ApplicationDescServiceEntity.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.entity;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import java.util.List;
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("eagleApplicationDesc")
-@ColumnFamily("f")
-@Prefix("eagleApplicationDesc")
-@Service(Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"application"})
-public class ApplicationDescServiceEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String description;
-    @Column("b")
-    private String alias;
-    @Column("c")
-    private String groupName;
-    @Column("d")
-    private List<String> features;
-    @Column("e")
-    private String config;
-
-    public String getConfig() {
-        return config;
-    }
-
-    public void setConfig(String config) {
-        this.config = config;
-        valueChanged("config");
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-        valueChanged("description");
-    }
-
-    public String getGroupName() {
-        return groupName;
-    }
-
-    public void setGroupName(String groupName) {
-        this.groupName = groupName;
-        valueChanged("groupName");
-    }
-
-    public String getAlias() {
-        return alias;
-    }
-
-    public void setAlias(String alias) {
-        this.alias = alias;
-        valueChanged("alias");
-    }
-
-    public List<String> getFeatures() {
-        return features;
-    }
-
-    public void setFeatures(List<String> features) {
-        this.features = features;
-        valueChanged("features");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/FeatureDescServiceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/FeatureDescServiceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/FeatureDescServiceEntity.java
deleted file mode 100644
index 0f3c738..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/FeatureDescServiceEntity.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.entity;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("eagleFeatureDesc")
-@ColumnFamily("f")
-@Prefix("eagleFeatureDesc")
-@Service(Constants.FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"feature"})
-public class FeatureDescServiceEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String description;
-    @Column("b")
-    private String version;
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-        valueChanged("version");
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-        valueChanged("description");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/GenericResourceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/GenericResourceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/GenericResourceEntity.java
deleted file mode 100644
index c855499..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/GenericResourceEntity.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("eagle_metadata")
-@ColumnFamily("f")
-@Prefix("generic_resource")
-@Service(Constants.GENERIC_RESOURCE_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site", "name"})
-public class GenericResourceEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String value;
-
-    public String getValue() {
-        return value;
-    }
-
-    public void setValue(String value) {
-        this.value = value;
-        valueChanged("value");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/SiteApplicationServiceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/SiteApplicationServiceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/SiteApplicationServiceEntity.java
deleted file mode 100644
index 5eea4b3..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/SiteApplicationServiceEntity.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.entity;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("eagleSiteApplication")
-@ColumnFamily("f")
-@Prefix("eagleSiteApplication")
-@Service(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site", "application"})
-public class SiteApplicationServiceEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String config;
-    @Column("b")
-    private Boolean enabled;
-
-    public String getConfig() {
-        return config;
-    }
-
-    public void setConfig(String config) {
-        this.config = config;
-        valueChanged("config");
-    }
-
-    public Boolean getEnabled() {
-        return enabled;
-    }
-
-    public void setEnabled(Boolean enabled) {
-        this.enabled = enabled;
-        valueChanged("enabled");
-    }
-}


Reply via email to