http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
deleted file mode 100644
index 85b5334..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
+++ /dev/null
@@ -1,110 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-       config {
-               envContextConfig {
-                       "env" : "storm"
-                       "mode" : "cluster"
-                       "topologyName" : "dynamical-topology-5"
-                       "parallelismConfig" : {
-                               "kafkaMsgConsumer" : 1
-                       },
-                       "nimbusHost":"sandbox.hortonworks.com",
-                       "nimbusThriftPort":6627
-               }
-               alertExecutorConfigs {
-                       defaultAlertExecutor  {
-                               "parallelism" : 1
-                               "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-                               "needValidation" : "true"
-                       }
-               }
-               eagleProps {
-                       "site" : "sandbox"
-                       "application": "HADOOP"
-               }
-       }
-       
-       dataflow {
-               KafkaSource.JmxStreamOne {
-                       parallism = 1000
-                       topic = "metric_event_1"
-                       zkConnection = "sandbox.hortonworks.com:2181"
-                       zkConnectionTimeoutMS = 15000
-                       consumerGroupId = "Consumer"
-                       fetchSize = 1048586
-                       transactionZKServers = "sandbox.hortonworks.com"
-                       transactionZKPort = 2181
-                       transactionZKRoot = "/consumers"
-                       transactionStateUpdateMS = 2000
-                       deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-               }
-
-               KafkaSource.JmxStreamTwo {
-                       parallism = 1000
-                       topic = "metric_event_2"
-                       zkConnection = "sandbox.hortonworks.com:2181"
-                       zkConnectionTimeoutMS = 15000
-                       consumerGroupId = "Consumer"
-                       fetchSize = 1048586
-                       transactionZKServers = "sandbox.hortonworks.com"
-                       transactionZKPort = 2181
-                       transactionZKRoot = "/consumers"
-                       transactionStateUpdateMS = 2000
-                       deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-               }
-
-               KafkaSource.JmxStreamThree{
-                       parallism = 1000
-                       topic = "metric_event_3"
-                       zkConnection = "sandbox.hortonworks.com:2181"
-                       zkConnectionTimeoutMS = 15000
-                       consumerGroupId = "Consumer"
-                       fetchSize = 1048586
-                       transactionZKServers = "sandbox.hortonworks.com"
-                       transactionZKPort = 2181
-                       transactionZKRoot = "/consumers"
-                       transactionStateUpdateMS = 2000
-                       deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-               }
-
-               Console.printer {
-                       format = "%s"
-               }
-
-               KafkaSink.metricStore {
-                       "topic" = "metric_event_persist"
-                       "bootstrap.servers" = "sandbox.hortonworks.com:6667"
-               }
-
-               Alert.defaultAlertExecutor {
-                       // upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-                       // alertExecutorId = defaultAlertExecutor
-               }
-
-               JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
-                       grouping = shuffle
-               }
-
-               JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
-                       grouping = shuffle
-               }
-
-               JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
-                       grouping = shuffle
-               }
-       }
-}
\ No newline at end of file
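
As context for the removal above, a minimal sketch of how a HOCON pipeline resource like pipeline_5.conf is loaded with the Typesafe Config API used across this module (the demo class and printed keys are illustrative only):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class PipelineConfDemo {
    public static void main(String[] args) {
        // Parse the HOCON resource from the classpath; nested blocks become dotted paths.
        Config conf = ConfigFactory.parseResources("pipeline_5.conf");
        System.out.println(conf.getString("config.envContextConfig.topologyName"));
        System.out.println(conf.getInt("config.alertExecutorConfigs.defaultAlertExecutor.parallelism"));
    }
}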

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
deleted file mode 100644
index 7b552da..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.eagle.stream.pipeline
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{FlatSpec, Matchers}
-
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-class ConfigSpec extends FlatSpec with Matchers{
-  "Config" should "be overrode correctly" in {
-    val conf1 = ConfigFactory.parseString(
-      """
-        |value=1
-      """.stripMargin)
-    val conf2 = ConfigFactory.parseString(
-      """
-        |value=2
-      """.stripMargin)
-    val conf3 = conf1.withFallback(conf2)
-    val conf4 = conf2.withFallback(conf1)
-    conf3.getInt("value") should be(1)
-    conf4.getInt("value") should be(2)
-  }
-}
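
The test above pins down withFallback precedence: keys already present on the receiver win, and the fallback only fills gaps. The same behavior in a minimal Java sketch (class name is illustrative):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class FallbackDemo {
    public static void main(String[] args) {
        Config a = ConfigFactory.parseString("value = 1");
        Config b = ConfigFactory.parseString("value = 2");
        // withFallback never overrides existing keys on the receiver.
        System.out.println(a.withFallback(b).getInt("value")); // prints 1
        System.out.println(b.withFallback(a).getInt("value")); // prints 2
    }
}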

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
deleted file mode 100644
index e63280a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.stream.pipeline.parser.{ConnectionIdentifier, DataFlow, DefinitionIdentifier, Identifier}
-import org.scalatest.{FlatSpec, Matchers}
-
-class DataFlowSpec extends FlatSpec with Matchers {
-  val dataFlowConfig =
-    """
-       |{
-       |       kafkaSource.metric_event_1 {
-       |    schema {
-       |      metric: string
-       |      timestamp: long
-       |      value: double
-       |    }
-       |               parallism = 1000
-       |               topic = "metric_event_1"
-       |               zkConnection = "localhost:2181"
-       |               zkConnectionTimeoutMS = 15000
-       |               consumerGroupId = "Consumer"
-       |               fetchSize = 1048586
-       |               transactionZKServers = "localhost"
-       |               transactionZKPort = 2181
-       |               transactionZKRoot = "/consumers"
-       |               transactionStateUpdateMS = 2000
-       |               deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-       |       }
-       |
-       |       kafkaSource.metric_event_2 {
-       |               schema = {
-       |      metric: string
-       |      timestamp: long
-       |      value: double
-       |    }
-       |               parallism = 1000
-       |               topic = "metric_event_2"
-       |               zkConnection = "localhost:2181"
-       |               zkConnectionTimeoutMS = 15000
-       |               consumerGroupId = "Consumer"
-       |               fetchSize = 1048586
-       |               transactionZKServers = "localhost"
-       |               transactionZKPort = 2181
-       |               transactionZKRoot = "/consumers"
-       |               transactionStateUpdateMS = 2000
-       |               deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-       |       }
-       |
-       |       kafkaSink.metricStore {}
-       |
-       |       alert.alert {
-       |               executor = "alertExecutor"
-       |       }
-       |
-       |       aggregator.aggreator {
-       |               executor = "aggreationExecutor"
-       |       }
-       |
-       |       metric_event_1|metric_event_2 -> alert {}
-       |       metric_event_1|metric_event_2 -> metricStore {}
-       |}
-     """.stripMargin
-
-  DataFlow.getClass.toString should "parse dataflow end-to-end correctly" in {
-    val config = ConfigFactory.parseString(dataFlowConfig)
-    config should not be null
-    val dataflow = DataFlow.parse(config)
-    dataflow should not be null
-    dataflow.getConnectors.size should be(4)
-    dataflow.getProcessors.size should be(5)
-  }
-
-  Identifier.getClass.toString should "parse as definition" in {
-    val defId = Identifier.parse("kafka").asInstanceOf[DefinitionIdentifier]
-    defId.moduleType should be("kafka")
-  }
-
-  Identifier.getClass.toString should "parse node1 -> node2 as connection" in {
-    val id = Identifier.parse("node1 -> node2").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(1)
-  }
-
-  Identifier.getClass.toString should "parse node1|node2 -> node3" in {
-    val id = Identifier.parse("node1|node2 -> node3").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(2)
-  }
-
-  Identifier.getClass.toString should "parse node1|node2|node3 -> node4 as connection" in {
-    val id = Identifier.parse("node1|node2|node3 -> node4").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(3)
-  }
-
-  Identifier.getClass.toString should "parse node1 | node2 | node3 -> node4 as connection" in {
-    val id = Identifier.parse("node1 | node2 | node3 -> node4").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(3)
-  }
-}
\ No newline at end of file
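
The Identifier cases above all exercise the connection syntax from1|from2|... -> to. A rough, hypothetical re-creation of the split they assert on (this helper is mine, not the parser's actual implementation):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ConnectionSyntaxDemo {
    // Hypothetical: mimics the fromIds count the spec asserts on.
    static List<String> fromIds(String expr) {
        String lhs = expr.split("->", 2)[0];
        return Arrays.stream(lhs.split("\\|")).map(String::trim).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fromIds("node1 | node2 | node3 -> node4").size()); // 3
    }
}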

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
deleted file mode 100644
index 5e2007d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline
-
-import org.apache.eagle.datastream.ExecutionEnvironments.storm
-import org.scalatest.{FlatSpec, Matchers}
-
-class PipelineSpec extends FlatSpec with Matchers{
-  "Pipeline" should "parse successfully from pipeline_1.conf" in {
-    val pipeline = Pipeline.parseResource("pipeline_1.conf")
-    pipeline should not be null
-  }
-
-  "Pipeline" should "compile successfully from pipeline_2.conf" in {
-    val pipeline = Pipeline.parseResource("pipeline_2.conf")
-    pipeline should not be null
-    val stream = Pipeline.compile(pipeline)
-    stream should not be null
-    // Throw ClassNotFoundException when submit in unit test
-    // stream.submit[storm]
-  }
-}
-
-/**
- * Storm LocalCluster throws ClassNotFoundException when submit in unit test, so here submit in App
- */
-object PipelineSpec_2 extends App{
-  val pipeline = Pipeline(args).parseResource("pipeline_2.conf")
-  val stream = Pipeline.compile(pipeline)
-  stream.submit[storm]
-}
-
-object PipelineSpec_3 extends App {
-  Pipeline(args).submit[storm]("pipeline_3.conf")
-}
-
-object PipelineSpec_4 extends App {
-  Pipeline(args).submit[storm]("pipeline_4.conf")
-}
-
-object PipelineSpec_5 extends App {
-  Pipeline(args).submit[storm]("pipeline_5.conf")
-}
-
-object PipelineCLISpec extends App{
-  Pipeline.main(args)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
index e9be371..5e3fc9e 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
@@ -66,11 +66,6 @@
       </dependency>
       <dependency>
           <groupId>org.apache.eagle</groupId>
-          <artifactId>eagle-alert-process</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.eagle</groupId>
           <artifactId>eagle-stream-process-base</artifactId>
           <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
deleted file mode 100644
index b17c192..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import org.apache.eagle.policy.ResultRender;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.policy.executor.PolicyProcessExecutor;
-
-/**
- * @since Dec 16, 2015
- *
- */
-public class AggregateExecutor extends PolicyProcessExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
-
-       private static final long serialVersionUID = 1L;
-
-       private ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> render = new AggregateResultRender();
-
-       public AggregateExecutor(String executorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
-                       PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) {
-               super(executorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams,
-                               AggregateDefinitionAPIEntity.class);
-       }
-
-       @Override
-       public ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> getResultRender() {
-               return render;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
deleted file mode 100644
index 5093685..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.policy.executor.IPolicyExecutor;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Dec 16, 2015
- *
- */
-public class AggregateExecutorFactory {
-
-       private static final Logger LOG = LoggerFactory.getLogger(AggregateExecutorFactory.class);
-       
-       private AggregateExecutorFactory() {}
-       public static final AggregateExecutorFactory Instance = new AggregateExecutorFactory();
-
-
-       public IPolicyExecutor[] createExecutors(List<String> streamNames, String cql) throws Exception {
-               int numPartitions = 1; //loadExecutorConfig(config, executorId, partitionerCls);
-
-               IPolicyExecutor[] executors = new IPolicyExecutor[numPartitions];
-               String[] upStreams = streamNames.toArray(new String[0]);
-               for (int i = 0; i < numPartitions ; i++ ) {
-                       executors[i] = new SimpleAggregateExecutor(upStreams, cql, "siddhiCEPEngine", i, numPartitions);
-               }
-
-               return executors;
-       }
-
-       public IPolicyExecutor[] createExecutors(Config config, List<String> streamNames, String executorId) throws Exception {
-               StringBuilder partitionerCls = new StringBuilder(DefaultPolicyPartitioner.class.getCanonicalName());
-        int numPartitions = loadExecutorConfig(config, executorId, partitionerCls);
-               PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<>(
-                               new EagleServiceConnector(config), Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME);
-               return newAggregateExecutors(policyDefDao, streamNames, executorId, numPartitions, partitionerCls.toString());
-       }
-       
-       @SuppressWarnings("unchecked")
-       private int loadExecutorConfig(Config config, String executorId, StringBuilder partitionerCls) {
-               int numPartitions = 0;
-               String aggregateExecutorConfigsKey = "aggregateExecutorConfigs";
-        if(config.hasPath(aggregateExecutorConfigsKey)) {
-            Map<String, ConfigValue> analyzeExecutorConfigs = config.getObject(aggregateExecutorConfigsKey);
-            if(analyzeExecutorConfigs !=null && analyzeExecutorConfigs.containsKey(executorId)) {
-                Map<String, Object> alertExecutorConfig = (Map<String, Object>) analyzeExecutorConfigs.get(executorId).unwrapped();
-                int parts = 0;
-                if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));
-                numPartitions = parts == 0 ? 1 : parts;
-                if(alertExecutorConfig.containsKey("partitioner")) {
-                       partitionerCls.setLength(0);
-                       partitionerCls.append((String) alertExecutorConfig.get("partitioner"));
-                }
-            }
-        }
-        return numPartitions;
-       }
-
-//     private List<String> findStreamNames(Config config, String executorId, String dataSource) throws Exception {
-//             // Get map from alertExecutorId to alert stream
-//             // (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
-//             List<String> streamNames = new ArrayList<String>();
-//             // FIXME : here we reuse the executor definition. But the name alert is not ambiguous now.
-//             AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(new EagleServiceConnector(config));
-//             List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource,
-//                             executorId);
-//             for (AlertExecutorEntity entity : alertExecutorEntities) {
-//                     streamNames.add(entity.getTags().get(Constants.STREAM_NAME));
-//             }
-//             return streamNames;
-//     }
-       
-       private AggregateExecutor[] newAggregateExecutors(PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefDAO,
-                       List<String> sourceStreams, String executorID, int numPartitions, String partitionerCls)
-                                       throws Exception {
-               LOG.info("Creating aggregator executors with executorID: " + executorID + ", numPartitions: "
-                               + numPartitions + ", Partition class is: " + partitionerCls);
-
-               PolicyPartitioner partitioner = (PolicyPartitioner) Class.forName(partitionerCls).newInstance();
-               AggregateExecutor[] alertExecutors = new AggregateExecutor[numPartitions];
-               String[] _sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]);
-
-               for (int i = 0; i < numPartitions; i++) {
-                       alertExecutors[i] = new AggregateExecutor(executorID, partitioner, numPartitions, i, alertDefDAO,
-                                       _sourceStreams);
-               }
-               return alertExecutors;
-       }
-}
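
loadExecutorConfig above resolves per-executor parallelism and partitioner from an aggregateExecutorConfigs block. A minimal sketch of the config shape it expects, assuming the Typesafe Config API (executor id and values are illustrative):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ExecutorConfigDemo {
    public static void main(String[] args) {
        Config config = ConfigFactory.parseString(
            "aggregateExecutorConfigs { myExecutor { parallelism = 2, "
            + "partitioner = \"org.apache.eagle.policy.DefaultPolicyPartitioner\" } }");
        Config c = config.getConfig("aggregateExecutorConfigs.myExecutor");
        System.out.println(c.getInt("parallelism"));     // 2; the factory defaults to 1 when absent
        System.out.println(c.getString("partitioner"));
    }
}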

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
deleted file mode 100644
index 986885a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.ResultRender;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Created on 12/29/15.
- */
-public class AggregateResultRender implements ResultRender<AggregateDefinitionAPIEntity, AggregateEntity>, Serializable {
-
-
-    @Override
-    public AggregateEntity render(Config config,
-                                  List<Object> rets,
-                                  PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> siddhiAlertContext,
-                                  long timestamp) {
-        AggregateEntity result = new AggregateEntity();
-        for (Object o : rets) {
-            result.add(o);
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
deleted file mode 100644
index e0dadbf..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.apache.eagle.policy.executor.IPolicyExecutor;
-import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
-import org.apache.hadoop.hbase.util.MD5Hash;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Only one policy for one simple aggregate executor
- *
- * Created on 1/10/16.
- */
-public class SimpleAggregateExecutor
-        extends JavaStormStreamExecutor2<String, AggregateEntity>
-        implements SiddhiEvaluationHandler<AggregateDefinitionAPIEntity, AggregateEntity>, IPolicyExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SimpleAggregateExecutor.class);
-
-    private final String cql;
-    private final int partitionSeq;
-    private final int totalPartitionNum;
-
-    private final String[] upStreamNames;
-    private String policyId;
-    private String executorId;
-    private Config config;
-    private AggregateDefinitionAPIEntity aggDef;
-    private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator;
-
-    public SimpleAggregateExecutor(String[] upStreams, String cql, String policyType, int partitionSeq, int totalPartitionNum) {
-        this.cql = cql;
-        this.partitionSeq = partitionSeq;
-        this.upStreamNames = upStreams;
-        this.totalPartitionNum = totalPartitionNum;
-        // create an fixed definition policy api entity, and indicate it has full definition
-        aggDef = new AggregateDefinitionAPIEntity();
-        aggDef.setTags(new HashMap<String, String>());
-        aggDef.getTags().put(Constants.POLICY_TYPE, policyType);
-        // TODO make it more general, not only hard code siddhi cep support here.
-        try {
-            Map<String,Object> template = new HashMap<>();
-            template.put("type","siddhiCEPEngine");
-            template.put("expression",this.cql);
-            template.put("containsDefinition",true);
-            aggDef.setPolicyDef(new ObjectMapper().writer().writeValueAsString(template));
-        } catch (Exception e) {
-            LOG.error("Simple aggregate generate policy definition failed!", e);
-        }
-        aggDef.setCreatedTime(new Date().getTime());
-        aggDef.setLastModifiedDate(new Date().getTime());
-        aggDef.setName("anonymous-aggregation-def");
-        aggDef.setOwner("anonymous");
-        aggDef.setEnabled(true);
-        aggDef.setDescription("anonymous aggregation definition");
-
-        String random = MD5Hash.getMD5AsHex(cql.getBytes());
-        policyId = "anonymousAggregatePolicyId-" + random;
-        executorId= "anonymousAggregateId-" +random;
-    }
-
-    @Override
-    public void prepareConfig(Config config) {
-        this.config = config;
-    }
-
-    @Override
-    public void init() {
-        evaluator = createPolicyEvaluator(aggDef);
-    }
-
-    /**
-     * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
-     *
-     * @return PolicyEvaluator instance
-     */
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    protected PolicyEvaluator<AggregateDefinitionAPIEntity> createPolicyEvaluator(AggregateDefinitionAPIEntity alertDef) {
-        String policyType = alertDef.getTags().get(Constants.POLICY_TYPE);
-        Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
-        if (evalCls == null) {
-            String msg = "No policy evaluator defined for policy type : " + policyType;
-            LOG.error(msg);
-            throw new IllegalStateException(msg);
-        }
-
-        AbstractPolicyDefinition policyDef = null;
-        try {
-            policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
-                    PolicyManager.getInstance().getPolicyModules(policyType));
-        } catch (Exception ex) {
-            LOG.error("Fail initial alert policy def: " + alertDef.getPolicyDef(), ex);
-        }
-
-        PolicyEvaluator<AggregateDefinitionAPIEntity> pe;
-        PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context = new PolicyEvaluationContext<>();
-        context.policyId = alertDef.getTags().get("policyId");
-        context.alertExecutor = this;
-        context.resultRender = new AggregateResultRender();
-        try {
-            // create evaluator instances
-            pe = (PolicyEvaluator<AggregateDefinitionAPIEntity>) evalCls
-                    .getConstructor(Config.class, 
PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, 
boolean.class)
-                    .newInstance(config, context, policyDef, upStreamNames, 
false);
-        } catch (Exception ex) {
-            LOG.error("Fail creating new policyEvaluator", ex);
-            LOG.warn("Broken policy definition and stop running : " + 
alertDef.getPolicyDef());
-            throw new IllegalStateException(ex);
-        }
-        return pe;
-    }
-
-    @Override
-    public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) {
-        if (input.size() != 3)
-            throw new IllegalStateException("AggregateExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
-        if (LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
-        if (LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + evaluator);
-
-        try {
-            evaluator.evaluate(new ValuesArray(collector, input.get(1), input.get(2)));
-        } catch (Exception ex) {
-            LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
-        }
-    }
-
-    @Override
-    public void onEvalEvents(PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context, List<AggregateEntity> alerts) {
-        if (alerts != null && !alerts.isEmpty()) {
-            String policyId = context.policyId;
-            LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId));
-            Collector outputCollector = context.outputCollector;
-            PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator = context.evaluator;
-            for (AggregateEntity entity : alerts) {
-                synchronized (this) {
-                    outputCollector.collect(new Tuple2(policyId, entity));
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator);
-                }
-            }
-        }
-    }
-
-    @Override
-    public String getExecutorId() {
-        return executorId;
-    }
-
-    @Override
-    public int getPartitionSeq() {
-        return partitionSeq;
-    }
-}
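
The SimpleAggregateExecutor constructor above wraps the raw CQL into a Siddhi policy-definition JSON through a plain map and Jackson. That serialization step in isolation (the CQL string is a placeholder):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;

public class PolicyDefJsonDemo {
    public static void main(String[] args) throws Exception {
        Map<String, Object> template = new HashMap<>();
        template.put("type", "siddhiCEPEngine");
        template.put("expression", "from s select * insert into out"); // placeholder CQL
        template.put("containsDefinition", true);
        // Same call the constructor uses to populate aggDef.setPolicyDef(...).
        System.out.println(new ObjectMapper().writer().writeValueAsString(template));
    }
}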

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
deleted file mode 100644
index 62830ae..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-/**
- * entity of stream analyze definition
- *
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("aggregatedef")
-@ColumnFamily("f")
-@Prefix("aggregatedef")
-@Service(Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site", "dataSource", "executorId", "policyId", "policyType"})
-@Indexes({
-       @Index(name="Index_1_aggregateExecutorId", columns = { "executorId" }, unique = true),
-})
-@SuppressWarnings("serial")
-public class AggregateDefinitionAPIEntity extends AbstractPolicyDefinitionEntity {
-
-       @Column("a")
-       private String name;
-       @Column("b")
-       private String policyDef;
-       @Column("c")
-       private String description;
-       @Column("d")
-       private boolean enabled;
-       @Column("e")
-       private String owner;
-       @Column("f")
-       private long lastModifiedDate;
-       @Column("g")
-       private long createdTime;
-
-       public String getName() {
-               return name;
-       }
-
-       public void setName(String name) {
-               this.name = name;
-       }
-
-       public String getPolicyDef() {
-               return policyDef;
-       }
-
-       public void setPolicyDef(String policyDef) {
-               this.policyDef = policyDef;
-               valueChanged("policyDef");
-       }
-
-       public String getDescription() {
-               return description;
-       }
-
-       public void setDescription(String description) {
-               this.description = description;
-               valueChanged("description");
-       }
-
-       public boolean isEnabled() {
-               return enabled;
-       }
-
-       public void setEnabled(boolean enabled) {
-               this.enabled = enabled;
-               valueChanged("enabled");
-       }
-
-       public String getOwner() {
-               return owner;
-       }
-
-       public void setOwner(String owner) {
-               this.owner = owner;
-               valueChanged("owner");
-       }
-
-       public long getLastModifiedDate() {
-               return lastModifiedDate;
-       }
-
-       public void setLastModifiedDate(long lastModifiedDate) {
-               this.lastModifiedDate = lastModifiedDate;
-               valueChanged("lastModifiedDate");
-       }
-
-       public long getCreatedTime() {
-               return createdTime;
-       }
-
-       public void setCreatedTime(long createdTime) {
-               this.createdTime = createdTime;
-               valueChanged("createdTime");
-       }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
deleted file mode 100644
index 64c20b2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Event entity during stream processing
- * 
- * @since Dec 17, 2015
- *
- */
-public class AggregateEntity implements Serializable {
-
-       private static final long serialVersionUID = 5911351515190098292L;
-
-    private List<Object> data = new LinkedList<>();
-
-    public void add(Object res) {
-        data.add(res);
-    }
-
-    public List<Object> getData() {
-        return data;
-    }
-       
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
deleted file mode 100644
index 7c932d4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-/**
- * Created on 1/6/16.
- */
-public class AggregateEntityRepository extends EntityRepository {
-    public AggregateEntityRepository() {
-        entitySet.add(AggregateDefinitionAPIEntity.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
deleted file mode 100644
index 0732639..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist;
-
-/**
- * Interface by the stream framework to storage
- * 
- * @since Dec 19, 2015
- *
- */
-public interface IPersistService<T> {
-
-       boolean save(String stream, T apiEntity) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
deleted file mode 100644
index 2e1754b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.dataproc.impl.persist.druid.KafkaPersistService;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.core.StorageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.text.MessageFormat;
-import java.util.List;
-
-/**
- *
- * TODO: currently only accept to be used after aggregation node (See the AggregateEntity reference here).
- * @since Dec 19, 2015
- *
- */
-public class PersistExecutor extends JavaStormStreamExecutor2<String, AggregateEntity> {
-       
-       private static final Logger LOG = LoggerFactory.getLogger(PersistExecutor.class);
-
-       private Config config;
-       private IPersistService<AggregateEntity> persistService;
-       private String persistExecutorId;
-       private String persistType;
-
-       public PersistExecutor(String persistExecutorId, String persistType) {
-               this.persistExecutorId = persistExecutorId;
-               this.persistType = persistType;
-       }
-
-    @Override
-       public void prepareConfig(Config config) {
-               this.config = config;
-       }
-
-    @Override
-       public void init() {
-               if (persistType.equalsIgnoreCase(StorageType.KAFKA().toString())) {
-                       Config subConfig = this.config.getConfig("persistExecutorConfigs" + "." + persistExecutorId);
-                       persistService = new KafkaPersistService(subConfig);
-               } else {
-                       throw new RuntimeException(String.format("Persist type '%s' not supported yet!", persistService));
-               }
-       }
-
-       @Override
-       public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) {
-               if (input.size() != 2) {
-                       LOG.error(String.format("Persist executor expect two elements per tuple. But actually got size %d lists!",
-                                       input.size()));
-                       return;
-               }
-
-               String policyId = (String) input.get(0);
-               AggregateEntity entity = (AggregateEntity) input.get(1);
-               try {
-                       persistService.save("defaultOutput", entity);
-               } catch (Exception e) {
-                       LOG.error(MessageFormat.format("persist entity failed: {0}", entity), e);
-               }
-       }
-
-}
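
PersistExecutor.init() above looks up its service settings under persistExecutorConfigs.<persistExecutorId>. A minimal sketch of that lookup (executor id and key are illustrative):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class PersistConfigDemo {
    public static void main(String[] args) {
        Config config = ConfigFactory.parseString(
            "persistExecutorConfigs { persistExecutor1 { topic = \"metric_event_persist\" } }");
        // Same path construction as PersistExecutor.init().
        Config sub = config.getConfig("persistExecutorConfigs" + "." + "persistExecutor1");
        System.out.println(sub.getString("topic"));
    }
}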

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
deleted file mode 100644
index ea61278..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist.druid;
-
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * TODO: configurable null handling for serialization??
- * Created on 1/4/16.
- */
-public class AggregateEntitySerializer implements
-        Closeable, AutoCloseable, Serializer<AggregateEntity> {
-
-    private final StringSerializer stringSerializer = new StringSerializer();
-    private static final Logger logger = LoggerFactory.getLogger(AggregateEntitySerializer.class);
-    private static final ObjectMapper om = new ObjectMapper();
-
-    static {
-        om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-
-    }
-
-    @Override
-    public byte[] serialize(String topic, AggregateEntity data) {
-        String str = null;
-        try {
-            str = om.writeValueAsString(data.getData());
-        } catch (IOException e) {
-            logger.error("Kafka serialization for send error!", e);
-        }
-        return stringSerializer.serialize(topic, str);
-    }
-
-    @Override
-    public void close() {
-
-    }
-}

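Because AggregateEntitySerializer implements Kafka's Serializer interface, it is wired into a producer purely through properties, which is exactly what KafkaPersistService in the next file does. A minimal sketch, with a placeholder broker address and topic name, and assuming AggregateEntity is default-constructible:

    import java.util.Properties;

    import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
    import org.apache.eagle.dataproc.impl.persist.druid.AggregateEntitySerializer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SerializerWiringSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("key.serializer", StringSerializer.class.getCanonicalName());
            props.put("value.serializer", AggregateEntitySerializer.class.getCanonicalName());
            // The producer instantiates the serializer reflectively and drives it
            // through configure()/serialize()/close().
            KafkaProducer<String, AggregateEntity> producer = new KafkaProducer<>(props);
            AggregateEntity entity = new AggregateEntity(); // assumes a no-arg constructor
            producer.send(new ProducerRecord<String, AggregateEntity>("metric_topic", entity));
            producer.close();
        }
    }
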
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
deleted file mode 100644
index 919b92e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist.druid;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.dataproc.impl.persist.IPersistService;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.util.*;
-import java.util.concurrent.Future;
-
-/**
- * TODO : support more general entity input
- * @since Dec 21, 2015
- *
- */
-public class KafkaPersistService implements IPersistService<AggregateEntity> {
-
-       private static final String ACKS = "acks";
-       private static final String RETRIES = "retries";
-       private static final String BATCH_SIZE = "batchSize";
-       private static final String LINGER_MS = "lingerMs";
-       private static final String BUFFER_MEMORY = "bufferMemory";
-       private static final String KEY_SERIALIZER = "keySerializer";
-       private static final String VALUE_SERIALIZER = "valueSerializer";
-       private static final String BOOTSTRAP_SERVERS = "bootstrap_servers";
-       
-       private KafkaProducer<String, AggregateEntity> producer;
-       private final Config config;
-       private final SortedMap<String, String> streamTopicMap;
-       private final Properties props;
-       
-       /**
-        * <pre>
-        * props.put("bootstrap.servers", "localhost:4242");
-        * props.put("acks", "all");
-        * props.put("retries", 0);
-        * props.put("batch.size", 16384);
-        * props.put("linger.ms", 1);
-        * props.put("buffer.memory", 33554432);
-        * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        * </pre>
-        */
-       public KafkaPersistService(Config config) {
-               this.config = config;
-               Config kafkaConfig = config.getConfig("kafka");
-               if (kafkaConfig == null) {
-                       throw new IllegalStateException("Druid persist service failed to find kafka configurations!");
-               }
-               props = new Properties();
-               if (kafkaConfig.hasPath(BOOTSTRAP_SERVERS)) {
-                       props.put("bootstrap.servers", kafkaConfig.getString(BOOTSTRAP_SERVERS));
-               }
-               if (kafkaConfig.hasPath(ACKS)) {
-                       props.put(ACKS, kafkaConfig.getString(ACKS));
-               }
-               if (kafkaConfig.hasPath(RETRIES)) {
-                       props.put(RETRIES, kafkaConfig.getInt(RETRIES));
-               }
-               if (kafkaConfig.hasPath(BATCH_SIZE)) {
-                       props.put("batch.size", kafkaConfig.getInt(BATCH_SIZE));
-               }
-               if (kafkaConfig.hasPath(LINGER_MS)) {
-                       props.put("linger.ms", kafkaConfig.getInt(LINGER_MS));
-               }
-               if (kafkaConfig.hasPath(BUFFER_MEMORY)) {
-                       props.put("buffer.memory", kafkaConfig.getLong(BUFFER_MEMORY));
-               }
-               if (kafkaConfig.hasPath(KEY_SERIALIZER)) {
-                       props.put("key.serializer", kafkaConfig.getString(KEY_SERIALIZER));
-               } else {
-                       props.put("key.serializer", StringSerializer.class.getCanonicalName());
-               }
-//             if (kafkaConfig.hasPath(VALUE_SERIALIZER)) {
-//                     props.put("value.serializer", kafkaConfig.getString(VALUE_SERIALIZER));
-//             }
-               props.put("value.serializer", AggregateEntitySerializer.class.getCanonicalName());
-
-               streamTopicMap = new TreeMap<>();
-               if (kafkaConfig.hasPath("topics")) {
-                       Config topicConfig = kafkaConfig.getConfig("topics");
-                       Set<Map.Entry<String, ConfigValue>> topics = topicConfig.entrySet();
-                       for (Map.Entry<String, ConfigValue> t : topics) {
-                               streamTopicMap.put(t.getKey(), (String) t.getValue().unwrapped());
-                       }
-               }
-
-               producer = new KafkaProducer<>(props);
-       }
-
-       @Override
-       public boolean save(String stream, AggregateEntity apiEntity) throws Exception {
-               if (streamTopicMap.get(stream) != null) {
-                       ProducerRecord<String, AggregateEntity> record = new ProducerRecord<>(streamTopicMap.get(stream), apiEntity);
-                       Future<RecordMetadata> future = producer.send(record);
-                       // TODO : more for check the sending status
-                       return true;
-               }
-               return false;
-       }
-
-}

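KafkaPersistService reads its settings from a "kafka" block of the Typesafe config, with "topics" mapping stream names to Kafka topics. A sketch of a matching configuration, parsed inline here for illustration (all values are placeholders; only the key names and the "defaultOutput" stream name come from the code above):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
    import org.apache.eagle.dataproc.impl.persist.druid.KafkaPersistService;

    public class PersistConfigSketch {
        public static void main(String[] args) throws Exception {
            Config config = ConfigFactory.parseString(
                    "kafka {\n"
                  + "  bootstrap_servers = \"localhost:9092\"\n"
                  + "  acks = \"all\"\n"
                  + "  retries = 0\n"
                  + "  topics {\n"
                  + "    defaultOutput = \"aggregate_metric_topic\"\n" // stream name -> Kafka topic
                  + "  }\n"
                  + "}");
            KafkaPersistService service = new KafkaPersistService(config);
            boolean sent = service.save("defaultOutput", new AggregateEntity()); // false if the stream has no topic mapping
            System.out.println(sent);
        }
    }

Note that Config.getConfig("kafka") throws ConfigException.Missing rather than returning null when the path is absent, so the null guard in the constructor above is effectively unreachable.
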
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
index 2323f39..e9ba3ca 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
@@ -43,9 +43,6 @@ public class HDFSSourcedStormSpoutProvider implements StormSpoutProvider {
                        if(conf.equalsIgnoreCase("data collection")){
                                return new DataCollectionHDFSSpout(configContext);
                        }
-                       if(conf.equalsIgnoreCase("user profile generation")){
-                               return new UserProfileGenerationHDFSSpout(configContext);
-                       }
                        return null;
                }
        }

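After this removal the provider recognizes only the "data collection" source and falls through to the null return for everything else, so a caller selecting "user profile generation" silently gets no spout; the class it used to return is deleted outright in the next file. A hypothetical caller-side guard (the accessor name is assumed, since the enclosing method is not shown in this hunk):

    // Hypothetical: fail fast instead of letting the null propagate into the topology.
    backtype.storm.topology.base.BaseRichSpout spout = provider.getSpout(configContext);
    if (spout == null) {
        throw new IllegalArgumentException("unsupported data source type: " + conf);
    }
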
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
deleted file mode 100644
index e07ee81..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.hdfs;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.io.Serializable;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.StreamingProcessConstants;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import com.esotericsoftware.minlog.Log;
-
-public class UserProfileGenerationHDFSSpout extends HDFSSourcedStormSpoutProvider.HDFSSpout {
-
-       private static final long serialVersionUID = 2274234104008894386L;
-       private Config configContext;
-       private TopologyContext _context; 
-       SpoutOutputCollector _collector;
-       
-       public class UserProfileData implements Serializable{
-               private static final long serialVersionUID = -3315860110144736840L;
-               private String user; 
-               private List<String> dateTime = new ArrayList<String>(); 
-               private List<Integer> hrInDay = new ArrayList<Integer>(); 
-               private List<String> line = new ArrayList<String>();
-               
-               public String getUser() {
-                       return user;
-               }
-               public void setUser(String user) {
-                       this.user = user;
-               }
-               public String getDateTime(int index) {
-                       return dateTime.get(index);
-               }
-               public List<String> getDateTimes() {
-                       return dateTime;
-               }
-               public void setDateTime(String dateTime) {
-                       this.dateTime.add(dateTime);
-               }
-               public int getHrInDay(int index) {
-                       return hrInDay.get(index);
-               }
-               public List<Integer> getHrsInDay() {
-                       return hrInDay;
-               }
-               public void setHrInDay(int hrInDay) {
-                       this.hrInDay.add(hrInDay);
-               }
-               public String getLine(int index) {
-                       return line.get(index);
-               }
-               public List<String> getLines() {
-                       return line;
-               }
-               public void setLine(String line) {
-                       this.line.add(line);
-               } 
-               
-       }
-       
-       private static final Logger LOG = LoggerFactory.getLogger(UserProfileGenerationHDFSSpout.class);
-       
-       public UserProfileGenerationHDFSSpout(Config configContext){
-               this.configContext = configContext;
-               LOG.info("UserProfileGenerationHDFSSpout called");
-       }
-       
-       public void copyFiles(){
-               LOG.info("Inside listFiles()");
-               //Configuration conf = new Configuration();
-               JobConf conf = new JobConf();
-               // _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
-               ClassLoader cl = ClassLoader.getSystemClassLoader();
-        URL[] urls = ((URLClassLoader)cl).getURLs();
-        if(LOG.isDebugEnabled()) {
-                       for (URL url : urls) {
-                               LOG.debug(url.getFile());
-                       }
-               }
-               // _________________________________________
-        String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnection");
-        LOG.info("HDFS connection string: " + hdfsConnectionStr);
-
-               String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
-               LOG.info("HDFS path: " + hdfsPath);
-
-               String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-               LOG.info("copyToPath: " + copyToPath);
-               String srcPathStr = new String("hdfs://" + hdfsConnectionStr + hdfsPath);
-               Path srcPath = new Path(srcPathStr); 
-               LOG.info("listFiles called");
-               LOG.info("srcPath: " + srcPath);
-               try {
-                       FileSystem fs = srcPath.getFileSystem(conf);
-                       /*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
-                       CompressionCodec codec = codecFactory.getCodec(srcPath);
-                       DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
-                       */
-
-                       Path destPath = new Path(copyToPath);
-                       LOG.info("Destination path: " + destPath);
-                       String userListFileName = configContext.getString("dataSourceConfig.userList");
-                       //loggerHDFSSpout.info("userListFileName: " + userListFileName);
-                       List<String> userList = getUser(userListFileName);
-                       for(String user:userList){
-                               Path finalSrcPath = new Path(srcPath.getName() + "/" + user);
-                               fs.copyToLocalFile(finalSrcPath, destPath);
-                       }
-                       LOG.info("Copy to local succeed");
-                       fs.close();
-                                                       
-               } catch (IOException e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
-               }
-               
-       }
-       
-       private List<String> getAllFiles(String root, int level){
-               
-               List<String> lists = new ArrayList<String>();
-               File rootFile = new File(root);
-               File[] tempList = rootFile.listFiles();
-               if(tempList == null)
-                       return lists; 
-               
-               for(File temp:tempList){
-                       if(temp.isDirectory())
-                               lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
-                       else{
-                               if(temp.getName().endsWith(".csv"))
-                                       lists.add(temp.getAbsolutePath());
-                       }
-               }
-               return lists;
-                       
-       }
-       
-       public List<String> listFiles(String path){
-               
-               LOG.info("Reading from: " + path);
-               List<String> files = new ArrayList<String>();
-               files = getAllFiles(path, 0); 
-               return files;
-       }
-       
-       private List<String> getUser(String listFileName){
-               List<String> userList = new ArrayList<String>();
-               BufferedReader reader = null; 
-               try{
-                       InputStream is = getClass().getResourceAsStream(listFileName);
-                       reader = new BufferedReader(new InputStreamReader(is));
-                       String line = ""; 
-                       while((line = reader.readLine()) != null){
-                               userList.add(line);
-                               LOG.info("User added:" + line);
-                       }
-               }catch(Exception e){
-                       e.printStackTrace();
-               }finally{
-                       try {
-                               if(reader != null)
-                                       reader.close();
-                       } catch (IOException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       }
-               }
-               return userList;
-       }
-       
-       @Override
-       public void nextTuple() {
-               LOG.info("Releasing nextTuple");
-               
-               String userListFileName = configContext.getString("dataSourceConfig.userList");
-
-               //loggerHDFSSpout.info("userListFileName: " + userListFileName);
-               List<String> userList = getUser(userListFileName);
-               //loggerHDFSSpout.info("user list size:" + userList.size());
-               for(String user: userList){
-                       LOG.info("Processing user: " + user);
-                       String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-                       //loggerHDFSSpout.info("copyToPath: " + copyToPath);
-                       
-                       copyToPath +="/" + user; 
-                       List<String> files = listFiles(copyToPath);
-                       LOG.info("Files returned: " + files.size());
-                       String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
-                       //loggerHDFSSpout.info("typeOfFile returned: " + typeOfFile);
-                       UserProfileData usersProfileDataset = new UserProfileData();
-                               
-                       for(String fileName:files){
-                               LOG.info("FileName: " + fileName);
-                               usersProfileDataset.setDateTime(fileName.substring(fileName.lastIndexOf("/")+1, fileName.lastIndexOf(".")));
-                               BufferedReader br = null; 
-                               Reader decoder = null;
-                               InputStream inStream = null;
-                               
-                               try{
-                                       inStream = new FileInputStream(new File(fileName));
-                                       decoder = new InputStreamReader(inStream);
-                                       br = new BufferedReader(decoder);
-                                       int lineNo = 0; 
-                                       String line = "";
-                                       while((line = br.readLine())!= null){
-                                               boolean containsFileHeader = configContext.getBoolean("dataSourceConfig.containsFileHeader");
-                                               //loggerHDFSSpout.info("containsFileHeader returned: " + containsFileHeader);
-                                               if(containsFileHeader == true && lineNo == 0){
-                                                       // ignore the header column
-                                                       lineNo++;
-                                                       continue;
-                                               }
-                                       //loggerHDFSSpout.info("emitting line from file: " + fileName);
-
-                                               usersProfileDataset.setLine(line);
-                                               usersProfileDataset.setHrInDay(lineNo);
-                                       lineNo++;
-                                       }
-                               }
-                               catch (Exception e) {
-                                       Log.error("File operation failed");
-                                       throw new IllegalStateException();
-                               }finally{
-                                       try {
-                                               if(br != null)
-                                                       br.close();
-                                               if(decoder != null)
-                                                       decoder.close();
-                                               if(inStream != null)
-                                                       inStream.close();
-                                       } catch (IOException e) {
-                                               // TODO Auto-generated catch block
-                                               e.printStackTrace();
-                                       }
-                               }
-                       }
-                       usersProfileDataset.setUser(user);
-                       _collector.emit(new ValuesArray(user, "HDFSSourcedStormExecutor", usersProfileDataset));
-               LOG.info("Emitting data of length: " + usersProfileDataset.getLines().size());
-                       Utils.sleep(1000);
-               }
-               this.close();
-       }
-       
-       @Override
-       public void open(Map arg0, TopologyContext context,
-                       SpoutOutputCollector collector) {
-                _collector = collector;
-                _context = context;
-       }
-
-       @Override
-       public void declareOutputFields(OutputFieldsDeclarer declarer) {
-               // TODO Auto-generated method stub
-               declarer.declare(new Fields(StreamingProcessConstants.EVENT_PARTITION_KEY, StreamingProcessConstants.EVENT_STREAM_NAME, StreamingProcessConstants.EVENT_ATTRIBUTE_MAP));
-       }
-       
-       
-}

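The removed spout closed its three stream handles by hand in a finally block; Java 7 try-with-resources gives the same cleanup for free. A minimal equivalent of its file-reading loop (a sketch under assumed file paths, not a drop-in replacement):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    public class ReadLoopSketch {
        // Equivalent of the removed read loop, with automatic resource cleanup.
        static int readLines(String fileName, boolean containsFileHeader) throws IOException {
            int lineNo = 0;
            try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
                String line;
                while ((line = br.readLine()) != null) {
                    if (containsFileHeader && lineNo == 0) { lineNo++; continue; } // skip header row
                    // process 'line' here (the removed code appended it to UserProfileData)
                    lineNo++;
                }
            } // reader and underlying stream are closed here, even on exceptions
            return lineNo;
        }
    }
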
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
deleted file mode 100644
index 993a4a2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.datastream.utils.NameConstants;
-
-public class JavaMapperStormExecutor extends BaseRichBolt{
-    public static class e1 extends JavaMapperStormExecutor {
-        public e1(JavaMapper mapper){
-            super(1, mapper);
-        }
-    }
-    public static class e2 extends JavaMapperStormExecutor {
-        public e2(JavaMapper mapper){
-            super(2, mapper);
-        }
-    }
-    public static class e3 extends JavaMapperStormExecutor {
-        public e3(JavaMapper mapper){
-            super(3, mapper);
-        }
-    }
-    public static class e4 extends JavaMapperStormExecutor {
-        public e4(JavaMapper mapper){
-            super(4, mapper);
-        }
-    }
-
-    private JavaMapper mapper;
-    private OutputCollector collector;
-    private int numOutputFields;
-    public JavaMapperStormExecutor(int numOutputFields, JavaMapper mapper){
-        this.numOutputFields = numOutputFields;
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        List<Object> ret = mapper.map(input.getValues());
-        this.collector.emit(ret);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        List<String> fields = new ArrayList<String>();
-        for(int i=0; i<numOutputFields; i++){
-            fields.add(NameConstants.FIELD_PREFIX() + i);
-        }
-        declarer.declare(new Fields(fields));
-    }
-}
\ No newline at end of file

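The e1..e4 inner classes of the removed bolt just pin the number of declared output fields. Assuming JavaMapper is the single-method interface that the execute() body suggests (List<Object> map(List<Object> input)), usage reduced to:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.eagle.datastream.JavaMapper;              // assumed location of the interface
    import org.apache.eagle.datastream.JavaMapperStormExecutor;

    public class MapperBoltSketch {
        public static void main(String[] args) {
            // Hypothetical mapper: pass the first two tuple values through unchanged.
            JavaMapper mapper = new JavaMapper() {
                @Override
                public List<Object> map(List<Object> input) {
                    return Arrays.asList(input.get(0), input.get(1));
                }
            };
            // e2 declares two output fields, named FIELD_PREFIX() + 0 and FIELD_PREFIX() + 1.
            JavaMapperStormExecutor bolt = new JavaMapperStormExecutor.e2(mapper);
            System.out.println(bolt);
        }
    }
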
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
deleted file mode 100644
index a485d76..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.List;
-import java.util.SortedMap;
-
-import com.typesafe.config.Config;
-import scala.Tuple2;
-import scala.Tuple3;
-
-public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<String, String, SortedMap<Object, Object>>{
-    private JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate;
-    private String streamName;
-    public JavaStormExecutorForAlertWrapper(JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate, String streamName){
-        this.delegate = delegate;
-        this.streamName = streamName;
-    }
-    @Override
-    public void prepareConfig(Config config) {
-        delegate.prepareConfig(config);
-    }
-
-    @Override
-    public void init() {
-        delegate.init();
-    }
-
-    @Override
-    public void flatMap(List<Object> input, final Collector<Tuple3<String, String, SortedMap<Object, Object>>> collector) {
-        Collector delegateCollector = new Collector(){
-            @Override
-            public void collect(Object o) {
-                Tuple2 tuple2 = (Tuple2)o;
-                collector.collect(new Tuple3(tuple2._1, streamName, tuple2._2));
-            }
-        };
-        delegate.flatMap(input, delegateCollector);
-    }
-    
-    public JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> getDelegate() {
-       return delegate;
-    }
-
-}

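The wrapper's only real work is adapting a 2-tuple collector to a 3-tuple one, injecting the stream name as the middle field. The same adapter shape, stripped to its essentials (all names here are illustrative, not Eagle API):

    // Wrap a 3-field collector so 2-field emissions gain a fixed stream name.
    public class CollectorAdapterSketch {
        interface Collector2<A, B> { void collect(A a, B b); }
        interface Collector3<A, B, C> { void collect(A a, B b, C c); }

        static <A, B> Collector2<A, B> withStreamName(final Collector3<A, String, B> out, final String streamName) {
            return new Collector2<A, B>() {
                @Override
                public void collect(A a, B b) {
                    out.collect(a, streamName, b); // inject the stream name as the middle field
                }
            };
        }
    }
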
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
index 80e0aba..d59ded6 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
@@ -13,28 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=info, stdout, DRFA
+log4j.rootLogger=INFO, stdout
 
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-
-#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinBolt=DEBUG
-#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinBolt=DEBUG
-log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
-#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=yyyy-MM-dd
-## 30-day backup
-# log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

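This change drops the daily-rolling file appender (and its eagle.log.* settings) along with the per-class DEBUG override, leaving INFO-level console logging only. With the retained ConversionPattern, an emitted line looks roughly like:

    2016-01-04 10:15:30,123 INFO [main] datastream.ExecutionEnvironments[42]: topology started

(%d{ISO8601} timestamp, %p level, [%t] thread, %c{2} last two logger-name segments, [%L] line number, then the %m%n message.)
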
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
index 90e59cf..c3ddb00 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
@@ -36,37 +36,6 @@ import scala.reflect.runtime.universe._
 object ExecutionEnvironments{
   type storm = StormExecutionEnvironment
 
-  /**
-   * Use `'''get[StormExecutionEnvironment](config)'''` instead
-   *
-   * @param config
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(config : Config) = new StormExecutionEnvironment(config)
-
-  /**
-   * Use `'''get[StormExecutionEnvironment]'''` instead
-   *
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm:StormExecutionEnvironment = {
-    val config = ConfigFactory.load()
-    getStorm(config)
-  }
-
-  /**
-   * Use `'''get[StormExecutionEnvironment](args)'''` instead
-   *
-   * @see get[StormExecutionEnvironment](args)
-    * @param args
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(args:Array[String]):StormExecutionEnvironment = {
-    getStorm(new ConfigOptionParser().load(args))
-  }
 
   /**
    * @param typeTag

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
deleted file mode 100644
index 46f4738..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-trait AbstractTopologyCompiler{
-  def buildTopology : AbstractTopologyExecutor
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
deleted file mode 100644
index 1e1664a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-trait AbstractTopologyExecutor {
-  def execute
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
deleted file mode 100644
index e3f3050..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.{Config, _}
-
-import scala.reflect.runtime.universe._
-
-/**
- * @since  12/4/15
- */
-case class Configuration(private var config:Config) extends Serializable{
-  def get:Config = config
-
-  def set[T<:AnyRef](key:String,value:T): Unit = {
-    config = config.withValue(key,ConfigValueFactory.fromAnyRef(value))
-  }
-
-  /**
-   *
-   * @param key config key
-   * @param default default value
-   * @tparam T return type
-   * @return
-   */
-  def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = {
-    if(get.hasPath(key)) {
-      get(key)
-    } else default
-  }
-
-  def get[T](key:String)(implicit tag: TypeTag[T]):T = tag.tpe match {
-    case STRING_TYPE => config.getString(key).asInstanceOf[T]
-    case TypeTag.Double => get.getDouble(key).asInstanceOf[T]
-    case TypeTag.Long => get.getLong(key).asInstanceOf[T]
-    case TypeTag.Int => get.getInt(key).asInstanceOf[T]
-    case TypeTag.Byte => get.getBytes(key).asInstanceOf[T]
-    case TypeTag.Boolean => get.getBoolean(key).asInstanceOf[T]
-    case NUMBER_TYPE => get.getNumber(key).asInstanceOf[T]
-    case OBJECT_TYPE => get.getObject(key).asInstanceOf[T]
-    case VALUE_TYPE => get.getValue(key).asInstanceOf[T]
-    case ANY_REF_TYPE => get.getAnyRef(key).asInstanceOf[T]
-    case INT_LIST_TYPE => get.getIntList(key).asInstanceOf[T]
-    case DOUBLE_LIST_TYPE => get.getDoubleList(key).asInstanceOf[T]
-    case BOOL_LIST_TYPE => get.getBooleanList(key).asInstanceOf[T]
-    case LONG_LIST_TYPE => get.getLongList(key).asInstanceOf[T]
-    case _ => throw new UnsupportedOperationException(s"$tag is not supported yet")
-  }
-
-  val STRING_TYPE = typeOf[String]
-  val NUMBER_TYPE = typeOf[Number]
-  val INT_LIST_TYPE = typeOf[List[Int]]
-  val BOOL_LIST_TYPE = typeOf[List[Boolean]]
-  val DOUBLE_LIST_TYPE = typeOf[List[Double]]
-  val LONG_LIST_TYPE = typeOf[List[Long]]
-  val OBJECT_TYPE = typeOf[ConfigObject]
-  val VALUE_TYPE = typeOf[ConfigValue]
-  val ANY_REF_TYPE = typeOf[AnyRef]
-}
\ No newline at end of file

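The deleted Configuration wrapper was a thin TypeTag dispatch over the Typesafe Config getters, so removing it just means calling those getters directly. A Java-level sketch of the equivalents (keys and values are placeholders):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class ConfigAccessSketch {
        public static void main(String[] args) {
            Config config = ConfigFactory.parseString("a = 1, b = [true, false], c = hello");
            int a = config.getInt("a");                              // was get[Int]("a")
            java.util.List<Boolean> b = config.getBooleanList("b");  // was get[List[Boolean]]("b")
            String c = config.hasPath("c") ? config.getString("c") : "default"; // was get[String]("c", "default")
            System.out.println(a + " " + b + " " + c);
        }
    }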