http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/FilterBoltWrapper.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/FilterBoltWrapper.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/FilterBoltWrapper.scala
deleted file mode 100644
index cf9b53f..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/FilterBoltWrapper.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.slf4j.LoggerFactory
-
-case class FilterBoltWrapper[T](fn : T => Boolean) extends BaseRichBolt{
-  val LOG = LoggerFactory.getLogger(FilterBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, 
collector: OutputCollector): Unit = {
-    _collector = collector
-  }
-
-  override def execute(input : Tuple): Unit = {
-    input.getValue(0) match {
-      case v:T =>
-        if(fn(v)){
-          _collector.emit(input, input.getValues)
-          _collector.ack(input)
-        }
-    }
-  }
-
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    declarer.declare(new Fields(OutputFieldNameConst.FIELD_PREFIX + "0"))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/GraphPrinter.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/GraphPrinter.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/GraphPrinter.scala
deleted file mode 100644
index 24afdbb..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/GraphPrinter.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-
-object GraphPrinter {
-  private val LOG = LoggerFactory.getLogger(GraphPrinter.getClass)
-  def print(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit 
={
-    val iter = dag.iterator()
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        LOG.info(edge.from + "{" + edge.from.parallelism + "}" +" => " + 
edge.to + "{" + edge.to.parallelism + "}" + " with groupByFields " + 
edge.groupByFields)
-      })
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/JavaStormBoltWrapper.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/JavaStormBoltWrapper.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/JavaStormBoltWrapper.scala
deleted file mode 100644
index 13be3c7..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/JavaStormBoltWrapper.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[EagleTuple]) 
extends BaseRichBolt{
-  val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, 
collector: OutputCollector): Unit = {
-    _collector = collector
-    worker.init
-  }
-
-  override def execute(input : Tuple): Unit ={
-    worker.flatMap(input.getValues, new Collector[EagleTuple](){
-      def collect(t: EagleTuple): Unit ={
-        _collector.emit(input, t.getList.asJava)
-      }
-    })
-    _collector.ack(input)
-  }
-
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    val fields = worker.fields
-    LOG.info("output fields for worker " + worker + " : " + fields.toList)
-    declarer.declare(new Fields(fields:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/MapBoltWrapper.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/MapBoltWrapper.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/MapBoltWrapper.scala
deleted file mode 100644
index 7407c32..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/MapBoltWrapper.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.slf4j.LoggerFactory
-
-/**
- * @since  9/29/15
- */
-case class MapBoltWrapper[T,R](num: Int, fn: T => R) extends BaseRichBolt {
-  val LOG = LoggerFactory.getLogger(FilterBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    var fields = new util.ArrayList[String]()
-    var i : Int = 0;
-    while(i < num){
-      fields.add(OutputFieldNameConst.FIELD_PREFIX + i)
-      i += 1
-    }
-    declarer.declare(new Fields(fields))
-  }
-
-  override def execute(input: Tuple): Unit = {
-    val size = input.size()
-    var values : AnyRef = null
-    size match {
-      case 1 => values = scala.Tuple1(input.getValue(0))
-      case 2 => values = scala.Tuple2(input.getValue(0), input.getValue(1))
-      case 3 => values = scala.Tuple3(input.getValue(0), input.getValue(1), 
input.getValue(2))
-      case 4 => values = scala.Tuple4(input.getValue(0), input.getValue(1), 
input.getValue(2), input.getValue(3))
-      case _ => throw new IllegalArgumentException
-    }
-    val output = fn(values.asInstanceOf[T])
-    output match {
-      case scala.Tuple1(a) => _collector.emit(input, 
util.Arrays.asList(a.asInstanceOf[AnyRef]))
-      case scala.Tuple2(a, b) => _collector.emit(input, 
util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef]))
-      case scala.Tuple3(a, b, c) => _collector.emit(input, 
util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], 
c.asInstanceOf[AnyRef]))
-      case scala.Tuple4(a, b, c, d) => _collector.emit(input, 
util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], 
c.asInstanceOf[AnyRef], d.asInstanceOf[AnyRef]))
-      case a => _collector.emit(input, 
util.Arrays.asList(a.asInstanceOf[AnyRef]))
-    }
-    _collector.ack(input)
-  }
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, 
collector: OutputCollector): Unit = {
-    _collector = collector
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/NodeNameSelector.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/NodeNameSelector.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/NodeNameSelector.scala
deleted file mode 100644
index f579881..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/NodeNameSelector.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-case class NodeNameSelector(producer : StreamProducer) {
-  def getName : String = {
-    producer.name match {
-      case null => producer.toString
-      case _ => producer.name
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/OutputFieldNameConst.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/OutputFieldNameConst.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/OutputFieldNameConst.scala
deleted file mode 100644
index b00f149..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/OutputFieldNameConst.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-object OutputFieldNameConst {
-  val FIELD_PREFIX = "f"
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/SpoutProxy.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/SpoutProxy.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/SpoutProxy.scala
deleted file mode 100644
index 7bed261..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/SpoutProxy.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-
-/**
- * Declare delegated BaseRichSpout with given field names
- *
- * @param delegate delegated BaseRichSpout
- * @param outputFields given field names
- */
-case class SpoutProxy(delegate: BaseRichSpout, outputFields: 
java.util.List[String]) extends BaseRichSpout{
-  def open(conf: java.util.Map[_, _], context: TopologyContext, collector: 
SpoutOutputCollector) {
-    this.delegate.open(conf, context, collector)
-  }
-
-  def nextTuple {
-    this.delegate.nextTuple
-  }
-
-  override def ack(msgId: AnyRef) {
-    this.delegate.ack(msgId)
-  }
-
-  override def fail(msgId: AnyRef) {
-    this.delegate.fail(msgId)
-  }
-
-  override def deactivate {
-    this.delegate.deactivate
-  }
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer) {
-    declarer.declare(new Fields(outputFields))
-  }
-
-  override def close {
-    this.delegate.close
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltFactory.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltFactory.scala
deleted file mode 100644
index c35b3e9..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltFactory.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package eagle.datastream
-
-import backtype.storm.topology.base.BaseRichBolt
-import com.typesafe.config.Config
-
-object StormBoltFactory {
-  def getBoltWrapper(graph: AbstractStreamProducerGraph, producer : 
StreamProducer, config : Config) : BaseRichBolt = {
-    producer match{
-      case FlatMapProducer(id, worker) => {
-        if(worker.isInstanceOf[JavaStormStreamExecutor[EagleTuple]]){
-          
worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]].prepareConfig(config)
-          
JavaStormBoltWrapper(worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]])
-        }else if(worker.isInstanceOf[StormStreamExecutor[EagleTuple]]){
-          
worker.asInstanceOf[StormStreamExecutor[EagleTuple]].prepareConfig(config)
-          
StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[EagleTuple]])
-        }else {
-          throw new UnsupportedOperationException
-        }
-      }
-      case FilterProducer(id, fn) => {
-        FilterBoltWrapper(fn)
-      }
-      case MapProducer(id, n, fn) => {
-        MapBoltWrapper(n, fn)
-      }
-      case _ => throw new UnsupportedOperationException
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltWrapper.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltWrapper.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltWrapper.scala
deleted file mode 100644
index 5a953c9..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltWrapper.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-case class StormBoltWrapper(worker : StormStreamExecutor[EagleTuple]) extends 
BaseRichBolt{
-  val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, 
collector: OutputCollector): Unit = {
-    _collector = collector
-    worker.init
-  }
-
-  override def execute(input : Tuple): Unit = {
-    try {
-      worker.flatMap(input.getValues.asScala, new Collector[EagleTuple] {
-        override def collect(t: EagleTuple): Unit = {
-          _collector.emit(input, t.getList.asJava)
-        }
-      })
-    }catch{
-      case ex: Exception => {
-        LOG.error("fail executing", ex)
-        _collector.fail(input)
-        throw new RuntimeException(ex)
-      }
-    }
-    _collector.ack(input)
-  }
-
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    val fields = worker.fields
-    LOG.info("output fields for worker " + worker + " : " + fields.toList)
-    declarer.declare(new Fields(fields:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormExecutorForAlertWrapper.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormExecutorForAlertWrapper.scala
deleted file mode 100644
index fd3e2e5..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormExecutorForAlertWrapper.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import java.util
-
-import com.typesafe.config.Config
-
-case class StormExecutorForAlertWrapper(delegate: 
StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], 
streamName: String)
-  extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{
-  override def prepareConfig(config: Config): Unit = {
-    delegate.prepareConfig(config)
-  }
-
-  override def init: Unit = {
-    delegate.init
-  }
-
-  override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, 
String, util.SortedMap[Object, Object]]]): Unit = {
-    delegate.flatMap(input, new Collector[Tuple2[String, 
util.SortedMap[AnyRef, AnyRef]]] {
-      override def collect(r: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): 
Unit = {
-        collector.collect(Tuple3(r.f0, streamName, r.f1))
-      }
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormSpoutFactory.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormSpoutFactory.scala
deleted file mode 100644
index be4ee81..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormSpoutFactory.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package eagle.datastream
-
-import java.util
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-
-object StormSpoutFactory {
-  /**
-   * @param config context configuration
-   * @param sourceProducer source producer
-   * @return
-   */
-  def createSpout(config: Config, sourceProducer: StormSourceProducer) : 
BaseRichSpout = {
-    val numFields = sourceProducer.numFields
-    if(numFields <= 0) {
-      sourceProducer.source
-    }else{
-      var i = 0
-      val ret = new util.ArrayList[String]
-      while(i < numFields){
-        ret.add(OutputFieldNameConst.FIELD_PREFIX + i)
-        i += 1
-      }
-      SpoutProxy(sourceProducer.source, ret)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAG.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAG.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAG.scala
deleted file mode 100644
index 8fe9695..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAG.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package eagle.datastream
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.JavaConverters._
-import scala.collection.{JavaConversions, mutable}
-
-/**
- * wrapper of DAG, used for storm topology compiler
- */
-class StormStreamDAG(graph: DirectedAcyclicGraph[StreamProducer, 
StreamConnector]) extends AbstractStreamProducerGraph {
-  var nodeMap: mutable.Map[String, StreamProducer] = null
-
-  override def addEdge(from: StreamProducer, to: StreamProducer, 
streamConnector: StreamConnector): Unit = {
-    graph.addEdge(from, to, streamConnector)
-  }
-
-  override def addVertex(producer: StreamProducer): Unit = {
-    graph.addVertex(producer)
-  }
-
-  override def iterator(): Iterator[StreamProducer] = {
-    JavaConversions.asScalaIterator(graph.iterator())
-  }
-
-  override def isSource(v: StreamProducer): Boolean = {
-    graph.inDegreeOf(v) match {
-      case 0 => true
-      case _ => false
-    }
-  }
-
-  override def outgoingEdgesOf(v: StreamProducer): 
scala.collection.mutable.Set[StreamConnector] = {
-    JavaConversions.asScalaSet(graph.outgoingEdgesOf(v))
-  }
-
-  override def getNodeByName(name: String): Option[StreamProducer] = {
-    nodeMap.get(name)
-  }
-
-  def setNodeMap(nodeMap: mutable.Map[String, StreamProducer]): Unit = {
-    this.nodeMap = nodeMap
-  }
-
-  override def incomingVertexOf(v: StreamProducer): 
scala.collection.mutable.Set[StreamProducer] = {
-    val set = mutable.Set[StreamProducer]()
-    graph.incomingEdgesOf(v).asScala.foreach(e => set += 
graph.getEdgeSource(e))
-    set
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAGTransformer.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAGTransformer.scala
deleted file mode 100644
index 5966387..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAGTransformer.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import scala.collection.mutable
-
-/**
- * convert generic DAG data structure to Storm specific DAG data structure for 
easy topology compiler
- */
-object StormStreamDAGTransformer {
-  /**
-   * Transform DirectedAcyclicGraph[StreamProducer, StreamConnector] into 
StormStreamDAG
-   *
-   * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
-   * @return StormStreamDAG
-   */
-  def transform(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) : 
StormStreamDAG = {
-    val stormDAG = new StormStreamDAG(dag)
-    val nodeMap = mutable.HashMap[String, StreamProducer]()
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val sp = iter.next()
-      nodeMap.put(sp.name, sp)
-    }
-    stormDAG.setNodeMap(nodeMap)
-    stormDAG
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyCompiler.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyCompiler.scala
deleted file mode 100644
index de6cc9f..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyCompiler.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package eagle.datastream
-
-import java.util
-
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
-import backtype.storm.tuple.Fields
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-
-case class StormTopologyCompiler(config: Config, graph: 
AbstractStreamProducerGraph) extends AbstractTopologyCompiler{
-  val LOG = LoggerFactory.getLogger(StormTopologyCompiler.getClass)
-  val boltCache = scala.collection.mutable.Map[StreamProducer, 
StormBoltWrapper]()
-
-  override def buildTopology: AbstractTopologyExecutor ={
-    val builder = new TopologyBuilder();
-    val iter = graph.iterator()
-    val boltDeclarerCache = scala.collection.mutable.Map[String, 
BoltDeclarer]()
-    while(iter.hasNext){
-      val from = iter.next()
-      val fromName = from.name
-      if(graph.isSource(from)){
-        val spout = StormSpoutFactory.createSpout(config, 
from.asInstanceOf[StormSourceProducer])
-        builder.setSpout(fromName, spout, from.parallelism)
-        LOG.info("Spout name : " + fromName + " with parallelism " + 
from.parallelism)
-      } else {
-        LOG.info("Bolt name:" + fromName)
-      }
-
-      val edges = graph.outgoingEdgesOf(from)
-      edges.foreach(sc => {
-        val toName = sc.to.name
-        var boltDeclarer : BoltDeclarer = null
-        val toBolt = createBoltIfAbsent(toName)
-        boltDeclarerCache.get(toName) match{
-          case None => {
-            var finalParallelism = 1
-            graph.getNodeByName(toName) match {
-              case Some(p) => finalParallelism = p.parallelism
-              case None => finalParallelism = 1
-            }
-            boltDeclarer = builder.setBolt(toName, toBolt, finalParallelism);
-            LOG.info("created bolt " + toName + " with parallelism " + 
finalParallelism)
-            boltDeclarerCache.put(toName, boltDeclarer)
-          }
-          case Some(bt) => boltDeclarer = bt
-        }
-        sc.groupByFields match{
-          case Nil => boltDeclarer.shuffleGrouping(fromName)
-          case p => boltDeclarer.fieldsGrouping(fromName, new 
Fields(fields(p)))
-        }
-        LOG.info("bolt connected " + fromName + "->" + toName + " with groupby 
fields " + sc.groupByFields)
-      })
-    }
-    new StormTopologyExecutorImpl(builder.createTopology, config)
-  }
-
-  def fields(fields : Seq[Int]): java.util.List[String] ={
-    val ret = new util.ArrayList[String]
-    fields.map(n => ret.add(OutputFieldNameConst.FIELD_PREFIX + n))
-    ret
-  }
-
-  def createBoltIfAbsent(name : String) : BaseRichBolt = {
-    val producer = graph.getNodeByName(name)
-    producer match{
-      case Some(p) => createBoltIfAbsent(graph, p)
-      case None => throw new IllegalArgumentException("please check bolt name 
" + name)
-    }
-  }
-
-  def createBoltIfAbsent(graph: AbstractStreamProducerGraph, producer : 
StreamProducer): BaseRichBolt ={
-    boltCache.get(producer) match{
-      case Some(bolt) => bolt
-      case None => {
-        StormBoltFactory.getBoltWrapper(graph, producer, config)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyExecutorImpl.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyExecutorImpl.scala
deleted file mode 100644
index 656495d..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyExecutorImpl.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import backtype.storm.generated.StormTopology
-import backtype.storm.utils.Utils
-import backtype.storm.{Config, LocalCluster, StormSubmitter}
-import storm.trident.spout.RichSpoutBatchExecutor
-
-case class StormTopologyExecutorImpl(topology: StormTopology, config: 
com.typesafe.config.Config) extends AbstractTopologyExecutor {
-  @throws(classOf[Exception])
-  def execute {
-    val localMode: Boolean = 
config.getString("envContextConfig.mode").equalsIgnoreCase("local")
-    val conf: Config = new Config
-    conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
-    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
-    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32))
-    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384))
-    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384))
-
-    val topologyName = config.getString("envContextConfig.topologyName")
-    if (!localMode) {
-      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, 
topology)
-    }
-    else {
-      val cluster: LocalCluster = new LocalCluster
-      cluster.submitTopology(topologyName, conf, topology)
-      Utils.sleep(Integer.MAX_VALUE)
-      cluster.killTopology(topologyName)
-      cluster.shutdown
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAlertExpansion.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAlertExpansion.scala
deleted file mode 100644
index e6a4012..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAlertExpansion.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import java.util
-
-import com.typesafe.config.Config
-import eagle.alert.dao.AlertDefinitionDAOImpl
-import eagle.executor.AlertExecutorCreationUtils
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * The constraints for alert is:
- * 1. only 3 StreamProducers can be put immediately before MapProducer, 
FlatMapProducer, StreamUnionProducer
- * 2. For StreamUnionProducer, the only supported unioned producers are 
MapProducer and FlatMapProducer
- * 3. the output for MapProducer and FlatMapProducer is 2-field tuple, key and 
value, key is string, value has to be SortedMap
- * 4. the framework will wrapper original MapProducer and FlatMapProducer to 
emit 3-field tuple, {key, streamName and value}
- * 5. the framework will automatically partition traffic with first field
- *
- *
- * 2 steps
- * step 1: wrapper previous StreamProducer with one more field "streamName"
- * step 2: partition alert executor by policy partitioner class
- */
-
-class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
-  val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, 
StreamConnector]): Unit ={
-    val iter = dag.iterator()
-    val toBeAddedEdges = new ListBuffer[StreamConnector]
-    val toBeRemovedVertex = new ListBuffer[StreamProducer]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child)
-      })
-    }
-    // add back edges
-    toBeAddedEdges.foreach(e => {
-      dag.addVertex(e.from)
-      dag.addVertex(e.to)
-      dag.addEdge(e.from, e.to, e)
-    })
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-
-  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], 
toBeRemovedVertex: ListBuffer[StreamProducer],
-               dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], 
current: StreamProducer, child: StreamProducer): Unit = {
-    child match {
-      case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer) 
=> {
-        /**
-         * step 1: wrapper previous StreamProducer with one more field 
"streamName"
-         * for AlertStreamSink, we check previous StreamProducer and replace 
that
-         */
-        val newStreamProducers = new ListBuffer[StreamProducer]
-        current match {
-          case StreamUnionProducer(id, others) => {
-            val incomingEdges = dag.incomingEdgesOf(current)
-            incomingEdges.foreach(e => newStreamProducers += 
replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
-            var i: Int = 1
-            others.foreach(o => {
-              newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, 
dag, o, upStreamNames.get(i))
-              i += 1
-            })
-          }
-          case _: FlatMapProducer[AnyRef, AnyRef] => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, 
dag, current, upStreamNames.get(0))
-          }
-          case _: MapProducer => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, 
dag, current, upStreamNames.get(0))
-          }
-          case s: StreamProducer if dag.inDegreeOf(s) == 0 => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, 
dag, current, upStreamNames.get(0))
-          }
-          case p@_ => throw new IllegalStateException(s"$p can not be put 
before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and 
MapProducer are supported")
-        }
-
-        /**
-         * step 2: partition alert executor by policy partitioner class
-         */
-        val alertExecutors = 
AlertExecutorCreationUtils.createAlertExecutors(config, new 
AlertDefinitionDAOImpl(config), upStreamNames, alertExecutorId)
-        var alertProducers = new 
scala.collection.mutable.MutableList[StreamProducer]
-        alertExecutors.foreach(exec => {
-          val t = FlatMapProducer(UniqueId.incrementAndGetId(), 
exec).withName(exec.getAlertExecutorId() + "_" + exec.getPartitionSeq())
-          t.setConfig(config)
-          t.setGraph(dag)
-          alertProducers += t
-          newStreamProducers.foreach(newsp => toBeAddedEdges += 
StreamConnector(newsp, t).groupBy(Seq(0)))
-        })
-
-        // remove AlertStreamSink
-        toBeRemovedVertex += child
-
-        // add alert consumer if necessary
-        if (withConsumer) {
-          AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, 
alertProducers.toList)
-        }
-      }
-      case _ =>
-    }
-  }
-
-  private def replace(toBeAddedEdges: ListBuffer[StreamConnector], 
toBeRemovedVertex: ListBuffer[StreamProducer],
-                      dag: DirectedAcyclicGraph[StreamProducer, 
StreamConnector], current: StreamProducer, upStreamName: String) : 
StreamProducer= {
-    var newsp: StreamProducer = null
-    current match {
-      case _: FlatMapProducer[AnyRef, AnyRef] => {
-        val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper
-        mapper match {
-          case a: JavaStormStreamExecutor[EagleTuple] => {
-            val newmapper = new 
JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String,
 util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper)
-            newsp.setGraph(dag)
-            newsp.setConfig(config)
-          }
-          case b: StormStreamExecutor[EagleTuple] => {
-            val newmapper = 
StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, 
util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper)
-            newsp.setGraph(dag)
-            newsp.setConfig(config)
-          }
-          case _ => throw new IllegalArgumentException
-        }
-        // remove old StreamProducer and replace that with new StreamProducer
-        val incomingEdges = dag.incomingEdgesOf(current)
-        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, 
newsp))
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, 
e.to))
-        toBeRemovedVertex += current
-      }
-      case _: MapProducer => {
-        val mapper = current.asInstanceOf[MapProducer].fn
-        val newfun: (AnyRef => AnyRef) = {
-          a => mapper(a) match {
-            case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
-            case _ => throw new IllegalArgumentException
-          }
-        }
-        current match {
-          case MapProducer(id, 2, fn) => newsp = 
MapProducer(UniqueId.incrementAndGetId(), 3, newfun)
-          case _ => throw new IllegalArgumentException
-        }
-        val incomingEdges = dag.incomingEdgesOf(current)
-        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, 
newsp))
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, 
e.to))
-        toBeRemovedVertex += current
-      }
-      case s: StreamProducer if dag.inDegreeOf(s) == 0 => {
-        val fn:(AnyRef => AnyRef) = {
-          n => {
-            n match {
-              case scala.Tuple3 => n
-              case scala.Tuple2(x1,x2) => (x1,upStreamName,x2)
-              case scala.Tuple1(x1) => (if(x1 == null) null else 
x1.hashCode(),upStreamName,x1)
-              case _ => (if(n == null) null else n.hashCode(),upStreamName,n)
-            }
-          }
-        }
-        newsp = MapProducer(UniqueId.incrementAndGetId(),3,fn)
-        toBeAddedEdges += StreamConnector(current,newsp)
-        val outgoingEdges = dag.outgoingEdgesOf(current)
-        outgoingEdges.foreach(e => toBeAddedEdges += 
StreamConnector(newsp,e.to))
-      }
-      case _ => throw new IllegalArgumentException("Only FlatMapProducer and 
MapProducer can be replaced before AlertStreamSink")
-    }
-    newsp
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAppDSL.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAppDSL.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAppDSL.scala
deleted file mode 100644
index 0a2e5ba..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAppDSL.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config._
-import eagle.dataproc.impl.storm.AbstractStormSpoutProvider
-import eagle.dataproc.util.ConfigOptionParser
-
-import scala.reflect.runtime.universe._
-
-/**
- * @since  11/6/15
- */
-trait ConfigContext{
-  def set(config:Config)
-  def config:Config
-
-  def set[T<:AnyRef](key:String,value:T): Unit = {
-    set(config.withValue(key,ConfigValueFactory.fromAnyRef(value)))
-  }
-
-  /**
-   *
-   * @param key config key
-   * @param default default value
-   * @tparam T return type
-   * @return
-   */
-  def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = {
-    if(config.hasPath(key)) {
-      get(key)
-    } else default
-  }
-
-  def get[T](key:String)(implicit tag: TypeTag[T]):T = tag.tpe match {
-    case STRING_TYPE => config.getString(key).asInstanceOf[T]
-    case TypeTag.Double => config.getDouble(key).asInstanceOf[T]
-    case TypeTag.Long => config.getLong(key).asInstanceOf[T]
-    case TypeTag.Int => config.getInt(key).asInstanceOf[T]
-    case TypeTag.Byte => config.getBytes(key).asInstanceOf[T]
-    case TypeTag.Boolean => config.getBoolean(key).asInstanceOf[T]
-    case NUMBER_TYPE => config.getNumber(key).asInstanceOf[T]
-    case OBJECT_TYPE => config.getObject(key).asInstanceOf[T]
-    case VALUE_TYPE => config.getValue(key).asInstanceOf[T]
-    case ANY_REF_TYPE => config.getAnyRef(key).asInstanceOf[T]
-    case INT_LIST_TYPE => config.getIntList(key).asInstanceOf[T]
-    case DOUBLE_LIST_TYPE => config.getDoubleList(key).asInstanceOf[T]
-    case BOOL_LIST_TYPE => config.getBooleanList(key).asInstanceOf[T]
-    case LONG_LIST_TYPE => config.getLongList(key).asInstanceOf[T]
-    case _ => throw new UnsupportedOperationException(s"$tag is not supported 
yet")
-  }
-
-  val STRING_TYPE = typeOf[String]
-  val NUMBER_TYPE = typeOf[Number]
-  val INT_LIST_TYPE = typeOf[List[Int]]
-  val BOOL_LIST_TYPE = typeOf[List[Boolean]]
-  val DOUBLE_LIST_TYPE = typeOf[List[Double]]
-  val LONG_LIST_TYPE = typeOf[List[Double]]
-  val OBJECT_TYPE = typeOf[ConfigObject]
-  val VALUE_TYPE = typeOf[ConfigValue]
-  val ANY_REF_TYPE = typeOf[AnyRef]
-}
-
-/**
- * Stream APP DSL
- * @tparam E
- */
-trait StreamApp[+E<:ExecutionEnvironment] extends App with ConfigContext{
-  private var _executed = false
-  private var _config:Config = null
-
-  override def config:Config = _config
-
-  override def set(config:Config) = _config = config
-
-  def env:E
-
-  def execute() {
-    env.execute()
-    _executed = true
-  }
-
-  override def main(args: Array[String]): Unit = {
-    _config = new ConfigOptionParser().load(args)
-    super.main(args)
-    if(!_executed) execute()
-  }
-}
-
-trait StormStreamApp extends StreamApp[StormExecutionEnvironment]{
-  private var _env:StormExecutionEnvironment = null
-  def source(sourceProvider: AbstractStormSpoutProvider) = {
-    val spout = sourceProvider.getSpout(config)
-    env.newSource(spout)
-  }
-
-  def source(spout:BaseRichSpout) = env.newSource(spout)
-
-  override def env:StormExecutionEnvironment = {
-    if(_env == null){
-      _env = ExecutionEnvironmentFactory.getStorm(config)
-    }
-    _env
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamConnector.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamConnector.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamConnector.scala
deleted file mode 100644
index ceeb411..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamConnector.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package eagle.datastream
-
-case class StreamConnector(from: StreamProducer, to: StreamProducer) {
-  var groupByFields : Seq[Int] = Nil
-
-  def groupBy(fields : Seq[Int]) : StreamConnector = {
-    groupByFields = fields
-    this
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamDAGExpansion.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamDAGExpansion.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamDAGExpansion.scala
deleted file mode 100644
index baa514e..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamDAGExpansion.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-abstract class StreamDAGExpansion(config: Config) {
-  def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamGroupbyExpansion.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamGroupbyExpansion.scala
deleted file mode 100644
index 3bed891..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamGroupbyExpansion.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * Replace GroupByProducer(Vertex) with StreamConnector (Edge)
- * @param config context configuration
- */
-class StreamGroupbyExpansion(config: Config) extends 
StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamGroupbyExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, 
StreamConnector]) = {
-    val iter = dag.iterator()
-    var toBeAddedEdges = new ListBuffer[StreamConnector]
-    var toBeRemovedVertex = new ListBuffer[StreamProducer]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        child match {
-          case p : GroupByProducer => {
-            dag.outgoingEdgesOf(p).foreach(c2 => {
-              toBeAddedEdges += StreamConnector(current, 
c2.to).groupBy(p.fields)
-            })
-            toBeRemovedVertex += p
-          }
-          case _ =>
-        }
-      })
-    }
-
-    // add back edges
-    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamNameExpansion.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamNameExpansion.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamNameExpansion.scala
deleted file mode 100644
index 40abfd8..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamNameExpansion.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-/**
- * to set name for each StreamProducer
- * 1. if name is given programatically, then use this name
- * 2. otherwise use name generated by scala internally
- */
-class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, 
StreamConnector]) = {
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val sp = iter.next()
-      sp.name = NodeNameSelector(sp).getName
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamParallelismConfigExpansion.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamParallelismConfigExpansion.scala
deleted file mode 100644
index c3c4533..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamParallelismConfigExpansion.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import java.util.regex.Pattern
-
-import com.typesafe.config.{ConfigValue, ConfigObject, Config}
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConverters._
-
-class StreamParallelismConfigExpansion(config: Config) extends 
StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, 
StreamConnector]) = {
-    val map = getParallelismMap(config)
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val streamProducer = iter.next()
-      if(streamProducer.name != null) {
-        map.foreach(tuple => {
-          tuple._1.matcher(streamProducer.name).find() match {
-            case true => streamProducer.parallelism = tuple._2
-            case false =>
-          }
-        })
-      }
-    }
-  }
-
-  private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
-    val parallelismConfig: ConfigObject = 
config.getObject("envContextConfig.parallelismConfig")
-    LOG.info("Found parallelismConfig ? " + (if (parallelismConfig == null) 
"no" else "yes"))
-    parallelismConfig.asScala.toMap map {
-      case (name, value) => (Pattern.compile(name), 
value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamProducer.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamProducer.scala
deleted file mode 100644
index 8485e28..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamProducer.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package eagle.datastream
-
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
- * StreamProducer is the base class for all other concrete StreamProducer
- * It defines high level API for user to organize data stream flow
- *
- * StreamProducer is independent of execution environment
- */
-
-trait StreamProducer{
-  var name: String = null
-  var parallelism: Int = 1
-  var graph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = null
-  var config: Config = null
-
-  private def incrementAndGetId() = UniqueId.incrementAndGetId()
-
-  def setGraph(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]): 
Unit = this.graph = graph
-  def getGraph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = graph
-  def setConfig(config: Config) : Unit = this.config = config
-  def getConfig: Config = config
-
-  def filter(fn : AnyRef => Boolean): StreamProducer ={
-    val ret = FilterProducer(incrementAndGetId(), fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def flatMap[T, R](mapper : FlatMapper[T, R]) : StreamProducer = {
-    val ret = FlatMapProducer(incrementAndGetId(), mapper)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def map1(fn : AnyRef => AnyRef) : StreamProducer = {
-    val ret = MapProducer(incrementAndGetId(), 1, fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def map2(fn : AnyRef => AnyRef) : StreamProducer = {
-    val ret = MapProducer(incrementAndGetId(), 2, fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def map3(fn : AnyRef => AnyRef) : StreamProducer = {
-    val ret = MapProducer(incrementAndGetId(), 3, fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def map4(fn : AnyRef => AnyRef) : StreamProducer = {
-    val ret = MapProducer(incrementAndGetId(), 4, fn)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  /**
-   * starting from 0, groupby operator would be upon edge of the graph
-   */
-  def groupBy(fields : Int*) : StreamProducer = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field 
index should be always >= 0"))
-    val ret = GroupByProducer(incrementAndGetId(), fields)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  //groupBy java version, starting from 1
-  def groupBy(fields : java.util.List[Integer]) : StreamProducer = {
-    // validate each field index is greater or equal to 0
-    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field 
index should be always >= 0"))
-    val ret = GroupByProducer(incrementAndGetId(), 
fields.asScala.toSeq.asInstanceOf[Seq[Int]])
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  def streamUnion(others : Seq[StreamProducer]) : StreamProducer = {
-    val ret = StreamUnionProducer(incrementAndGetId(), others)
-    hookupDAG(graph, this, ret)
-    ret
-  }
-
-  /**
-   * alert is always sink of data flow
-   */
-  def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : 
String) = {
-    alert(upStreamNames, alertExecutorId, true)
-  }
-
-  def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : 
String) = {
-    alert(upStreamNames, alertExecutorId, false)
-  }
-
-  def alert(upStreamNames: util.List[String], alertExecutorId : String, 
withConsumer: Boolean=true) = {
-    val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, 
alertExecutorId, withConsumer)
-    hookupDAG(graph, this, ret)
-  }
-
-  def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit 
={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, true)
-  }
-
-  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): 
Unit ={
-    alert(util.Arrays.asList(upStreamName), alertExecutorId, false)
-  }
-
-  def hookupDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector], 
current: StreamProducer, next: StreamProducer) = {
-    current.getGraph.addVertex(next)
-    current.getGraph.addEdge(current, next, StreamConnector(current, next))
-    passOnContext(current, next)
-  }
-
-  private def passOnContext(current: StreamProducer, next: StreamProducer): 
Unit ={
-    next.graph = current.graph
-    next.config = current.config
-  }
-
-  /**
-   * can be set by programatically or by configuration
-   */
-  def withParallelism(parallelism : Int) : StreamProducer = {
-    this.parallelism = parallelism
-    this
-  }
-
-  def withName(name : String) : StreamProducer = {
-    this.name = name
-    this
-  }
-}
-
-case class FilterProducer(id: Int, fn : AnyRef => Boolean) extends 
StreamProducer
-
-case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) 
extends StreamProducer {
-  override def toString() = mapper.toString + "_" + id
-}
-
-case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => 
AnyRef) extends StreamProducer
-
-case class GroupByProducer(id: Int, fields : Seq[Int]) extends StreamProducer
-
-case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends 
StreamProducer
-
-case class StormSourceProducer(id: Int, source : BaseRichSpout) extends 
StreamProducer{
-  var numFields : Int = 0
-  /**
-    * rename outputfields to f0, f1, f2, ...
-   * if one spout declare some field names, those fields names will be modified
-   * @param n
-   */
-  def renameOutputFields(n : Int): StormSourceProducer ={
-    this.numFields = n
-    this
-  }
-}
-
-case class AlertStreamSink(id: Int, upStreamNames: util.List[String], 
alertExecutorId : String, withConsumer: Boolean=true) extends StreamProducer
-
-object UniqueId{
-  val id : AtomicInteger = new AtomicInteger(0);
-  def incrementAndGetId() : Int = {
-    id.incrementAndGet()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamUnionExpansion.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamUnionExpansion.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamUnionExpansion.scala
deleted file mode 100644
index b68d213..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamUnionExpansion.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import com.typesafe.config.Config
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
-/**
- * union operator should be expanded
- */
-class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){
-  val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion])
-
-  override def expand(dag: DirectedAcyclicGraph[StreamProducer, 
StreamConnector]) = {
-    val iter = dag.iterator()
-    var toBeAddedEdges = new ListBuffer[StreamConnector]
-    var toBeRemovedVertex = new ListBuffer[StreamProducer]
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        val child = edge.to
-        val groupByFields = edge.groupByFields;
-        child match {
-          case StreamUnionProducer(id, others) => {
-            dag.outgoingEdgesOf(child).foreach(c2 => {
-              toBeAddedEdges += StreamConnector(current, 
c2.to).groupBy(groupByFields)
-              others.foreach(o => {
-                toBeAddedEdges += StreamConnector(o, 
c2.to).groupBy(groupByFields)
-              })
-            })
-            toBeRemovedVertex += child
-          }
-          case _ =>
-        }
-      })
-    }
-
-    // add back edges
-    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
-    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/UnionUtils.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/UnionUtils.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/UnionUtils.scala
deleted file mode 100644
index fe914f1..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/UnionUtils.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package eagle.datastream
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-object UnionUtils {
-  def join(producers : StreamProducer*) : StreamProducer = {
-    producers.head.streamUnion(producers.drop(1))
-  }
-
-  def join(producers : java.util.List[StreamProducer]) : StreamProducer = {
-    val newList = new util.ArrayList(producers)
-    val head = newList.get(0)
-    newList.remove(0)
-    head.streamUnion(newList.asScala);
-  }
-
-  def join(producers : List[StreamProducer]) : StreamProducer = {
-    val head = producers.head
-    head.streamUnion(producers.tail);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/JsonMessageDeserializer.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/JsonMessageDeserializer.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/JsonMessageDeserializer.scala
deleted file mode 100644
index 1e735c5..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/JsonMessageDeserializer.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream.kafka
-
-import java.io.IOException
-import java.util
-import java.util.Properties
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * @since  11/6/15
- */
-case class JsonMessageDeserializer(props:Properties) extends 
SpoutKafkaMessageDeserializer{
-  private val objectMapper: ObjectMapper = new ObjectMapper
-  private val LOG: Logger = 
LoggerFactory.getLogger(classOf[JsonMessageDeserializer])
-
-  override def deserialize(bytes: Array[Byte]): AnyRef = {
-    var map: util.Map[String, _] = null
-    try {
-      map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
-    } catch {
-      case e: IOException => {
-        LOG.error("Failed to deserialize json from: " + new String(bytes), e)
-      }
-    }
-    map
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/KafkaStreamMonitor.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/KafkaStreamMonitor.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/KafkaStreamMonitor.scala
deleted file mode 100644
index bb8fb56..0000000
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/KafkaStreamMonitor.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream.kafka
-
-import eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
-import eagle.datastream.StormStreamApp
-
-/**
- * @since  11/6/15
- */
-class KafkaStreamMonitorApp extends StormStreamApp{
-  val streamName = get[String]("eagle.stream.name","eventStream")
-  val streamExecutorId = 
get[String]("eagle.stream.executor",s"${streamName}Executor")
-
-  
set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)
-
-  source(new 
KafkaSourcedSpoutProvider).renameOutputFields(1).withName(streamName)
-    .alertWithConsumer(streamName, streamExecutorId)
-}
-
-object KafkaStreamMonitor extends KafkaStreamMonitorApp
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala
new file mode 100644
index 0000000..dc2c198
--- /dev/null
+++ 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala
@@ -0,0 +1,29 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+trait AbstractStreamProducerGraph {
+  def addEdge(from: StreamProducer, to: StreamProducer, streamConnector: 
StreamConnector)
+  def addVertex(producer: StreamProducer)
+  def iterator() : Iterator[StreamProducer]
+  def isSource(v : StreamProducer) : Boolean
+  def outgoingEdgesOf(v : StreamProducer) : 
scala.collection.mutable.Set[StreamConnector]
+  def getNodeByName(name : String) : Option[StreamProducer]
+  def incomingVertexOf(v: StreamProducer) : 
scala.collection.mutable.Set[StreamProducer]
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala
new file mode 100644
index 0000000..8c53ed5
--- /dev/null
+++ 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+trait AbstractTopologyCompiler{
+  def buildTopology : AbstractTopologyExecutor
+}


Reply via email to