[EAGLE-556] Install/Update Alert Topology Metadata when start alert engine

Currently alert engine requires additional metadata like TopologyMeta, which in 
fact could be automatically generated during starting alert engine.

*Changes*

* Add "ApplicationListener" in ApplicationProvider to support extensible 
application lifecycle management callback listener
* Implement AlertUnitTopologyAppListener to  add topology metadata when 
topology is running and remove topology metadata when topology is stopped

Author: Hao Chen <h...@apache.org>

Closes #493 from haoch/EAGLE-556.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fc2407cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fc2407cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fc2407cd

Branch: refs/heads/master
Commit: fc2407cd53438a31b4c1072ce7d5b08578b8e43d
Parents: fa85dc3
Author: Hao Chen <h...@apache.org>
Authored: Wed Oct 12 12:10:06 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Wed Oct 12 12:10:06 2016 +0800

----------------------------------------------------------------------
 .../alert/app/AlertUnitTopologyAppListener.java |  90 ++++++++++
 .../alert/app/AlertUnitTopologyAppProvider.java |  10 +-
 ...e.alert.app.AlertUnitTopologyAppProvider.xml |   4 +-
 .../alert/engine/runner/UnitTopologyRunner.java | 164 ++++++++++++------
 .../apache/eagle/app/ApplicationLifecycle.java  |  23 ++-
 .../environment/ExecutionRuntimeManager.java    |   4 +-
 .../environment/impl/StormExecutionRuntime.java |  21 ++-
 .../eagle/app/service/ApplicationAction.java    | 161 ++++++++++++++++++
 .../eagle/app/service/ApplicationListener.java  |  32 ++++
 .../service/ApplicationOperationContext.java    | 167 -------------------
 .../impl/ApplicationManagementServiceImpl.java  | 110 +++++++-----
 .../impl/ApplicationProviderConfigLoader.java   |   1 -
 .../impl/ApplicationProviderSPILoader.java      |   1 -
 .../apache/eagle/app/sink/KafkaStreamSink.java  |   4 +-
 .../eagle/app/sink/LoggingStreamSink.java       |   8 +-
 .../app/spi/AbstractApplicationProvider.java    |  52 +++++-
 .../eagle/app/spi/ApplicationProvider.java      |  19 +--
 .../app/test/ApplicationSimulatorImpl.java      |   1 -
 .../apache/eagle/app/TestStormApplication.java  |   9 +-
 .../app/service/ApplicationActionTest.java      |  48 ++++++
 .../ApplicationOperationContextTest.java        |  48 ------
 .../eagle/metadata/model/ApplicationEntity.java |   6 +
 .../app/example/ExampleApplicationProvider.java |  55 ++++--
 .../hbase/HBaseAuditLogAppProvider.java         |   5 -
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |   8 +-
 25 files changed, 672 insertions(+), 379 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppListener.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppListener.java
new file mode 100644
index 0000000..fd3f9d2
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppListener.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.app;
+
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.apache.eagle.app.service.ApplicationListener;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import com.typesafe.config.ConfigFactory;
+
+import com.google.inject.Inject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AlertUnitTopologyAppListener implements ApplicationListener {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AlertUnitTopologyAppListener.class);
+
+    @Inject private IMetadataDao metadataDao;
+
+    private ApplicationEntity applicationEntity;
+
+    @Override
+    public void init(ApplicationEntity applicationEntity) {
+        this.applicationEntity = applicationEntity;
+    }
+
+    @Override
+    public void afterInstall() {
+        // Do nothing
+    }
+
+    @Override
+    public void afterUninstall() {
+        removeTopologyMetadata();
+    }
+
+    @Override
+    public void beforeStart() {
+        // Do thing, may do some validation works?
+        updateTopologyMetadata();
+    }
+
+    @Override
+    public void afterStop() {
+        removeTopologyMetadata();
+    }
+
+    // -------------
+    // Internal RPC
+    // -------------
+
+    private void updateTopologyMetadata() {
+        LOG.info("Update topology metadata {}", 
this.applicationEntity.getAppId());
+        OpResult result = 
metadataDao.addTopology(createTopologyMeta(this.applicationEntity));
+        if (result.code == OpResult.FAILURE) {
+            LOG.error(result.message);
+            throw new IllegalStateException(result.message);
+        }
+    }
+
+    private void removeTopologyMetadata() {
+        LOG.info("Remove topology metadata {}", 
this.applicationEntity.getAppId());
+        OpResult result = 
metadataDao.removeTopology(createTopologyMeta(this.applicationEntity).getName());
+        if (result.code == OpResult.FAILURE) {
+            LOG.error(result.message);
+            throw new IllegalStateException(result.message);
+        }
+    }
+
+    private Topology createTopologyMeta(ApplicationEntity applicationEntity) {
+        return 
UnitTopologyRunner.buildTopologyMetadata(applicationEntity.getAppId(),ConfigFactory.parseMap(applicationEntity.getConfiguration()));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java
index 39a4583..5548c8c 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java
@@ -16,8 +16,11 @@
  */
 
 package org.apache.eagle.alert.app;
+import org.apache.eagle.app.service.ApplicationListener;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
+import java.util.Optional;
+
 /**
  * since 8/25/16.
  */
@@ -26,4 +29,9 @@ public class AlertUnitTopologyAppProvider extends 
AbstractApplicationProvider<Al
     public AlertUnitTopologyApp getApplication() {
         return new AlertUnitTopologyApp();
     }
-}
+
+    @Override
+    public Optional<ApplicationListener> getApplicationListener() {
+        return Optional.of(new AlertUnitTopologyAppListener());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
 
b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 498cb8d..bf22123 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -65,7 +65,7 @@
         <property>
             <name>spout.kafkaBrokerZkQuorum</name>
             <displayName>spout.kafkaBrokerZkQuorum</displayName>
-            <value>server.eagle.apache.org:2181</value>
+            <value>localhost:2181</value>
             <description>zookeeper quorum for spout to consume 
data</description>
         </property>
         <property>
@@ -97,7 +97,7 @@
         <property>
             <name>zkConfig.zkQuorum</name>
             <displayName>zkConfig.zkQuorum</displayName>
-            <value>server.eagle.apache.org:2181</value>
+            <value>localhost:2181</value>
             <description>zk quorum for alert engine</description>
         </property>
         <property>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 7a93e72..ec129fe 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -19,6 +19,7 @@
 
 package org.apache.eagle.alert.engine.runner;
 
+import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
 import 
org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.spout.CorrelationSpout;
@@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
 
 /**
  * By default
@@ -76,20 +79,78 @@ public class UnitTopologyRunner {
         this.givenStormConfig = stormConfig;
     }
 
+    // -----------------------------
+    // Storm Topology Submit Helper
+    // -----------------------------
+
+    private void run(String topologyId,
+                     int numOfTotalWorkers,
+                     int numOfSpoutTasks,
+                     int numOfRouterBolts,
+                     int numOfAlertBolts,
+                     int numOfPublishTasks,
+                     Config config,
+                     boolean localMode) {
+
+        backtype.storm.Config stormConfig = givenStormConfig == null ? new 
backtype.storm.Config() : givenStormConfig;
+        // TODO: Configurable metric consumer instance number
+
+        int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? 
config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
+        LOG.info("Set topology.message.timeout.secs as {}", 
messageTimeoutSecs);
+        stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);
+
+        if (config.hasPath("metric")) {
+            
stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, 
config.root().render(ConfigRenderOptions.concise()), 1);
+        }
+
+        stormConfig.setNumWorkers(numOfTotalWorkers);
+        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, 
numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+
+        if (localMode) {
+            LOG.info("Submitting as local mode");
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(topologyId, stormConfig, topology);
+            Utils.sleep(Long.MAX_VALUE);
+        } else {
+            LOG.info("Submitting as cluster mode");
+            try {
+                StormSubmitter.submitTopologyWithProgressBar(topologyId, 
stormConfig, topology);
+            } catch (Exception ex) {
+                LOG.error("fail submitting topology {}", topology, ex);
+                throw new IllegalStateException(ex);
+            }
+        }
+    }
+
+    public void run(String topologyId, Config config) {
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
+        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
+        boolean localMode = config.getBoolean(LOCAL_MODE);
+        int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
+        run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishTasks, config, localMode);
+    }
+
+    public IMetadataChangeNotifyService getMetadataChangeNotifyService() {
+        return metadataChangeNotifyService;
+    }
+
+    // ---------------------------
+    // Build Storm Topology
+    // ---------------------------
+
     public StormTopology buildTopology(String topologyId,
                                        int numOfSpoutTasks,
                                        int numOfRouterBolts,
                                        int numOfAlertBolts,
                                        int numOfPublishTasks,
                                        Config config) {
-
         StreamRouterBolt[] routerBolts = new 
StreamRouterBolt[numOfRouterBolts];
         AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts];
 
-
         TopologyBuilder builder = new TopologyBuilder();
 
-
         // construct Spout object
         CorrelationSpout spout = new CorrelationSpout(config, topologyId, 
getMetadataChangeNotifyService(), numOfRouterBolts, spoutName, 
streamRouterBoltNamePrefix);
         builder.setSpout(spoutName, spout, 
numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
@@ -147,63 +208,66 @@ public class UnitTopologyRunner {
     }
 
     public StormTopology buildTopology(String topologyId, Config config) {
-        int numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks");
-        int numOfRouterBolts = config.getInt("topology.numOfRouterBolts");
-        int numOfAlertBolts = config.getInt("topology.numOfAlertBolts");
-        int numOfPublishTasks = config.getInt("topology.numOfPublishTasks");
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
+        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
+
         return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishTasks, config);
     }
 
-    private void run(String topologyId,
-                     int numOfTotalWorkers,
-                     int numOfSpoutTasks,
-                     int numOfRouterBolts,
-                     int numOfAlertBolts,
-                     int numOfPublishTasks,
-                     Config config,
-                     boolean localMode) {
+    // ---------------------------
+    // Build Topology Metadata
+    // ---------------------------
 
-        backtype.storm.Config stormConfig = givenStormConfig == null ? new 
backtype.storm.Config() : givenStormConfig;
-        // TODO: Configurable metric consumer instance number
+    public static Topology buildTopologyMetadata(String topologyId, Config 
config) {
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
+        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
 
-        int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? 
config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
-        LOG.info("Set topology.message.timeout.secs as {}", 
messageTimeoutSecs);
-        stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);
+        return buildTopologyMetadata(topologyId, numOfSpoutTasks, 
numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+    }
 
-        if (config.hasPath("metric")) {
-            
stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, 
config.root().render(ConfigRenderOptions.concise()), 1);
+    public static Topology buildTopologyMetadata(String topologyId,
+                                                 int numOfSpoutTasks,
+                                                 int numOfRouterBolts,
+                                                 int numOfAlertBolts,
+                                                 int numOfPublishTasks,
+                                                 Config config) {
+        Topology topology = new Topology();
+        topology.setName(topologyId);
+        topology.setNumOfSpout(numOfSpoutTasks);
+        topology.setNumOfAlertBolt(numOfAlertBolts);
+        topology.setNumOfGroupBolt(numOfRouterBolts);
+        topology.setNumOfPublishBolt(numOfPublishTasks);
+
+        // Set Spout ID
+        topology.setSpoutId(spoutName);
+
+        // Set Router (Group) ID
+        Set<String> streamRouterBoltNames = new TreeSet<>();
+        for (int i = 0; i < numOfRouterBolts; i++) {
+            streamRouterBoltNames.add(streamRouterBoltNamePrefix + i);
         }
+        topology.setGroupNodeIds(streamRouterBoltNames);
 
-        stormConfig.setNumWorkers(numOfTotalWorkers);
-        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, 
numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
-
-        if (localMode) {
-            LOG.info("Submitting as local mode");
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology(topologyId, stormConfig, topology);
-            Utils.sleep(Long.MAX_VALUE);
-        } else {
-            LOG.info("Submitting as cluster mode");
-            try {
-                StormSubmitter.submitTopologyWithProgressBar(topologyId, 
stormConfig, topology);
-            } catch (Exception ex) {
-                LOG.error("fail submitting topology {}", topology, ex);
-                throw new IllegalStateException(ex);
-            }
+        // Set Alert Bolt ID
+        Set<String> alertBoltIds = new TreeSet<>();
+        for (int i = 0; i < numOfAlertBolts; i++) {
+            alertBoltIds.add(alertBoltNamePrefix + i);
         }
-    }
+        topology.setAlertBoltIds(alertBoltIds);
 
-    public void run(String topologyId, Config config) {
-        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
-        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
-        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
-        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
-        boolean localMode = config.getBoolean(LOCAL_MODE);
-        int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
-        run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishTasks, config, localMode);
-    }
+        // Set Publisher ID
+        topology.setPubBoltId(alertPublishBoltName);
 
-    public IMetadataChangeNotifyService getMetadataChangeNotifyService() {
-        return metadataChangeNotifyService;
+        // TODO: Load bolts' parallelism from configuration, currently keep 1 
by default.
+
+        topology.setSpoutParallelism(1);
+        topology.setGroupParallelism(1);
+        topology.setAlertParallelism(1);
+
+        return topology;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java
index 94b6195..e482615 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java
@@ -16,28 +16,35 @@
  */
 package org.apache.eagle.app;
 
+/**
+ * Application Lifecycle Listener/Callback.
+ */
 public interface ApplicationLifecycle {
     /**
-     * on application installed.
+     * After Application Installed. (Callback)
      */
-    default void onInstall() {
+    default void afterInstall() {
+        // Do nothing by default
     }
 
     /**
-     * on application uninstalled.
+     * After Application Uninstalled. (Callback)
      */
-    default void onUninstall() {
+    default void afterUninstall() {
+        // Do nothing by default
     }
 
     /**
-     * onStart.
+     * Before Application Start. (Prepare)
      */
-    default void onStart() {
+    default void beforeStart() {
+        // Do nothing by default
     }
 
     /**
-     * onStop.
+     * After Application Stopped. (Callback)
      */
-    default void onStop() {
+    default void afterStop() {
+        // Do nothing by default
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java
index 96f171e..d51b8b5 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java
@@ -64,7 +64,9 @@ public class ExecutionRuntimeManager {
             ExecutionRuntime<E, P> runtime = ((ExecutionRuntimeProvider<E, P>) 
executionRuntimeProviders.get(environment.getClass())).get();
             runtime.prepare(environment);
             executionRuntimeCache.put(environment, runtime);
-            LOGGER.info("Created new execution runtime {} for environment: 
{}", runtime, environment);
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Created new execution runtime {} for 
environment: {}", runtime, environment);
+            }
             return runtime;
         } else {
             LOGGER.error("No matched execution runtime found for environment: 
" + environment);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 1994e28..fb4aff9 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -137,7 +137,7 @@ public class StormExecutionRuntime implements 
ExecutionRuntime<StormEnvironment,
     @Override
     public void stop(Application<StormEnvironment, StormTopology> executor, 
com.typesafe.config.Config config) {
         String appId = config.getString("appId");
-        LOG.info("Stopping topology {} ..." + appId);
+        LOG.info("Stopping topology {} ...", appId);
         if (Objects.equals(config.getString("mode"), 
ApplicationEntity.Mode.CLUSTER.name())) {
             Nimbus.Client stormClient = 
NimbusClient.getConfiguredClient(getStormConfig(config)).getClient();
             try {
@@ -151,7 +151,7 @@ public class StormExecutionRuntime implements 
ExecutionRuntime<StormEnvironment,
             killOptions.set_wait_secs(0);
             getLocalCluster().killTopologyWithOpts(appId,killOptions);
         }
-        LOG.info("Stopped topology {} ..." + appId);
+        LOG.info("Stopped topology {}", appId);
     }
 
     @Override
@@ -159,6 +159,7 @@ public class StormExecutionRuntime implements 
ExecutionRuntime<StormEnvironment,
         String appId = config.getString("appId");
         LOG.info("Fetching status of topology {} ...", appId);
         List<TopologySummary> topologySummaries ;
+        ApplicationEntity.Status status = null;
         try {
             if (Objects.equals(config.getString("mode"), 
ApplicationEntity.Mode.CLUSTER.name())) {
                 Nimbus.Client stormClient = 
NimbusClient.getConfiguredClient(getStormConfig(config)).getClient();
@@ -166,22 +167,28 @@ public class StormExecutionRuntime implements 
ExecutionRuntime<StormEnvironment,
             } else {
                 topologySummaries = 
getLocalCluster().getClusterInfo().get_topologies();
             }
+
             for (TopologySummary topologySummary : topologySummaries) {
                 if (topologySummary.get_name().equalsIgnoreCase(appId)) {
                     if 
(topologySummary.get_status().equalsIgnoreCase("ACTIVE")) {
-                        return ApplicationEntity.Status.RUNNING;
+                        status = ApplicationEntity.Status.RUNNING;
                     } else if 
(topologySummary.get_status().equalsIgnoreCase("INACTIVE")) {
-                        return ApplicationEntity.Status.STOPPED;
+                        status = ApplicationEntity.Status.STOPPED;
                     } else if 
(topologySummary.get_status().equalsIgnoreCase("KILLED")) {
-                        return ApplicationEntity.Status.REMOVED;
+                        status = ApplicationEntity.Status.REMOVED;
                     }
                 }
             }
             //If not exist, return removed
-            return ApplicationEntity.Status.REMOVED;
+            if (status == null) {
+                status = ApplicationEntity.Status.REMOVED;
+            }
         } catch (TException e) {
-            return ApplicationEntity.Status.UNKNOWN;
+            LOG.error("Got error to fetch status of {}", appId, e);
+            status = ApplicationEntity.Status.UNKNOWN;
         }
+        LOG.info("Status of {}: {}", appId, status);
+        return status;
     }
 
     public static class Provider implements 
ExecutionRuntimeProvider<StormEnvironment,StormTopology> {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
new file mode 100644
index 0000000..cecd81e
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.scheme.JsonScheme;
+import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeManager;
+import org.apache.eagle.app.sink.KafkaStreamSinkConfig;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.StreamDesc;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Managed Application Action: org.apache.eagle.app.service.ApplicationAction
+ * <ul>
+ * <li>Application Metadata Entity (Persistence): 
org.apache.eagle.metadata.model.ApplicationEntity</li>
+ * <li>Application Processing Logic (Execution): 
org.apache.eagle.app.Application</li>
+ * <li>Application Lifecycle Listener (Installation): 
org.apache.eagle.app.ApplicationLifecycle</li>
+ * </ul>
+ */
+public class ApplicationAction implements Serializable {
+    private final Config config;
+    private final Application application;
+    private final ExecutionRuntime runtime;
+    private final ApplicationEntity metadata;
+    private final IMetadataDao alertMetadataService;
+
+    /**
+     * @param metadata    ApplicationEntity.
+     * @param application Application.
+     */
+    public ApplicationAction(Application application, ApplicationEntity 
metadata, Config envConfig, IMetadataDao alertMetadataService) {
+        Preconditions.checkNotNull(application, "Application is null");
+        Preconditions.checkNotNull(metadata, "ApplicationEntity is null");
+        this.application = application;
+        this.metadata = metadata;
+        this.runtime = 
ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),
 envConfig);
+        Map<String, Object> executionConfig = metadata.getConfiguration();
+        if (executionConfig == null) {
+            executionConfig = Collections.emptyMap();
+        }
+
+        // TODO: Decouple hardcoded configuration key
+        executionConfig.put("siteId", metadata.getSite().getSiteId());
+        executionConfig.put("mode", metadata.getMode().name());
+        executionConfig.put("appId", metadata.getAppId());
+        executionConfig.put("jarPath", metadata.getJarPath());
+        this.config = 
ConfigFactory.parseMap(executionConfig).withFallback(envConfig);
+        this.alertMetadataService = alertMetadataService;
+    }
+
+    /**
+     * Generate global unique streamId to install.
+     * TODO refactor with streamId and siteId
+     */
+    private static String generateUniqueStreamId(String siteId,String 
streamTypeId) {
+        return String.format("%s_%s",streamTypeId,siteId).toUpperCase();
+    }   
+
+    public void doInstall() {
+        if (metadata.getDescriptor().getStreams() != null) {
+            List<StreamDesc> streamDescToInstall = 
metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
+                StreamDefinition copied = streamDefinition.copy();
+                copied.setSiteId(metadata.getSite().getSiteId());
+                
copied.setStreamId(generateUniqueStreamId(metadata.getSite().getSiteId(),copied.getStreamId()));
+                StreamSinkConfig streamSinkConfig = 
this.runtime.environment().streamSink().getSinkConfig(copied.getStreamId(), 
this.config);
+                StreamDesc streamDesc = new StreamDesc();
+                streamDesc.setSchema(copied);
+                streamDesc.setSink(streamSinkConfig);
+                streamDesc.setStreamId(copied.getStreamId());
+                return streamDesc;
+            })).collect(Collectors.toList());
+            metadata.setStreams(streamDescToInstall);
+
+            // TODO: Decouple converting from StreamSink to Alert DataSource
+            // iterate each stream descriptor and create alert datasource for 
each
+            for (StreamDesc streamDesc : streamDescToInstall) {
+                // only take care of Kafka sink
+                if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
+                    KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) 
streamDesc.getSink();
+                    Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+                    datasource.setType("KAFKA");
+                    datasource.setName(metadata.getAppId());
+                    datasource.setTopic(kafkaCfg.getTopicId());
+                    
datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
+                    Tuple2StreamMetadata tuple2Stream = new 
Tuple2StreamMetadata();
+                    Properties prop = new Properties();
+                    
prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, 
streamDesc.getStreamId());
+                    tuple2Stream.setStreamNameSelectorProp(prop);
+                    tuple2Stream.setTimestampColumn("timestamp");
+                    
tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName());
+                    datasource.setCodec(tuple2Stream);
+                    alertMetadataService.addDataSource(datasource);
+
+                    StreamDefinition sd = streamDesc.getSchema();
+                    sd.setDataSource(metadata.getAppId());
+                    alertMetadataService.createStream(streamDesc.getSchema());
+                }
+            }
+        }
+    }
+
+    public void doUninstall() {
+        // we should remove alert data source and stream definition while we 
do uninstall
+        if (metadata.getStreams() == null) {
+            return;
+        }
+        // iterate each stream descriptor and create alert datasource for each
+        for (StreamDesc streamDesc : metadata.getStreams()) {
+            alertMetadataService.removeDataSource(metadata.getAppId());
+            alertMetadataService.removeStream(streamDesc.getStreamId());
+        }
+    }
+
+    public void doStart() {
+        this.runtime.start(this.application, this.config);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void doStop() {
+        this.runtime.stop(this.application, this.config);
+    }
+
+    public ApplicationEntity.Status getStatus() {
+        return this.runtime.status(this.application, this.config);
+    }
+
+    public ApplicationEntity getMetadata() {
+        return metadata;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationListener.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationListener.java
new file mode 100644
index 0000000..4a7f0c6
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.service;
+
+import org.apache.eagle.app.ApplicationLifecycle;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+
+/**
+ * Application Lifecycle/Management Listener (Guice Aware).
+ * Currently only listen on application lifecycle , may extend to more later.
+ */
+public interface ApplicationListener extends ApplicationLifecycle {
+    /**
+     * @param applicationEntity ApplicationEntity.
+     */
+    void init(ApplicationEntity applicationEntity);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperationContext.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperationContext.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperationContext.java
deleted file mode 100644
index 3561374..0000000
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperationContext.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.scheme.JsonScheme;
-import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.ApplicationLifecycle;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeManager;
-import org.apache.eagle.app.sink.KafkaStreamSinkConfig;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.StreamDesc;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-/**
- * Managed Application Interface: 
org.apache.eagle.app.service.ApplicationOperationContext
- * <ul>
- * <li>Application Metadata Entity (Persistence): 
org.apache.eagle.metadata.model.ApplicationEntity</li>
- * <li>Application Processing Logic (Execution): 
org.apache.eagle.app.Application</li>
- * <li>Application Lifecycle Listener (Installation): 
org.apache.eagle.app.ApplicationLifecycle</li>
- * </ul>
- */
-public class ApplicationOperationContext implements Serializable, 
ApplicationLifecycle {
-    private final Config config;
-    private final Application application;
-    private final ExecutionRuntime runtime;
-    private final ApplicationEntity metadata;
-    private final IMetadataDao alertMetadataService;
-
-    /**
-     * @param metadata    ApplicationEntity.
-     * @param application Application.
-     */
-    public ApplicationOperationContext(Application application, 
ApplicationEntity metadata, Config envConfig, IMetadataDao 
alertMetadataService) {
-        Preconditions.checkNotNull(application, "Application is null");
-        Preconditions.checkNotNull(metadata, "ApplicationEntity is null");
-        this.application = application;
-        this.metadata = metadata;
-        this.runtime = 
ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),
 envConfig);
-        Map<String, Object> executionConfig = metadata.getConfiguration();
-        if (executionConfig == null) {
-            executionConfig = Collections.emptyMap();
-        }
-
-        // TODO: Decouple hardcoded configuration key
-        executionConfig.put("siteId", metadata.getSite().getSiteId());
-        executionConfig.put("mode", metadata.getMode().name());
-        executionConfig.put("appId", metadata.getAppId());
-        executionConfig.put("jarPath", metadata.getJarPath());
-        this.config = 
ConfigFactory.parseMap(executionConfig).withFallback(envConfig);
-        this.alertMetadataService = alertMetadataService;
-    }
-
-    /**
-     * Generate global unique streamId to install.
-     * TODO refactor with streamId and siteId
-     */
-    private static String generateUniqueStreamId(String siteId,String 
streamTypeId) {
-        return String.format("%s_%s",streamTypeId,siteId).toUpperCase();
-    }   
-
-    @Override
-    public void onInstall() {
-        if (metadata.getDescriptor().getStreams() != null) {
-            List<StreamDesc> streamDescToInstall = 
metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
-                StreamDefinition copied = streamDefinition.copy();
-                copied.setSiteId(metadata.getSite().getSiteId());
-                
copied.setStreamId(generateUniqueStreamId(metadata.getSite().getSiteId(),copied.getStreamId()));
-                StreamSinkConfig streamSinkConfig = 
this.runtime.environment().streamSink().getSinkConfig(copied.getStreamId(), 
this.config);
-                StreamDesc streamDesc = new StreamDesc();
-                streamDesc.setSchema(copied);
-                streamDesc.setSink(streamSinkConfig);
-                streamDesc.setStreamId(copied.getStreamId());
-                return streamDesc;
-            })).collect(Collectors.toList());
-            metadata.setStreams(streamDescToInstall);
-
-            // TODO: Decouple converting from StreamSink to Alert DataSource
-            // iterate each stream descriptor and create alert datasource for 
each
-            for (StreamDesc streamDesc : streamDescToInstall) {
-                // only take care of Kafka sink
-                if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
-                    KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) 
streamDesc.getSink();
-                    Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
-                    datasource.setType("KAFKA");
-                    datasource.setName(metadata.getAppId());
-                    datasource.setTopic(kafkaCfg.getTopicId());
-                    
datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
-                    Tuple2StreamMetadata tuple2Stream = new 
Tuple2StreamMetadata();
-                    Properties prop = new Properties();
-                    
prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, 
streamDesc.getStreamId());
-                    tuple2Stream.setStreamNameSelectorProp(prop);
-                    tuple2Stream.setTimestampColumn("timestamp");
-                    
tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName());
-                    datasource.setCodec(tuple2Stream);
-                    alertMetadataService.addDataSource(datasource);
-
-                    StreamDefinition sd = streamDesc.getSchema();
-                    sd.setDataSource(metadata.getAppId());
-                    alertMetadataService.createStream(streamDesc.getSchema());
-                }
-            }
-        }
-    }
-
-    @Override
-    public void onUninstall() {
-        // we should remove alert data source and stream definition while we 
do uninstall
-        if (metadata.getStreams() == null) {
-            return;
-        }
-        // iterate each stream descriptor and create alert datasource for each
-        for (StreamDesc streamDesc : metadata.getStreams()) {
-            alertMetadataService.removeDataSource(metadata.getAppId());
-            alertMetadataService.removeStream(streamDesc.getStreamId());
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void onStart() {
-        this.runtime.start(this.application, this.config);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void onStop() {
-        this.runtime.stop(this.application, this.config);
-    }
-
-    public ApplicationEntity.Status getStatus() {
-        return this.runtime.status(this.application, this.config);
-    }
-
-    public ApplicationEntity getMetadata() {
-        return metadata;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
index abd9197..6dee1fc 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -18,14 +18,12 @@ package org.apache.eagle.app.service.impl;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
 import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.app.Application;
-import org.apache.eagle.app.service.ApplicationManagementService;
-import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.service.ApplicationOperationContext;
-import org.apache.eagle.app.service.ApplicationProviderService;
+import org.apache.eagle.app.service.*;
 import org.apache.eagle.app.spi.ApplicationProvider;
 import org.apache.eagle.metadata.exceptions.ApplicationWrongStatusException;
 import org.apache.eagle.metadata.exceptions.EntityNotFoundException;
@@ -41,12 +39,15 @@ import java.util.Map;
 
 @Singleton
 public class ApplicationManagementServiceImpl implements 
ApplicationManagementService {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ApplicationManagementServiceImpl.class);
+
     private final SiteEntityService siteEntityService;
     private final ApplicationProviderService applicationProviderService;
     private final ApplicationEntityService applicationEntityService;
     private final IMetadataDao alertMetadataService;
     private final Config config;
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ApplicationManagementServiceImpl.class);
+
+    @Inject private Injector currentInjector;
 
     @Inject
     public ApplicationManagementServiceImpl(
@@ -99,13 +100,26 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
         }
         applicationEntity.setConfiguration(appConfig);
 
+        // Validate Dependency
         validateDependingApplicationInstalled(applicationEntity);
 
-        ApplicationOperationContext applicationOperationContext = new 
ApplicationOperationContext(
-            
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
-            applicationEntity, config, alertMetadataService);
-        applicationOperationContext.onInstall();
-        return applicationEntityService.create(applicationEntity);
+        ApplicationProvider<?> applicationProvider = 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType());
+
+        // DoInstall
+        ApplicationAction applicationAction = new 
ApplicationAction(applicationProvider.getApplication(), applicationEntity, 
config, alertMetadataService);
+        applicationAction.doInstall();
+
+        // UpdateMetadata
+        ApplicationEntity result =  
applicationEntityService.create(applicationEntity);
+
+        // AfterInstall Callback
+        applicationProvider.getApplicationListener().ifPresent((listener) -> {
+            currentInjector.injectMembers(listener);
+            listener.init(result);
+            listener.afterInstall();
+        });
+
+        return result;
     }
 
     private void validateDependingApplicationInstalled(ApplicationEntity 
applicationEntity) {
@@ -120,18 +134,23 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
 
     @Override
     public ApplicationEntity 
uninstall(ApplicationOperations.UninstallOperation operation) throws 
ApplicationWrongStatusException {
-        ApplicationEntity applicationEntity = 
applicationEntityService.getByUUIDOrAppId(operation.getUuid(), 
operation.getAppId());
-        ApplicationOperationContext applicationOperationContext = new 
ApplicationOperationContext(
-            
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
-            applicationEntity, config, alertMetadataService);
+        ApplicationEntity appEntity = 
applicationEntityService.getByUUIDOrAppId(operation.getUuid(), 
operation.getAppId());
+        ApplicationProvider<?> appProvider = 
applicationProviderService.getApplicationProviderByType(appEntity.getDescriptor().getType());
 
-        ApplicationEntity.Status currentStatus = applicationEntity.getStatus();
+        ApplicationAction appAction = new 
ApplicationAction(appProvider.getApplication(), appEntity, config, 
alertMetadataService);
+        ApplicationEntity.Status currentStatus = appEntity.getStatus();
         try {
             if (currentStatus == ApplicationEntity.Status.INITIALIZED || 
currentStatus == ApplicationEntity.Status.STOPPED) {
-                applicationOperationContext.onUninstall();
-                return applicationEntityService.delete(applicationEntity);
+                // AfterUninstall Callback
+                appAction.doUninstall();
+                appProvider.getApplicationListener().ifPresent((listener) -> {
+                    currentInjector.injectMembers(listener);
+                    listener.init(appEntity);
+                    listener.afterUninstall();
+                });
+                return applicationEntityService.delete(appEntity);
             } else {
-                throw new ApplicationWrongStatusException("App: " + 
applicationEntity.getAppId() + " status is" + currentStatus + ", uninstall 
operation is not allowed");
+                throw new ApplicationWrongStatusException("App: " + 
appEntity.getAppId() + " status is" + currentStatus + ", uninstall operation is 
not allowed");
             }
         } catch (Throwable throwable) {
             LOGGER.error(throwable.getMessage(), throwable);
@@ -141,49 +160,59 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
 
     @Override
     public ApplicationEntity start(ApplicationOperations.StartOperation 
operation) throws ApplicationWrongStatusException {
-        ApplicationEntity applicationEntity = 
applicationEntityService.getByUUIDOrAppId(operation.getUuid(), 
operation.getAppId());
-        Application application = 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
+        ApplicationEntity appEntity = 
applicationEntityService.getByUUIDOrAppId(operation.getUuid(), 
operation.getAppId());
+        ApplicationProvider<?> appProvider = 
applicationProviderService.getApplicationProviderByType(appEntity.getDescriptor().getType());
+        Application application = appProvider.getApplication();
         Preconditions.checkArgument(application.isExecutable(), "Application 
is not executable");
 
-        ApplicationEntity.Status currentStatus = applicationEntity.getStatus();
+        ApplicationEntity.Status currentStatus = appEntity.getStatus();
         try {
             if (currentStatus == ApplicationEntity.Status.INITIALIZED || 
currentStatus == ApplicationEntity.Status.STOPPED) {
-                ApplicationOperationContext applicationOperationContext = new 
ApplicationOperationContext(
-                        application, applicationEntity, config, 
alertMetadataService);
-
-                applicationOperationContext.onStart();
-                //Only when topology submitted successfully can the state 
change to STARTING
-                applicationEntityService.delete(applicationEntity);
-                applicationEntity.setStatus(ApplicationEntity.Status.STARTING);
-                return applicationEntityService.create(applicationEntity);
+                ApplicationAction applicationAction = new 
ApplicationAction(application, appEntity, config, alertMetadataService);
+                // AfterInstall Callback
+                appProvider.getApplicationListener().ifPresent((listener) -> {
+                    currentInjector.injectMembers(listener);
+                    listener.init(appEntity);
+                    listener.beforeStart();
+                });
+                applicationAction.doStart();
+
+                //TODO: Only when topology submitted successfully can the 
state change to STARTING
+                applicationEntityService.delete(appEntity);
+                appEntity.setStatus(ApplicationEntity.Status.STARTING);
+                return applicationEntityService.create(appEntity);
             } else {
-                throw new ApplicationWrongStatusException("App: " + 
applicationEntity.getAppId() + " status is " + currentStatus + " start 
operation is not allowed");
+                throw new ApplicationWrongStatusException("App: " + 
appEntity.getAppId() + " status is " + currentStatus + " start operation is not 
allowed");
             }
         } catch (ApplicationWrongStatusException e) {
             LOGGER.error(e.getMessage(), e);
             throw e;
         } catch (Exception e) {
-            LOGGER.error("Failed to start app " + 
applicationEntity.getAppId(), e);
+            LOGGER.error("Failed to start app " + appEntity.getAppId(), e);
             throw e;
         } catch (Throwable throwable) {
             LOGGER.error(throwable.getMessage(), throwable);
             throw throwable;
         }
-
     }
 
     @Override
     public ApplicationEntity stop(ApplicationOperations.StopOperation 
operation) throws ApplicationWrongStatusException {
         ApplicationEntity applicationEntity = 
applicationEntityService.getByUUIDOrAppId(operation.getUuid(), 
operation.getAppId());
-        Application application = 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
+        ApplicationProvider<?> appProvider = 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType());
+        Application application = appProvider.getApplication();
         Preconditions.checkArgument(application.isExecutable(), "Application 
is not executable");
 
-        ApplicationOperationContext applicationOperationContext = new 
ApplicationOperationContext(
-                application, applicationEntity, config, alertMetadataService);
+        ApplicationAction applicationAction = new 
ApplicationAction(application, applicationEntity, config, alertMetadataService);
         ApplicationEntity.Status currentStatus = applicationEntity.getStatus();
         try {
             if (currentStatus == ApplicationEntity.Status.RUNNING) {
-                applicationOperationContext.onStop();
+                applicationAction.doStop();
+                appProvider.getApplicationListener().ifPresent((listener) -> {
+                    currentInjector.injectMembers(listener);
+                    listener.init(applicationEntity);
+                    listener.afterStop();
+                });
                 //stop -> directly killed
                 applicationEntityService.delete(applicationEntity);
                 applicationEntity.setStatus(ApplicationEntity.Status.STOPPING);
@@ -210,14 +239,11 @@ public class ApplicationManagementServiceImpl implements 
ApplicationManagementSe
             Application application = 
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
             Preconditions.checkArgument(application.isExecutable(), 
"Application is not executable");
 
-            ApplicationOperationContext applicationOperationContext = new 
ApplicationOperationContext(
-                    application, applicationEntity, config, 
alertMetadataService);
-            ApplicationEntity.Status topologyStatus = 
applicationOperationContext.getStatus();
-            return topologyStatus;
+            ApplicationAction applicationAction = new 
ApplicationAction(application, applicationEntity, config, alertMetadataService);
+            return applicationAction.getStatus();
         } catch (IllegalArgumentException e) {
             LOGGER.error("application id not exist", e);
             throw e;
         }
     }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
index 025bc7c..1455922 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
@@ -99,7 +99,6 @@ public class ApplicationProviderConfigLoader extends 
ApplicationProviderLoader {
             throw new RuntimeException("providerClassName is not 
implementation of " + ApplicationProvider.class.getCanonicalName());
         }
         ApplicationProvider provider = (ApplicationProvider) 
providerClass.newInstance();
-        provider.prepare(providerConfig, this.getConfig());
         Preconditions.checkNotNull(provider.getApplicationDesc(), "appDesc is 
null");
         Preconditions.checkNotNull(provider.getApplicationDesc().getType(), 
"type is null");
         registerProvider(provider);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
index 9f52c9c..cbbd438 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
@@ -81,7 +81,6 @@ public class ApplicationProviderSPILoader extends 
ApplicationProviderLoader {
                 
providerConfig.setClassName(applicationProvider.getClass().getCanonicalName());
                 
providerConfig.setJarPath(jarFileSupplier.apply(applicationProvider));
                 
applicationProvider.getApplicationDesc().setExecutable(applicationProvider.getApplication().isExecutable());
-                applicationProvider.prepare(providerConfig, getConfig());
                 registerProvider(applicationProvider);
                 LOG.warn("Loaded {}:{} ({}) from {}",
                     applicationProvider.getApplicationDesc().getType(),

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index 2a03275..2ac4779 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -65,7 +65,7 @@ public class KafkaStreamSink extends 
StormStreamSink<KafkaStreamSinkConfig> {
     }
 
     @Override
-    public void onInstall() {
+    public void afterInstall() {
         ensureTopicCreated();
     }
 
@@ -85,7 +85,7 @@ public class KafkaStreamSink extends 
StormStreamSink<KafkaStreamSinkConfig> {
     }
 
     @Override
-    public void onUninstall() {
+    public void afterUninstall() {
         ensureTopicDeleted();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index 61a6836..8256aba 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -27,13 +27,13 @@ public class LoggingStreamSink extends 
StormStreamSink<DefaultStreamSinkConfig>
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaStreamSink.class);
 
     @Override
-    public void onInstall() {
-        LOGGER.info("Executing onInstall callback, do nothing");
+    public void afterInstall() {
+        LOGGER.info("Executing afterInstall callback, do nothing");
     }
 
     @Override
-    public void onUninstall() {
-        LOGGER.info("Executing onUninstall callback, do nothing");
+    public void afterUninstall() {
+        LOGGER.info("Executing afterUninstall callback, do nothing");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 06ac703..2a8d7c0 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -17,20 +17,31 @@
 
 package org.apache.eagle.app.spi;
 
+import com.google.common.base.Preconditions;
+import com.google.inject.AbstractModule;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.app.Application;
+import org.apache.eagle.app.service.ApplicationListener;
+import org.apache.eagle.common.module.GlobalScope;
 import org.apache.eagle.common.module.ModuleRegistry;
+import org.apache.eagle.common.module.ModuleScope;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 import org.apache.eagle.metadata.model.ApplicationDocs;
+import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.Configuration;
+import org.apache.eagle.metadata.persistence.MetadataStore;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.xml.bind.JAXBException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /**
- * Describe Application metadata with XML descriptor configuration in path of: 
 /META-INF/providers/${ApplicationProviderClassName}.xml
+ * Describe Application metadata with XML descriptor configuration in path of: 
 /META-INF/providers/${ApplicationProviderClassName}.xml.
  */
 public abstract class AbstractApplicationProvider<T extends Application> 
implements ApplicationProvider<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractApplicationProvider.class);
@@ -96,8 +107,43 @@ public abstract class AbstractApplicationProvider<T extends 
Application> impleme
         return applicationDesc;
     }
 
+    private ModuleRegistry currentRegistry;
+
     @Override
-    public void register(ModuleRegistry registry) {
+    public final void register(ModuleRegistry registry) {
         LOG.debug("Registering modules {}", this.getClass().getName());
+        this.currentRegistry = registry;
+        onRegister();
+    }
+
+    @Override
+    public Optional<ApplicationListener> getApplicationListener() {
+        return Optional.empty();
+    }
+
+    protected void onRegister() {
+        // Do nothing by default;
+    }
+
+    protected  <M extends ModuleScope,T> void bind(Class<M> scope, Class<T> 
type, Class<? extends T> impl) {
+        Preconditions.checkNotNull(currentRegistry, "No registry set before 
being used");
+        currentRegistry.register(scope, new AbstractModule() {
+            @Override
+            protected void configure() {
+                bind(type).to(impl);
+            }
+        });
+    }
+
+    public <T> void bind(Class<T> type, Class<? extends T> impl) {
+        bind(GlobalScope.class,type,impl);
+    }
+
+    protected <M extends MetadataStore,T> void bindToMetaStore(Class<? extends 
M> scope, Class<T> type, Class<? extends T> impl) {
+        bind(scope,type,impl);
+    }
+
+    public <T> void bindToMemoryMetaStore(Class<T> type, Class<? extends T> 
impl) {
+        bindToMetaStore(MemoryMetadataStore.class,type,impl);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index 0dceb72..bc70373 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.eagle.app.spi;
 
-import com.typesafe.config.Config;
 import org.apache.eagle.app.Application;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
+import org.apache.eagle.app.service.ApplicationListener;
 import org.apache.eagle.common.module.ModuleRegistry;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 
 import java.lang.reflect.ParameterizedType;
+import java.util.Optional;
 
 /**
  * Application Service Provider Interface.
@@ -32,13 +33,6 @@ import java.lang.reflect.ParameterizedType;
 public interface ApplicationProvider<T extends Application> {
 
     /**
-     * Prepare Application Provider before loading.
-     */
-    default void prepare(ApplicationProviderConfig providerConfig, Config 
envConfig) {
-        // Do nothing by default.
-    }
-
-    /**
      * @return application descriptor.
      */
     ApplicationDesc getApplicationDesc();
@@ -67,6 +61,11 @@ public interface ApplicationProvider<T extends Application> {
     T getApplication();
 
     /**
+     * @return application lifecycle listeners type.
+     */
+    Optional<ApplicationListener> getApplicationListener();
+
+    /**
      * Extend application modules like Web Resource, Metadata Store, etc.
      */
     void register(ModuleRegistry registry);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
index 2094d74..1abdb6b 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
@@ -83,7 +83,6 @@ public class ApplicationSimulatorImpl extends 
ApplicationSimulator {
     public void start(Class<? extends ApplicationProvider> appProviderClass, 
Map<String, Object> appConfig) {
         try {
             ApplicationProvider applicationProvider = 
appProviderClass.newInstance();
-            applicationProvider.prepare(new 
ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass), 
appProviderClass), config);
             start(applicationProvider.getApplicationDesc().getType(), 
appConfig);
         } catch (InstantiationException | IllegalAccessException e) {
             throw new IllegalStateException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
 
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
index c6ac5db..a47e30a 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
@@ -72,13 +72,8 @@ public class TestStormApplication extends StormApplication{
         }
 
         @Override
-        public void register(ModuleRegistry registry) {
-            registry.register(MemoryMetadataStore.class, new AbstractModule() {
-                @Override
-                protected void configure() {
-                    bind(ExtendedDao.class).to(ExtendedDaoImpl.class);
-                }
-            });
+        public void onRegister() {
+            bindToMemoryMetaStore(ExtendedDao.class,ExtendedDaoImpl.class);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationActionTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationActionTest.java
 
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationActionTest.java
new file mode 100644
index 0000000..d076bf7
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationActionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class ApplicationActionTest {
+    /**
+     * appConfig.withFallback(envConfig): appConfig will override envConfig, 
envConfig is used as default config
+     */
+    @Test
+    public void testTypeSafeConfigMerge(){
+        Config appConfig = ConfigFactory.parseMap(new 
HashMap<String,String>(){{
+            
put("APP_CONFIG",ApplicationActionTest.this.getClass().getCanonicalName());
+            put("SCOPE","APP");
+        }});
+
+        Config envConfig = ConfigFactory.parseMap(new 
HashMap<String,String>(){{
+            
put("ENV_CONFIG",ApplicationActionTest.this.getClass().getCanonicalName());
+            put("SCOPE","ENV");
+        }});
+
+        Config mergedConfig = appConfig.withFallback(envConfig);
+        Assert.assertTrue(mergedConfig.hasPath("APP_CONFIG"));
+        Assert.assertTrue(mergedConfig.hasPath("ENV_CONFIG"));
+        Assert.assertEquals("appConfig.withFallback(envConfig): appConfig will 
override envConfig, envConfig is used as default config",
+                "APP",mergedConfig.getString("SCOPE"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationOperationContextTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationOperationContextTest.java
 
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationOperationContextTest.java
deleted file mode 100644
index 21044be..0000000
--- 
a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationOperationContextTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-
-public class ApplicationOperationContextTest {
-    /**
-     * appConfig.withFallback(envConfig): appConfig will override envConfig, 
envConfig is used as default config
-     */
-    @Test
-    public void testTypeSafeConfigMerge(){
-        Config appConfig = ConfigFactory.parseMap(new 
HashMap<String,String>(){{
-            
put("APP_CONFIG",ApplicationOperationContextTest.this.getClass().getCanonicalName());
-            put("SCOPE","APP");
-        }});
-
-        Config envConfig = ConfigFactory.parseMap(new 
HashMap<String,String>(){{
-            
put("ENV_CONFIG",ApplicationOperationContextTest.this.getClass().getCanonicalName());
-            put("SCOPE","ENV");
-        }});
-
-        Config mergedConfig = appConfig.withFallback(envConfig);
-        Assert.assertTrue(mergedConfig.hasPath("APP_CONFIG"));
-        Assert.assertTrue(mergedConfig.hasPath("ENV_CONFIG"));
-        Assert.assertEquals("appConfig.withFallback(envConfig): appConfig will 
override envConfig, envConfig is used as default config",
-                "APP",mergedConfig.getString("SCOPE"));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
 
b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
index 5297de1..c6d01f4 100644
--- 
a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
+++ 
b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
@@ -39,6 +39,12 @@ public class ApplicationEntity extends PersistenceEntity {
     private List<StreamDesc> streams;
     private Mode mode = Mode.CLUSTER;
     private String jarPath;
+
+    @Override
+    public String toString() {
+        return String.format("Application[appId=%s,siteId=%s,UUID=%s]", appId, 
descriptor.getType(), this.getUuid());
+    }
+
     private Status status = Status.INITIALIZED;
 
     public ApplicationEntity() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
 
b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
index c207788..bedda04 100644
--- 
a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
+++ 
b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
@@ -16,23 +16,25 @@
  */
 package org.apache.eagle.app.example;
 
-import com.google.inject.AbstractModule;
 import org.apache.eagle.app.example.extensions.ExampleCommonService;
 import org.apache.eagle.app.example.extensions.ExampleCommonServiceImpl;
 import org.apache.eagle.app.example.extensions.ExampleEntityService;
 import org.apache.eagle.app.example.extensions.ExampleEntityServiceMemoryImpl;
+import org.apache.eagle.app.service.ApplicationListener;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.apache.eagle.common.module.GlobalScope;
-import org.apache.eagle.common.module.ModuleRegistry;
-import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+
+import java.util.Optional;
 
 /**
  * Define application provider pragmatically
  */
 public class ExampleApplicationProvider extends 
AbstractApplicationProvider<ExampleStormApplication> {
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(ExampleApplicationProvider.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExampleApplicationProvider.class);
 
     @Override
     public ExampleStormApplication getApplication() {
@@ -40,21 +42,44 @@ public class ExampleApplicationProvider extends 
AbstractApplicationProvider<Exam
     }
 
     @Override
-    public void register(ModuleRegistry registry) {
-        registry.register(MemoryMetadataStore.class, new AbstractModule() {
+    public Optional<ApplicationListener> getApplicationListener() {
+        return Optional.of(new ApplicationListener() {
+
+            @Inject ExampleEntityService entityService;
+
+            private ApplicationEntity application;
+
             @Override
-            protected void configure() {
-                LOGGER.info("Load memory metadata modules ...");
-                
bind(ExampleEntityService.class).to(ExampleEntityServiceMemoryImpl.class);
+            public void init(ApplicationEntity applicationEntity) {
+                this.application = applicationEntity;
+                entityService.getEntities();
+            }
+
+            @Override
+            public void afterInstall() {
+                LOG.info("afterInstall {}", this.application);
             }
-        });
 
-        registry.register(new AbstractModule() {
             @Override
-            protected void configure() {
-                LOGGER.info("Load global modules ...");
-                
bind(ExampleCommonService.class).to(ExampleCommonServiceImpl.class);
+            public void afterUninstall() {
+                LOG.info("afterUninstall {}", this.application);
+            }
+
+            @Override
+            public void beforeStart() {
+                LOG.info("beforeStart {}", this.application);
+            }
+
+            @Override
+            public void afterStop() {
+                LOG.info("afterStop {}", this.application);
             }
         });
     }
+
+    @Override
+    protected void onRegister() {
+        
bindToMemoryMetaStore(ExampleEntityService.class,ExampleEntityServiceMemoryImpl.class);
+        bind(ExampleCommonService.class,ExampleCommonServiceImpl.class);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
 
b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
index d9ae0ab..e788eb9 100644
--- 
a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
+++ 
b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
@@ -32,11 +32,6 @@ public class HBaseAuditLogAppProvider extends 
AbstractApplicationProvider<HBaseA
         return new HBaseAuditLogApplication();
     }
 
-    @Override
-    public void register(ModuleRegistry registry) {
-
-    }
-
     private static class MyModule extends AbstractModule {
         @Override
         protected void configure() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 625a3a7..9f10fdc 100644
--- 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -73,13 +73,13 @@
         <property>
             <name>dataSourceConfig.zkConnection</name>
             <displayName>dataSourceConfig.zkConnection</displayName>
-            <value>server.eagle.apache.org</value>
+            <value>localhost</value>
             <description>zk connection</description>
         </property>
         <property>
             <name>dataSourceConfig.txZkServers</name>
             <displayName>dataSourceConfig.txZkServers</displayName>
-            <value>server.eagle.apache.org:2181</value>
+            <value>localhost:2181</value>
             <description>zookeeper server for offset transaction</description>
         </property>
         <property>
@@ -133,7 +133,7 @@
         <property>
             <name>dataSinkConfig.brokerList</name>
             <displayName>dataSinkConfig.brokerList</displayName>
-            <value>server.eagle.apache.org:6667</value>
+            <value>localhost:6667</value>
             <description>kafka broker list</description>
         </property>
         <property>
@@ -153,7 +153,7 @@
         <property>
             <name>fs.defaultFS</name>
             <displayName>fs.defaultFS</displayName>
-            <value>hdfs://server.eagle.apache.org:8020</value>
+            <value>hdfs://localhost:8020</value>
             <description>hdfs endpoint</description>
         </property>
     </configuration>

Reply via email to