http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
deleted file mode 100644
index e399189..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.alert;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.SiteDescServiceEntity;
-import org.apache.eagle.service.generic.GenericEntityServiceResource;
-import org.apache.eagle.alert.entity.ApplicationDescServiceEntity;
-import org.apache.eagle.alert.entity.SiteApplicationServiceEntity;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.type.TypeFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.*;
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-
-@Path(SiteApplicationResource.ROOT_PATH)
-public class SiteApplicationResource {
-    private final static Logger LOG = 
LoggerFactory.getLogger(SiteApplicationResource.class);
-    private final static GenericEntityServiceResource resource = new 
GenericEntityServiceResource();
-    public final static String ROOT_PATH = "/module";
-
-    @Path("site")
-    @DELETE
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public GenericServiceAPIResponseEntity deleteSite(@QueryParam("site") 
String site) {
-        String siteQuery = Constants.SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME+ 
"[@site=\"" + site + "\"]{*}";
-        String siteApplicationQuery = 
Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + 
"\"]{*}";
-        int pageSize = Integer.MAX_VALUE;
-
-        GenericServiceAPIResponseEntity response = 
resource.deleteByQuery(siteQuery, null, null, pageSize, null, false, false, 0L, 
0, true, 0, null, false);
-        if(response.isSuccess()) {
-            response = resource.deleteByQuery(siteApplicationQuery, null, 
null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
-            if(!response.isSuccess()) {
-                LOG.error(response.getException());
-            }
-        } else {
-            LOG.error(response.getException());
-        }
-        return response;
-    }
-
-    @Path("application")
-    @DELETE
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public GenericServiceAPIResponseEntity 
deleteApplication(@QueryParam("application") String application) {
-        String applicationQuery = 
Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@application=\"" + 
application + "\"]{*}";
-        String siteApplicationQuery = 
Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME + "[@application=\"" + 
application + "\"]{*}";
-        int pageSize = Integer.MAX_VALUE;
-
-        GenericServiceAPIResponseEntity response = 
resource.deleteByQuery(applicationQuery, null, null, pageSize, null, false, 
false, 0L, 0, true, 0, null, false);
-        if(response.isSuccess()) {
-            response = resource.deleteByQuery(siteApplicationQuery, null, 
null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
-            if(!response.isSuccess()) {
-                LOG.error(response.getException());
-            }
-        } else {
-            LOG.error(response.getException());
-        }
-        return response;
-    }
-
-    @Path("feature")
-    @DELETE
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public GenericServiceAPIResponseEntity 
deleteFeature(@QueryParam("feature") String feature) {
-        String featureQuery = 
Constants.FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@feature=\"" + feature + 
"\"]{*}";
-        String applicationQuery = 
Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME + "[]{*}";
-        int pageSize = Integer.MAX_VALUE;
-
-        GenericServiceAPIResponseEntity response = 
resource.deleteByQuery(featureQuery, null, null, pageSize, null, false, false, 
0L, 0, true, 0, null, false);
-        if(response.isSuccess()) {
-            response = resource.search(applicationQuery, null, null, pageSize, 
null, false, false, 0L, 0, true, 0, null, false);
-            if(response.isSuccess()) {
-                List<ApplicationDescServiceEntity> entityList = 
response.getObj();
-                Boolean isModified = false;
-                for(ApplicationDescServiceEntity entity : entityList) {
-                    if(entity.getFeatures().contains(feature)) {
-                        List<String> features = entity.getFeatures();
-                        features.remove(feature);
-                        entity.setFeatures(features);
-                        isModified = true;
-                    }
-                }
-                if(isModified) {
-                    response = resource.updateEntities(entityList, 
Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME);
-                }
-            }
-        }
-        if(!response.isSuccess()) {
-            LOG.error(response.getException());
-        }
-        return response;
-    }
-
-    @Path("siteApplication")
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public GenericServiceAPIResponseEntity createSiteApplications(InputStream 
inputStream) {
-        GenericServiceAPIResponseEntity response = new 
GenericServiceAPIResponseEntity<>();
-        int pageSize = Integer.MAX_VALUE;
-        try {
-            List<SiteApplicationObject> entities = 
(List<SiteApplicationObject>) unmarshalSiteApplicationEntities(inputStream);
-            if(entities == null) {
-                throw new IllegalArgumentException("cannot convert to 
SiteApplicationObject");
-            }
-            List<SiteDescServiceEntity> siteEntities = new LinkedList<>();
-            List<SiteApplicationServiceEntity> applicationEntities = new 
LinkedList<>();
-            Set<String> sites = new HashSet<>();
-            for(SiteApplicationObject e : entities) {
-                sites.add(e.getTags().get("site"));
-                SiteDescServiceEntity entity = new SiteDescServiceEntity();
-                entity.setEnabled(e.getEnabled());
-                entity.setTags(e.getTags());
-                siteEntities.add(entity);
-                applicationEntities.addAll(e.getApplications());
-            }
-            response = resource.updateEntities(siteEntities, 
Constants.SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME);
-            if(response.isSuccess()) {
-                String query = 
buildQueryWithAttributeList(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, 
"site", sites);
-                LOG.info("query=" + query);
-                response = resource.search(query, null, null, pageSize, null, 
false, false, 0L, 0, true, 0, null, false);
-                if(response.isSuccess()) {
-                    List<SiteApplicationServiceEntity> applications = 
response.getObj();
-                    for(SiteApplicationServiceEntity app : applications) {
-                        app.setEnabled(false);
-                    }
-                    response = resource.updateEntities(applications, 
Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME);
-                    if(response.isSuccess()) {
-                        response = 
resource.updateEntities(applicationEntities, 
Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME);
-                    }
-                }
-            }
-            if(!response.isSuccess()) {
-                LOG.error(response.getException());
-            }
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            response.setException(ex);
-        }
-        return response;
-    }
-
-    private String buildQueryWithAttributeList(String serviceName, String 
attr, Set<String> sets) {
-        StringBuilder builder = new StringBuilder(serviceName + "[");
-        String attribute = "@" + attr + "=";
-        String condition = " OR ";
-        for(String s : sets) {
-            String value = String.format("\"%s\"", s);
-            builder.append(attribute + value);
-            builder.append(condition);
-        }
-        String result = builder.substring(0, 
builder.length()-condition.length());
-        result = result + "]{*}";
-        return result;
-    }
-
-    private List<? extends TaggedLogAPIEntity> 
unmarshalSiteApplicationEntities(InputStream inputStream) throws 
IllegalAccessException, InstantiationException, IOException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        return objectMapper.readValue(inputStream, 
TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, 
SiteApplicationObject.class));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
deleted file mode 100644
index 0ec9cf4..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.alert.resolver;
-
-import java.util.List;
-
-public class AttributeResolveResponse<V> {
-    private String exception;
-    private List<V> values;
-
-    public List<V> getValues() {
-        return values;
-    }
-
-    public void setValues(List<V> values) {
-        this.values = values;
-    }
-
-    public String getException() {
-        return exception;
-    }
-
-    public void setException(String exception) {
-        this.exception = exception;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
 
b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
deleted file mode 100644
index 3833f47..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.service.alert.SiddhiAlertPolicyValidateProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
 
b/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
deleted file mode 100644
index 3833f47..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.service.alert.SiddhiAlertPolicyValidateProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/pom.xml 
b/eagle-core/eagle-alert-parent/pom.xml
index bc35fc9..9dbacf0 100644
--- a/eagle-core/eagle-alert-parent/pom.xml
+++ b/eagle-core/eagle-alert-parent/pom.xml
@@ -31,9 +31,6 @@
        <name>eagle-alert-parent</name>
        <modules>
                <module>eagle-alert</module>
-               <module>eagle-alert-base</module>
-               <module>eagle-alert-process</module>
                <module>eagle-alert-service</module>
-               <module>eagle-alert-notification-plugin</module>
        </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml 
b/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
deleted file mode 100644
index 919188f..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~  Unless required by applicable law or agreed to in writing, software
-  ~  distributed under the License is distributed on an "AS IS" BASIS,
-  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~  See the License for the specific language governing permissions and
-  ~  limitations under the License.
-  ~
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>eagle-app-parent</artifactId>
-        <groupId>org.apache.eagle</groupId>
-        <version>0.5.0-incubating-SNAPSHOT</version>
-    </parent>
-
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>eagle-stream-application-manager</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-application-service</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-pipeline</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${storm.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>ch.qos.logback</groupId>
-                    <artifactId>logback-classic</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-reflect</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-compiler</artifactId>
-            <version>${scala.version}.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_${scala.version}</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-actor_${scala.version}</artifactId>
-            <version>${akka.actor.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-testkit_${scala.version}</artifactId>
-            <version>${akka.actor.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <scope>compile</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.scala-tools</groupId>
-                <artifactId>maven-scala-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>scala-compile-first</id>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-test-compile</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <skipTests>true</skipTests>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.scalatest</groupId>
-                <artifactId>scalatest-maven-plugin</artifactId>
-                <configuration>
-                    
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                    <junitxml>.</junitxml>
-                    <filereports>TestSuite.txt</filereports>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test</id>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
deleted file mode 100644
index d382629..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application;
-
-
-public class TopologyException extends Exception {
-    public TopologyException(String s, Exception e) { super(s,e); }
-    public TopologyException(Exception e) { super(e); }
-    public TopologyException(String s) { super(s); }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
deleted file mode 100644
index 8f625c7..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application;
-
-
-import com.typesafe.config.Config;
-
-
-public interface TopologyExecutable {
-    void submit(String topology, Config config);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
deleted file mode 100644
index e32f48e..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application;
-
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-public final class TopologyFactory {
-    public static Logger LOG = LoggerFactory.getLogger(TopologyFactory.class);
-    private final static Map<String, TopologyExecutable> topologyCache = 
Collections.synchronizedMap(new HashMap<String, TopologyExecutable>());
-    public static TopologyExecutable getTopologyInstance(String topologyClass) 
throws TopologyException {
-        TopologyExecutable instance;
-        if(topologyCache.containsKey(topologyClass)){
-            instance = topologyCache.get(topologyClass);
-        } else {
-            try {
-                LOG.info("load class " + topologyClass + "with classLoader " + 
TopologyFactory.class.getClassLoader().toString());
-                instance = (TopologyExecutable) 
Class.forName(topologyClass).newInstance();
-                topologyCache.put(topologyClass, instance);
-            } catch (ClassNotFoundException e) {
-                throw new TopologyException("Topology in type of " + 
topologyClass + " is not found",e);
-            } catch (InstantiationException | IllegalAccessException e) {
-                throw new TopologyException(e);
-            }
-        }
-        return instance;
-    }
-
-    public static void submit(String topologyClass, Config config) throws 
TopologyException {
-        TopologyExecutable topology = getTopologyInstance(topologyClass);
-        topology.submit(topologyClass, config);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
index 3e918cc..b5fbf59 100644
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
+++ 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
@@ -20,13 +20,11 @@ package org.apache.eagle.stream.application
 
 import com.typesafe.config.Config
 import org.apache.eagle.datastream.core.StreamContext
-import org.apache.eagle.stream.pipeline.Pipeline
 
 
 trait AbstractDynamicApplication extends TopologyExecutable {
   def compileStream(application: String, config: Config): StreamContext = {
-    val pipeline = Pipeline.parseStringWithConfig(application, config)
-    Pipeline.compile(pipeline)
+    null
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
deleted file mode 100644
index bbfaedd..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import java.util
-
-import com.google.common.base.Preconditions
-import org.apache.eagle.service.application.entity.TopologyExecutionStatus
-import org.apache.eagle.stream.application.impl.StormExecutionPlatform
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConversions
-
-
-object ApplicationManager {
-  private val LOG: Logger = 
LoggerFactory.getLogger(ApplicationManager.getClass)
-  private val workerMap: util.Map[AnyRef, TaskExecutor] = new 
util.TreeMap[AnyRef, TaskExecutor]
-
-  def getWorkerMap: util.Map[AnyRef, TaskExecutor] = {
-    return workerMap
-  }
-
-  def submit(id: AnyRef, runnable: Runnable): TaskExecutor = {
-    if (workerMap.containsKey(id)) {
-      val executor: Thread = workerMap.get(id)
-      if (!executor.isAlive || executor.getState.equals() ) {
-        LOG.info("Replacing dead executor: {}", executor)
-        workerMap.remove(id)
-      }
-      else {
-        throw new IllegalArgumentException("Duplicated id '" + id + "'")
-      }
-    }
-    val worker: TaskExecutor = new TaskExecutor(runnable)
-    LOG.info("Registering new executor %s: %s".format(id, worker))
-    workerMap.put(id, worker)
-    worker.setName(id.toString)
-    worker.setDaemon(true)
-    worker.start
-    return worker
-  }
-
-  def get(id: AnyRef): TaskExecutor = {
-    Preconditions.checkArgument(workerMap.containsKey(id))
-    return workerMap.get(id)
-  }
-
-  @throws(classOf[Exception])
-  def stop(id: AnyRef): TaskExecutor = {
-    val worker: TaskExecutor = get(id)
-    worker.interrupt
-    //this.workerMap.remove(id)
-    return worker
-  }
-
-  def getWorkerStatus(state: Thread.State): String = {
-    if (whereIn(state, java.lang.Thread.State.RUNNABLE, 
java.lang.Thread.State.TIMED_WAITING, java.lang.Thread.State.WAITING)) {
-      return TopologyExecutionStatus.STARTED
-    }
-    else if (whereIn(state, java.lang.Thread.State.NEW)) {
-      return TopologyExecutionStatus.STARTING
-    }
-    else if (whereIn(state, java.lang.Thread.State.TERMINATED)) {
-      return TopologyExecutionStatus.STOPPED
-    }
-    throw new IllegalStateException("Unknown state: " + state)
-  }
-
-  def getTopologyStatus(status: String): String = {
-    if(whereIn(status, StormExecutionPlatform.KILLED))
-      return TopologyExecutionStatus.STOPPING
-    return TopologyExecutionStatus.STARTED
-  }
-
-  private def whereIn(status: String, inStatuses: String*): Boolean = {
-    for (_status <- inStatuses) {
-      if (_status.equalsIgnoreCase(status)) {
-        return true
-      }
-    }
-    return false
-  }
-  private def whereIn(state: Thread.State, inStates: Thread.State*): Boolean = 
{
-    for (_state <- inStates) {
-      if (_state eq state) {
-        return true
-      }
-    }
-    return false
-  }
-
-  def remove(id: AnyRef) {
-    val executor: TaskExecutor = this.get(id)
-    if (executor.isAlive) {
-      throw new RuntimeException("Failed to remove alive executor '" + id + 
"'")
-    }
-    else {
-      this.workerMap.remove(id)
-    }
-  }
-
-  def stopAll(): Unit ={
-    JavaConversions.collectionAsScalaIterable(workerMap.values()) foreach { 
worker =>
-      if(!worker.isInterrupted) {
-        worker.interrupt()
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
deleted file mode 100644
index 4c2df77..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import com.typesafe.config.Config
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.service.application.entity.TopologyExecutionEntity
-
-
-object ApplicationManagerUtils {
-
-  def generateTopologyFullName(topologyExecution: TopologyExecutionEntity) = {
-    val fullName = "eagle-%s-%s-%s".format(topologyExecution.getSite, 
topologyExecution.getApplication, topologyExecution.getTopology)
-    fullName
-  }
-
-  def buildStormTopologyURL(config: Config, topologyID: String): String = {
-    val clusterURL = if(config.hasPath(AppManagerConstants.CLUSTER_URL)) 
config.getString(AppManagerConstants.CLUSTER_URL) else 
AppManagerConstants.DEFAULT_CLUSTER_URL
-    val topologyURL = clusterURL + "/topology.html?id=" + topologyID
-    topologyURL
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
deleted file mode 100644
index ae0f6e8..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import java.util
-import java.util.concurrent.Callable
-
-import akka.dispatch.Futures
-import com.typesafe.config.Config
-import org.apache.eagle.alert.entity.SiteApplicationServiceEntity
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity
-import org.apache.eagle.policy.common.Constants
-import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, 
TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
-import org.apache.eagle.service.client.EagleServiceConnector
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConversions
-import scala.concurrent.ExecutionContext
-
-
-class ApplicationSchedulerAsyncDAO(config: Config, ex: ExecutionContext) {
-  private val LOG: Logger = 
LoggerFactory.getLogger(classOf[ApplicationSchedulerAsyncDAO])
-  private val connector: EagleServiceConnector = new 
EagleServiceConnector(config)
-
-  def getEagleServiceClient(): EagleServiceClientImpl = {
-    return new EagleServiceClientImpl(connector)
-  }
-
-  def readOperationsByStatus(status: String) = {
-    Futures.future(new Callable[util.List[TopologyOperationEntity]]{
-      override def call(): util.List[TopologyOperationEntity] = {
-        val client = getEagleServiceClient()
-        val query = 
"%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME,
 status)
-        val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] 
= client.search(query).pageSize(Int.MaxValue).send()
-        if(client != null) client.close()
-        if(!response.isSuccess || response.getObj == null)
-          throw new Exception(s"Fail to load operations with status $status")
-        response.getObj
-      }
-    }, ex)
-  }
-
-  def loadAllTopologyExecutionEntities() = {
-    Futures.future(new Callable[util.List[TopologyExecutionEntity]]{
-      override def call(): util.List[TopologyExecutionEntity] = {
-        val client = getEagleServiceClient()
-        val query = "%s[@status != 
\"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, 
TopologyExecutionStatus.NEW)
-        val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] 
= client.search(query).pageSize(Int.MaxValue).send()
-        if(client != null) client.close()
-        if(!response.isSuccess || response.getObj == null) throw new 
Exception(response.getException)
-        response.getObj
-      }
-    }, ex)
-  }
-
-  def loadTopologyExecutionByName(site: String, appName: String, topologyName: 
String) = {
-    Futures.future(new Callable[TopologyExecutionEntity]{
-      override def call(): TopologyExecutionEntity = {
-        val client = getEagleServiceClient()
-        val query = "%s[@site=\"%s\" AND @application=\"%s\" AND 
@topology=\"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME,
 site, appName, topologyName)
-        LOG.info(s"query=$query")
-        val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] 
= client.search(query).pageSize(Int.MaxValue).send()
-        if(client != null) client.close()
-        if(!response.isSuccess || response.getObj == null)
-          throw new Exception(s"Fail to load topologyExecutionEntity with 
application=$appName topology=$topologyName due to Exception: 
${response.getException}")
-        if(response.getObj.size() == 0 || response.getObj.size() > 1) {
-          throw new Exception(s"Get 0 or more than 1 topologyExecutionEntity 
with application=$appName topology=$topologyName")
-        }
-        response.getObj.get(0)
-      }
-    }, ex)
-  }
-
-  def loadTopologyDescriptionByName(site: String, application: String, 
topologyName: String) = {
-    Futures.future(new Callable[TopologyDescriptionEntity]{
-      override def call(): TopologyDescriptionEntity = {
-        val client = getEagleServiceClient()
-        var query = 
"%s[@topology=\"%s\"]{*}".format(Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME,
 topologyName)
-        val response: 
GenericServiceAPIResponseEntity[TopologyDescriptionEntity] = 
client.search(query).pageSize(Int.MaxValue).send()
-        if(!response.isSuccess || response.getObj == null || 
response.getObj.size() == 0)
-          throw new Exception(s"Fail to load TopologyDescriptionEntity with 
site=$site application=$application topology=$topologyName due to Exception: 
${response.getException}")
-        val topologyDescriptionEntity = response.getObj.get(0)
-
-        query = "%s[@site=\"%s\" AND 
@application=\"%s\"]{*}".format(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME,
 site, application)
-        val configResponse: 
GenericServiceAPIResponseEntity[SiteApplicationServiceEntity] = 
client.search(query).pageSize(Int.MaxValue).send()
-        if (client != null) client.close()
-        if(!configResponse.isSuccess || configResponse.getObj == null || 
configResponse.getObj.size() == 0)
-          throw new Exception(s"Fail to load topology configuration with 
query=$query due to Exception: ${configResponse.getException}")
-        val siteApplicationEntity = configResponse.getObj.get(0)
-        topologyDescriptionEntity.setContext(siteApplicationEntity.getConfig)
-        topologyDescriptionEntity
-      }
-    }, ex)
-  }
-
-  def updateOperationStatus(operation: TopologyOperationEntity) = {
-    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
-      override def call(): GenericServiceAPIResponseEntity[String] = {
-        if(LOG.isDebugEnabled()) LOG.debug(s"Updating status of 
command[$operation] as ${operation.getStatus}")
-        val client = getEagleServiceClient()
-        operation.setLastModifiedDate(System.currentTimeMillis())
-        val response= client.update(java.util.Arrays.asList(operation), 
classOf[TopologyOperationEntity])
-        if(client != null) client.close()
-        if(response.isSuccess) {
-          LOG.info(s"Updated operation status [$operation] as: 
${operation.getStatus}")
-        } else {
-          LOG.error(s"Failed to update status as ${operation.getStatus} of 
command[$operation]")
-          throw new RuntimeException(s"Failed to update command due to 
exception: ${response.getException}")
-        }
-        response
-      }
-    }, ex)
-  }
-
-  def updateTopologyExecutionStatus(topology: TopologyExecutionEntity) = {
-    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
-      override def call(): GenericServiceAPIResponseEntity[String] = {
-        if(LOG.isDebugEnabled()) LOG.debug(s"Updating status of app[$topology] 
as ${topology.getStatus}")
-        val client = getEagleServiceClient()
-        topology.setLastModifiedDate(System.currentTimeMillis())
-        if(client != null) client.close()
-        val response= client.update(java.util.Arrays.asList(topology), 
classOf[TopologyExecutionEntity])
-        if(response.isSuccess) {
-          LOG.info(s"Updated status application[$topology] as: 
${topology.getStatus}")
-        } else {
-          LOG.error(s"Failed to update status as ${topology.getStatus} of 
application[$topology] due to ${response.getException}")
-        }
-        response
-      }
-    }, ex)
-  }
-
-  def clearPendingOperations() = {
-    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
-      override def call(): GenericServiceAPIResponseEntity[String] = {
-        LOG.info("start to clear operation")
-        val query: String = 
"%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME,
 TopologyOperationEntity.OPERATION_STATUS.PENDING)
-        val client = getEagleServiceClient()
-        val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] 
= client.search(query).pageSize(Int.MaxValue).send()
-        var ret: GenericServiceAPIResponseEntity[String] = new 
GenericServiceAPIResponseEntity[String]()
-        if (response.isSuccess && response.getObj.size != 0) {
-          val pendingOperations: util.List[TopologyOperationEntity] = 
response.getObj
-          val failedOperations: util.List[TopologyOperationEntity] = new 
util.ArrayList[TopologyOperationEntity]
-          JavaConversions.collectionAsScalaIterable(pendingOperations) foreach 
{ operation =>
-            
operation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
-            failedOperations.add(operation)
-          }
-          ret = client.update(failedOperations, 
Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME)
-          if (client != null) client.close()
-          if (ret.isSuccess) {
-            LOG.info(s"Successfully clear ${failedOperations.size()} pending 
operations")
-          } else {
-            LOG.error(s"Failed to clear pending operations due to exception:" 
+ ret.getException)
-          }
-        }
-        ret
-      }
-    }, ex)
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
deleted file mode 100644
index 88271bb..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import com.typesafe.config.Config
-import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, 
TopologyDescriptionEntity}
-
-
-trait ExecutionPlatform {
-  def start(topology: TopologyDescriptionEntity, topologyExecution: 
TopologyExecutionEntity, config: Config)
-  def stop(topologyExecution: TopologyExecutionEntity, config: Config)
-  def status(topologyExecutions: java.util.List[TopologyExecutionEntity], 
config: Config)
-  def status(topologyExecution: TopologyExecutionEntity, config: Config)
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
deleted file mode 100644
index 6b9c033..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.stream.application.impl.StormExecutionPlatform
-import org.slf4j.{LoggerFactory, Logger}
-
-import scala.collection.mutable
-
-
-object ExecutionPlatformFactory {
-  private val LOG: Logger = 
LoggerFactory.getLogger(ExecutionPlatformFactory.getClass)
-
-  var managerCache = new mutable.HashMap[String, ExecutionPlatform] with
-    mutable.SynchronizedMap[String, ExecutionPlatform]
-
-  def getApplicationManager(managerType: String): ExecutionPlatform = {
-    if(managerCache.contains(managerType)) {
-      managerCache.get(managerType).get
-    } else {
-      managerType match {
-        case AppManagerConstants.EAGLE_CLUSTER_STORM =>
-          val instance = new StormExecutionPlatform
-          managerCache.put(managerType, instance)
-          instance
-        case _ =>
-          throw new Exception(s"Invalid managerType $managerType")
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
deleted file mode 100644
index 07737ac..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import org.codehaus.jackson.annotate.JsonIgnore
-
-class TaskExecutor(runnable: Runnable) extends Thread(runnable) {
-
-  @JsonIgnore override def getContextClassLoader: ClassLoader = {
-    return super.getContextClassLoader
-  }
-
-  @JsonIgnore override def getUncaughtExceptionHandler: 
Thread.UncaughtExceptionHandler = {
-    return super.getUncaughtExceptionHandler
-  }
-
-  def shutdown {
-    this.interrupt
-  }
-
-  def restart {
-    this.interrupt
-    this.start
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
deleted file mode 100644
index 7d52649..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.impl
-
-import com.typesafe.config.Config
-import org.apache.eagle.datastream.ExecutionEnvironments
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-import org.apache.eagle.stream.application.AbstractDynamicApplication
-import org.slf4j.LoggerFactory
-
-
-object StormDynamicTopology extends AbstractDynamicApplication {
-  val LOG = LoggerFactory.getLogger(classOf[AbstractDynamicApplication])
-
-  override def submit(application: String, config: Config) {
-    val stream = compileStream(application, config)
-    var ret = true
-
-    try {
-      val stormEnv = 
ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](stream.getConfig)
-      stream.submit(stormEnv)
-    } catch {
-      case e: Throwable =>
-        ret = false
-        LOG.error(e.toString)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
deleted file mode 100644
index af4cafa..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.impl
-
-import java.net.URLDecoder
-import java.nio.file.{Files, Paths}
-
-import backtype.storm.generated.InvalidTopologyException
-import backtype.storm.utils.{NimbusClient, Utils}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.common.config.EagleConfigConstants
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, 
TopologyExecutionEntity, TopologyExecutionStatus}
-import org.apache.eagle.stream.application.{ApplicationManager, 
ApplicationManagerUtils, ExecutionPlatform, TopologyFactory}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions
-
-/**
- * String constants exposed next to the StormExecutionPlatform class.
- * NOTE(review): the names look like Storm topology states, but nothing in the
- * class below reads them - confirm the intended use against callers.
- */
-object StormExecutionPlatform {
-  val ACTIVE = "ACTIVE"
-  val INACTIVE = "INACTIVE"
-  val KILLED = "KILLED"
-  val REBALANCING = "REBALANCING"
-}
-
-/**
- * ExecutionPlatform implementation backed by Storm.
- *
- * Executions whose mode equals EagleConfigConstants.LOCAL_MODE run in a worker
- * thread obtained from ApplicationManager. All other executions are submitted
- * through TopologyFactory / StormDynamicTopology and are killed and queried
- * through a Nimbus client.
- */
-class StormExecutionPlatform extends ExecutionPlatform {
-  val LOG = LoggerFactory.getLogger(classOf[StormExecutionPlatform])
-
-  /**
-   * Builds a Nimbus client from the Storm configuration plus command line
-   * options. `envContextConfig.nimbusHost` and
-   * `envContextConfig.nimbusThriftPort` override those settings when present.
-   * The caller owns the returned client and is responsible for closing it.
-   */
-  private def getNimbusClient(appConfig: com.typesafe.config.Config): NimbusClient = {
-    val conf = Utils.readStormConfig().asInstanceOf[java.util.HashMap[String, Object]]
-    conf.putAll(Utils.readCommandLineOpts().asInstanceOf[java.util.HashMap[String, Object]])
-
-    if(appConfig.hasPath("envContextConfig.nimbusHost")) {
-      val nimbusHost = appConfig.getString("envContextConfig.nimbusHost")
-      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as $nimbusHost")
-      conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost)
-    }
-
-    if(appConfig.hasPath("envContextConfig.nimbusThriftPort")) {
-      val nimbusThriftPort = appConfig.getString("envContextConfig.nimbusThriftPort")
-      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as $nimbusThriftPort")
-      conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, appConfig.getNumber("envContextConfig.nimbusThriftPort"))
-    }
-    NimbusClient.getConfiguredClient(conf)
-  }
-
-  /**
-   * Runs `body` with a freshly created Nimbus client and always closes the
-   * client afterwards, so repeated stop/status calls do not leak connections.
-   */
-  private def withNimbusClient[T](appConfig: Config)(body: NimbusClient => T): T = {
-    val client = getNimbusClient(appConfig)
-    try {
-      body(client)
-    } finally {
-      client.close()
-    }
-  }
-
-  /**
-   * Submits the topology inside a worker thread managed by ApplicationManager
-   * and records the worker's state on `topologyExecution`.
-   *
-   * Errors raised inside the worker are logged; they are not propagated to the
-   * caller because they happen on the worker thread.
-   */
-  def startLocal(topologyName: String, topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
-    val worker: Thread = ApplicationManager.submit(topologyName, new Runnable {
-      override def run(): Unit = {
-        try {
-          val topologyType = topology.getType.toUpperCase()
-          topologyType match {
-            case TopologyDescriptionEntity.TYPE.CLASS =>
-              TopologyFactory.submit(topology.getExeClass, config)
-            case TopologyDescriptionEntity.TYPE.DYNAMIC =>
-              StormDynamicTopology.submit(topology.getExeClass, config)
-            case _ =>
-              LOG.error("Unsupported topology type: " + topology.getType)
-          }
-        } catch {
-          case ex: Throwable =>
-            // Keep the stack trace: this log line is the only trace of the failure.
-            LOG.error(s"topology $topologyName in local mode is interrupted with ${ex.toString}", ex)
-        }
-      }
-    })
-    topologyExecution.setFullName(topologyName)
-    topologyExecution.setStatus(ApplicationManager.getWorkerStatus(worker.getState))
-    topologyExecution.setDescription("Running inside " + worker.toString + " in local mode")
-  }
-
-  /**
-   * Starts a topology.
-   *
-   * Locates the jar that contains ExecutionPlatform and publishes it as the
-   * `storm.jar` system property, then injects the generated full topology name
-   * into the configuration. The run mode comes from
-   * AppManagerConstants.RUNNING_MODE and defaults to local mode.
-   *
-   * Throws Exception when the jar cannot be located, and
-   * InvalidTopologyException when the topology type is not supported in
-   * non-local mode.
-   */
-  override def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
-    val stormJarPath: String = URLDecoder.decode(classOf[ExecutionPlatform].getProtectionDomain.getCodeSource.getLocation.getPath, "UTF-8")
-    if (stormJarPath == null || !Files.exists(Paths.get(stormJarPath)) || !stormJarPath.endsWith(".jar")) {
-      val errMsg = s"storm jar file $stormJarPath does not exists, or is a invalid jar file"
-      LOG.error(errMsg)
-      throw new Exception(errMsg)
-    }
-    LOG.info(s"Detected a storm.jar location at: $stormJarPath")
-    System.setProperty("storm.jar", stormJarPath)
-
-    val fullName = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
-    val extConfigStr = "envContextConfig.topologyName=%s".format(fullName)
-    val extConfig = ConfigFactory.parseString(extConfigStr)
-    val newConfig = extConfig.withFallback(config)
-
-    val mode = if(config.hasPath(AppManagerConstants.RUNNING_MODE)) config.getString(AppManagerConstants.RUNNING_MODE) else EagleConfigConstants.LOCAL_MODE
-    topologyExecution.setMode(mode)
-    if (topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
-      startLocal(fullName, topology, topologyExecution, newConfig)
-      return
-    }
-
-    val topologyType = topology.getType.toUpperCase()
-    topologyType match {
-      case TopologyDescriptionEntity.TYPE.CLASS =>
-        TopologyFactory.submit(topology.getExeClass, newConfig)
-      case TopologyDescriptionEntity.TYPE.DYNAMIC =>
-        StormDynamicTopology.submit(topology.getExeClass, newConfig)
-      case _ =>
-        throw new InvalidTopologyException("Unsupported topology type: " + topology.getType)
-    }
-    topologyExecution.setFullName(fullName)
-  }
-
-  /**
-   * Stops a topology: local executions are stopped through ApplicationManager,
-   * cluster executions are killed through Nimbus and flagged STOPPING.
-   */
-  override def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
-    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
-
-    if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
-      stopLocal(name, topologyExecution)
-    } else {
-      withNimbusClient(config) { client => client.getClient.killTopology(name) }
-      topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
-    }
-  }
-
-  /**
-   * Stops the local worker registered under `name` and copies its state into
-   * the status and description of `topologyExecution`.
-   */
-  def stopLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
-    val taskWorker = ApplicationManager.stop(name)
-    topologyExecution.setStatus(ApplicationManager.getWorkerStatus(taskWorker.getState))
-    topologyExecution.setDescription(s"topology status is ${taskWorker.getState}")
-  }
-
-  /**
-   * Looks up the cluster topology summary whose name equals `topologyName`.
-   *
-   * @return Some(summary) when Nimbus reports such a topology, None otherwise
-   */
-  def getTopology(topologyName: String, config: Config) = withNimbusClient(config) { client =>
-    val summaries = client.getClient.getClusterInfo.get_topologies
-    JavaConversions.collectionAsScalaIterable(summaries).find { t => t.get_name.equals(topologyName) }
-  }
-
-  /**
-   * Refreshes status, URL and description of one execution. A cluster
-   * execution that Nimbus no longer reports is marked STOPPED with an empty
-   * URL and description.
-   */
-  override def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
-    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
-
-    if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
-      statusLocal(name, topologyExecution)
-    } else {
-      getTopology(name, config) match {
-        case Some(summary) =>
-          topologyExecution.setStatus(ApplicationManager.getTopologyStatus(summary.get_status()))
-          topologyExecution.setUrl(ApplicationManagerUtils.buildStormTopologyURL(config, summary.get_id()))
-          topologyExecution.setDescription(summary.toString)
-        case None =>
-          topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
-          topologyExecution.setUrl("")
-          topologyExecution.setDescription("")
-      }
-    }
-  }
-
-  /**
-   * Refreshes the status of a local execution from its worker thread.
-   *
-   * When the worker status differs from the recorded one, the new status is
-   * stored and described. When it is unchanged and already STOPPED, the worker
-   * is removed from ApplicationManager. Any failure while reading the worker
-   * marks the execution STOPPED with an empty description.
-   */
-  def statusLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
-    try {
-      val currentStatus = topologyExecution.getStatus()
-      val newStatus = ApplicationManager.getWorkerStatus(ApplicationManager.get(name).getState())
-      if (!currentStatus.equals(newStatus)) {
-        LOG.info("Status of topology: %s changed from %s to %s".format(topologyExecution.getFullName, currentStatus, newStatus))
-        topologyExecution.setStatus(newStatus)
-        topologyExecution.setDescription(String.format("Status of topology: %s changed from %s to %s", name, currentStatus, newStatus))
-      } else if(currentStatus.equalsIgnoreCase(TopologyExecutionStatus.STOPPED)) {
-        ApplicationManager.remove(name)
-      }
-    } catch {
-      case ex: Throwable =>
-        // Best effort: an unreadable worker is reported as stopped. Keep the cause
-        // visible at debug level instead of dropping it silently.
-        LOG.debug(s"Unable to read local status of topology $name, marking it as stopped", ex)
-        topologyExecution.setDescription("")
-        topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
-    }
-  }
-
-  /** Refreshes every execution in `topologyExecutions`, one after another. */
-  override def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit = {
-    JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach {
-      topologyExecution => status(topologyExecution, config)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
deleted file mode 100644
index 8fbf60d..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import java.util.concurrent.Callable
-
-import akka.actor.{Actor, ActorLogging}
-import akka.dispatch.Futures
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, 
ConfigSyntax}
-import org.apache.eagle.common.config.EagleConfigConstants
-import org.apache.eagle.service.application.AppManagerConstants
-import 
org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION
-import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, 
TopologyExecutionStatus, TopologyOperationEntity}
-import org.apache.eagle.stream.application.{ApplicationSchedulerAsyncDAO, 
ExecutionPlatformFactory}
-
-import scala.collection.JavaConversions
-import scala.util.{Failure, Success}
-
-
-/**
- * Actor that executes START/STOP operations against the execution platform
- * and periodically refreshes the status of all topology executions.
- *
- * Platform calls run in futures on the actor's dispatcher; results are written
- * back through ApplicationSchedulerAsyncDAO.
- */
-private[scheduler] class AppCommandExecutor extends Actor with ActorLogging {
-  @volatile var _config: Config = _
-  @volatile var _dao: ApplicationSchedulerAsyncDAO = _
-
-  import context.dispatcher
-
-  /** Records `message` on the operation, flags it FAILED and persists it. */
-  private def failOperation(topologyOperation: TopologyOperationEntity, message: String): Unit = {
-    topologyOperation.setMessage(message)
-    topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
-    _dao.updateOperationStatus(topologyOperation)
-  }
-
-  /**
-   * Loads the topology description, derives its configuration and starts the
-   * topology on the configured cluster type (Storm when none is configured).
-   * The operation ends as SUCCESS or FAILED; on success the execution is
-   * persisted as well.
-   */
-  def start(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
-    val options: ConfigParseOptions = ConfigParseOptions.defaults.setSyntax(ConfigSyntax.PROPERTIES).setAllowMissing(false)
-    _dao.loadTopologyDescriptionByName(topologyOperation.getSite, topologyOperation.getApplication, topologyOperation.getTopology) onComplete {
-      case Success(topology) =>
-        try {
-          val topologyConfig: Config = ConfigFactory.parseString(topology.getContext, options)
-
-          if(!topologyConfig.hasPath(EagleConfigConstants.APP_CONFIG)) {
-            failOperation(topologyOperation, "Fail to detect topology configuration")
-          } else {
-            val config = topologyConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(_config)
-            val clusterType = if(config.hasPath(AppManagerConstants.CLUSTER_ENV)) config.getString(AppManagerConstants.CLUSTER_ENV) else AppManagerConstants.EAGLE_CLUSTER_STORM
-            topologyExecution.setEnvironment(clusterType)
-
-            Futures.future(new Callable[TopologyExecutionEntity]{
-              override def call(): TopologyExecutionEntity = {
-                topologyExecution.setStatus(TopologyExecutionStatus.STARTING)
-                _dao.updateTopologyExecutionStatus(topologyExecution)
-                ExecutionPlatformFactory.getApplicationManager(clusterType).start(topology, topologyExecution, config)
-                topologyExecution
-              }
-            }, context.dispatcher) onComplete {
-              case Success(_) =>
-                topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
-                updateStatus(topologyExecution, topologyOperation)
-              case Failure(ex) =>
-                failOperation(topologyOperation, ex.getMessage)
-            }
-          }
-        } catch {
-          // An exception thrown inside this callback (e.g. an unparsable topology
-          // context) would otherwise never reach the operation record.
-          case scala.util.control.NonFatal(ex) =>
-            failOperation(topologyOperation, ex.getMessage)
-        }
-
-      case Failure(ex) =>
-        failOperation(topologyOperation, ex.getMessage)
-    }
-  }
-
-  /**
-   * Stops the topology on the platform recorded in the execution's environment
-   * and records the outcome on the operation.
-   */
-  def stop(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
-    val clusterType = topologyExecution.getEnvironment
-
-    Futures.future(new Callable[TopologyExecutionEntity]{
-      override def call(): TopologyExecutionEntity = {
-        topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
-        _dao.updateTopologyExecutionStatus(topologyExecution)
-        ExecutionPlatformFactory.getApplicationManager(clusterType).stop(topologyExecution, _config)
-        topologyExecution
-      }
-    }, context.dispatcher) onComplete {
-      case Success(_) =>
-        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
-        updateStatus(topologyExecution, topologyOperation)
-      case Failure(ex) =>
-        failOperation(topologyOperation, ex.toString)
-    }
-  }
-
-  /**
-   * Asks the platform for the current status of the execution and persists the
-   * execution afterwards, whether or not the status call succeeded.
-   */
-  def status(topologyExecution: TopologyExecutionEntity) = {
-    val clusterType = topologyExecution.getEnvironment
-
-    Futures.future(new Callable[TopologyExecutionEntity]{
-      override def call(): TopologyExecutionEntity = {
-        ExecutionPlatformFactory.getApplicationManager(clusterType).status(topologyExecution, _config)
-        topologyExecution
-      }
-    }, context.dispatcher) onComplete {
-      case _ =>
-        _dao.updateTopologyExecutionStatus(topologyExecution)
-    }
-  }
-
-  /** Persists the operation first, then the execution. */
-  def updateStatus(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
-    _dao.updateOperationStatus(topologyOperation)
-    _dao.updateTopologyExecutionStatus(topologyExecution)
-  }
-
-  /**
-   * Dispatches an operation to start or stop. Unsupported operations and
-   * synchronous failures mark the operation FAILED.
-   */
-  def execute(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
-    try {
-      topologyOperation.getOperation match {
-        case OPERATION.START =>
-          start(topologyExecution, topologyOperation)
-        case OPERATION.STOP =>
-          stop(topologyExecution, topologyOperation)
-        case _ =>
-          log.warning("Unsupported operation: " + topologyOperation)
-          throw new Exception(s"Unsupported operation: ${topologyOperation.getOperation}, possible values are START/STOP")
-      }
-    } catch {
-      case e: Throwable =>
-        failOperation(topologyOperation, e.getMessage)
-    }
-  }
-
-  override def receive = {
-    case InitializationEvent(config: Config) =>
-      _config = config
-      _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
-    case SchedulerCommand(topologyExecution, topologyOperation) =>
-      execute(topologyExecution, topologyOperation)
-    case HealthCheckerEvent =>
-      _dao.loadAllTopologyExecutionEntities() onComplete {
-        case Success(topologyExecutions) =>
-          log.info(s"Load ${topologyExecutions.size()} topologies in execution")
-          JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach { topologyExecution =>
-            try{
-              status(topologyExecution)
-            } catch {
-              case ex: Throwable =>
-                // Log the cause too; the message alone is often null or too terse.
-                log.error(ex, ex.getMessage)
-            }
-          }
-        case Failure(ex) =>
-          log.error(s"Fail to load any topologyExecutionEntity due to Exception: ${ex.getMessage}")
-      }
-    case TerminatedEvent =>
-      context.stop(self)
-    case m@_ =>
-      // The interpolator prefix was missing, so "$m" used to be logged literally.
-      log.warning(s"Unsupported operation $m")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
deleted file mode 100644
index c731846..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import akka.actor.{Actor, ActorLogging}
-import com.typesafe.config.Config
-import 
org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION_STATUS
-import org.apache.eagle.stream.application.ApplicationSchedulerAsyncDAO
-
-import scala.collection.JavaConversions
-import scala.util.{Failure, Success}
-
-
-/**
- * Actor that polls the DAO for operations in INITIALIZED state, flags each of
- * them PENDING, resolves the matching topology execution and replies to the
- * requester with a SchedulerCommand.
- */
-private[scheduler] class AppCommandLoader extends Actor with ActorLogging {
-  @volatile var _config: Config = null
-  @volatile var _dao: ApplicationSchedulerAsyncDAO = null
-
-  import context.dispatcher
-
-  override def receive = {
-    case InitializationEvent(config: Config) =>
-      _config = config
-      _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
-
-    case ClearPendingOperation =>
-      // Create the DAO on demand if this message arrives before initialization.
-      if(_dao == null) _dao = new ApplicationSchedulerAsyncDAO(_config, context.dispatcher)
-      _dao.clearPendingOperations()
-
-    case CommandLoaderEvent =>
-      // Capture the sender now: the callbacks below run after receive has returned.
-      val replyTo = sender()
-      _dao.readOperationsByStatus(OPERATION_STATUS.INITIALIZED) onComplete {
-        case Failure(ex) =>
-          log.error(s"Failed to get commands due to exception ${ex.getMessage}")
-
-        case Success(commands) =>
-          log.info(s"Load ${commands.size()} new commands")
-          JavaConversions.collectionAsScalaIterable(commands) foreach { command =>
-            // Flags the current command FAILED with the exception message and persists it.
-            def markFailed(ex: Throwable) = {
-              command.setMessage(ex.getMessage)
-              command.setStatus(OPERATION_STATUS.FAILED)
-              _dao.updateOperationStatus(command)
-            }
-
-            command.setStatus(OPERATION_STATUS.PENDING)
-            _dao.updateOperationStatus(command) onComplete {
-              case Failure(ex) =>
-                log.error(s"Got an exception to update command status $command: ${ex.getMessage}")
-                markFailed(ex)
-
-              case Success(_) =>
-                _dao.loadTopologyExecutionByName(command.getSite, command.getApplication, command.getTopology) onComplete {
-                  case Failure(ex) =>
-                    log.error(ex.getMessage)
-                    markFailed(ex)
-                  case Success(topologyExecution) =>
-                    replyTo ! SchedulerCommand(topologyExecution, command)
-                }
-            }
-          }
-      }
-
-    case TerminatedEvent =>
-      context.stop(self)
-
-    case m@_ =>
-      throw new UnsupportedOperationException(s"Event is not supported $m")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
deleted file mode 100644
index 476a3fb..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import akka.actor.{ActorSystem, Props}
-import com.typesafe.config.Config
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, 
TopologyOperationEntity}
-import org.apache.eagle.stream.application.ApplicationManager
-
-import scala.concurrent.duration._
-
-
-// Messages exchanged between the scheduler and its actors; all of them are
-// private to the scheduler package.
-private[scheduler] class ScheduleEvent
-// Carries the Config; the loader and executor actors store it and create their DAO from it.
-private[scheduler] case class InitializationEvent(config: Config) extends 
ScheduleEvent
-// Makes the receiving actor stop itself.
-private[scheduler] case class TerminatedEvent() extends ScheduleEvent
-// Makes the command loader read the operations in INITIALIZED state.
-private[scheduler] case class CommandLoaderEvent() extends ScheduleEvent
-// Makes the command executor reload all topology executions and refresh their status.
-private[scheduler] case class HealthCheckerEvent() extends ScheduleEvent
-// Makes the command loader call clearPendingOperations on its DAO.
-private[scheduler] case class ClearPendingOperation() extends ScheduleEvent
-// Pairs a topology execution with the operation to run against it.
-private[scheduler] case class SchedulerCommand(topologyExecution: 
TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) extends 
ScheduleEvent
-
-// Public exception types that wrap a plain message.
-// NOTE(review): no throw site is visible in the scheduler sources shown here;
-// the meaning of each type is presumed from its name - confirm against callers.
-case class EagleServiceUnavailableException(message:String) extends 
Exception(message)
-case class DuplicatedDefinitionException(message:String) extends 
Exception(message)
-case class LoadTopologyFailureException(message:String) extends 
Exception(message)
-
-
-/**
- * Entry point of the application scheduler:
- *  1. syncs commands from the eagle service,
- *  2. hands each command to the responsible actor,
- *  3. lets that actor execute the command as requested.
- */
-class ApplicationScheduler {
-  val DEFAULT_COMMAND_LOADER_INTERVAL_SECS = 2
-  val DEFAULT_HEALTH_CHECK_INTERVAL_SECS = 5
-
-  /**
-   * Creates the "application-manager-scheduler" actor system, starts the
-   * coordinator actor and schedules its initialization, pending-operation
-   * cleanup, command loading and health checks.
-   *
-   * @param config configuration of the actor system; may override both polling intervals
-   * @return the started actor system
-   */
-  def start(config: Config) = {
-    val system = ActorSystem("application-manager-scheduler", config)
-    system.log.info(s"Started actor system: $system")
-
-    import system.dispatcher
-
-    // Reads an interval in seconds from the configuration, or uses the fallback.
-    def secondsAt(path: String, fallback: Long): Long =
-      if (config.hasPath(path)) config.getLong(path) else fallback
-
-    val commandLoaderIntervalSecs = secondsAt(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS, DEFAULT_COMMAND_LOADER_INTERVAL_SECS)
-    val healthCheckIntervalSecs = secondsAt(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS, DEFAULT_HEALTH_CHECK_INTERVAL_SECS)
-
-    val coordinator = system.actorOf(Props[StreamAppCoordinator])
-    system.scheduler.scheduleOnce(0.seconds, coordinator, InitializationEvent(config))
-    system.scheduler.scheduleOnce(1.seconds, coordinator, ClearPendingOperation)
-    system.scheduler.schedule(2.seconds, commandLoaderIntervalSecs.seconds, coordinator, CommandLoaderEvent)
-    system.scheduler.schedule(10.seconds, healthCheckIntervalSecs.seconds, coordinator, HealthCheckerEvent)
-
-    // Callbacks registered here run once the actor system has been shut down.
-    system.registerOnTermination(new Runnable {
-      override def run(): Unit = {
-        coordinator ! TerminatedEvent
-        ApplicationManager.stopAll()
-      }
-    })
-    system
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
 
b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
deleted file mode 100644
index 17006ee..0000000
--- 
a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import akka.actor.{Actor, ActorLogging, ActorRef, Props}
-
-/**
- * Root actor of the scheduler. It creates the command loader and the command
- * executor as children and routes each scheduler message to the child that
- * handles it.
- */
-private[scheduler] class StreamAppCoordinator extends Actor with ActorLogging {
-  var commandLoader: ActorRef = null
-  var commandExecutor: ActorRef = null
-
-
-  /** Creates the two child actors before the first message is processed. */
-  override def preStart(): Unit = {
-    commandLoader = context.actorOf(Props[AppCommandLoader], "command-loader")
-    commandExecutor = context.actorOf(Props[AppCommandExecutor], "command-worker")
-  }
-
-  override def receive = {
-    case InitializationEvent(config) => {
-      log.info(s"Config updated: $config")
-      commandLoader ! InitializationEvent(config)
-      commandExecutor ! InitializationEvent(config)
-    }
-    case ClearPendingOperation =>
-      commandLoader ! ClearPendingOperation
-    case CommandLoaderEvent =>
-      commandLoader ! CommandLoaderEvent
-    case command: SchedulerCommand =>
-      // Log the received command itself, not the SchedulerCommand companion object.
-      log.info(s"Executing command: $command")
-      commandExecutor ! command
-    case HealthCheckerEvent =>
-      commandExecutor ! HealthCheckerEvent
-    case TerminatedEvent =>
-      log.info("Coordinator exit ...")
-      context.stop(self)
-    case m@_ =>
-      log.warning(s"Coordinator Unsupported message: $m")
-  }
-}


Reply via email to