[OODT-965] Added configuration change listening feature

Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/df1db1e4
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/df1db1e4
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/df1db1e4

Branch: refs/heads/master
Commit: df1db1e44b3289214693f3a0894681400782ba99
Parents: fc6311d
Author: Imesha Sudasingha <imesha.sudasin...@gmail.com>
Authored: Sat Aug 19 16:21:10 2017 +0530
Committer: Imesha Sudasingha <imesha.sudasin...@gmail.com>
Committed: Thu Oct 12 08:07:14 2017 +0530

----------------------------------------------------------------------
 config/pom.xml                                  |  2 +-
 .../org/apache/oodt/config/ConfigEventType.java | 45 +++++++++++++
 .../oodt/config/ConfigurationListener.java      | 31 +++++++++
 .../oodt/config/ConfigurationManager.java       | 23 +++++++
 .../java/org/apache/oodt/config/Constants.java  |  3 +
 .../DistributedConfigurationManager.java        | 64 +++++++++++++++---
 .../DistributedConfigurationPublisher.java      | 26 +++++++-
 .../oodt/config/distributed/ZNodePaths.java     | 27 ++++++++
 .../oodt/config/distributed/cli/CLIAction.java  | 24 +++++--
 config/src/main/resources/cmd-line-options.xml  | 31 +++++++++
 .../DistributedConfigurationManagerTest.java    | 69 ++++++++++++++++++--
 core/pom.xml                                    |  2 +-
 .../cas/filemgr/system/XmlRpcFileManager.java   | 18 ++++-
 pge/pom.xml                                     |  2 +-
 14 files changed, 343 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/pom.xml
----------------------------------------------------------------------
diff --git a/config/pom.xml b/config/pom.xml
index 3f26499..e34ff44 100644
--- a/config/pom.xml
+++ b/config/pom.xml
@@ -26,7 +26,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
+            <artifactId>curator-recipes</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/ConfigEventType.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/ConfigEventType.java b/config/src/main/java/org/apache/oodt/config/ConfigEventType.java
new file mode 100644
index 0000000..8a465b0
--- /dev/null
+++ b/config/src/main/java/org/apache/oodt/config/ConfigEventType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.config;
+
+public enum ConfigEventType { // kinds of configuration change events announced to configuration managers
+    PUBLISH("publish"), // configuration has been published
+    CLEAR("clear"); // configuration has been cleared
+
+    private String name; // textual form of the event: returned by toString() and recognized by parse()
+
+    ConfigEventType(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() { // returns the textual form given to the constructor, not the constant name
+        return name;
+    }
+
+    public static ConfigEventType parse(String string) { // inverse of toString(); the match is case sensitive
+        switch (string) { // NOTE(review): a null argument makes this switch throw NullPointerException
+            case "publish":
+                return PUBLISH;
+            case "clear":
+                return CLEAR;
+            default:
+                return null; // unknown text yields null, so callers have to check the result
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/ConfigurationListener.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/ConfigurationListener.java b/config/src/main/java/org/apache/oodt/config/ConfigurationListener.java
new file mode 100644
index 0000000..a5abcab
--- /dev/null
+++ b/config/src/main/java/org/apache/oodt/config/ConfigurationListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.config;
+
+/**
+ * The interface which should be implemented in order to listen for configuration changes.
+ *
+ * @author Imesha Sudasingha
+ */
+public interface ConfigurationListener {
+
+    /**
+     * This method is invoked when there has been any change in configuration of the interested component.
+     *
+     * @param type the kind of configuration change which occurred
+     */
+    void configurationChanged(ConfigEventType type);
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java b/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java
index 53aacef..9158fad 100644
--- a/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java
+++ b/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java
@@ -17,7 +17,9 @@
 
 package org.apache.oodt.config;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * The abstract class to define functions of the configuration managers.
@@ -28,6 +30,7 @@ public abstract class ConfigurationManager {
 
     protected Component component;
     protected String project;
+    private Set<ConfigurationListener> configurationListeners = new 
HashSet<>(1);
 
     public ConfigurationManager(Component component) {
         this(component, Constants.DEFAULT_PROJECT);
@@ -38,6 +41,12 @@ public abstract class ConfigurationManager {
         this.project = project;
     }
 
+    /**
+     * Loads configuration required for {@link #component}. If distributed configuration management is enabled, this
+     * will download configuration from zookeeper. Else, this will load properties files specified.
+     *
+     * @throws Exception if the configuration cannot be loaded
+     */
     public abstract void loadConfiguration() throws Exception;
 
     /**
@@ -47,6 +56,20 @@ public abstract class ConfigurationManager {
      */
     public abstract void clearConfiguration();
 
+    public synchronized void addConfigurationListener(ConfigurationListener 
listener) {
+        configurationListeners.add(listener); // kept in a HashSet, so an equal listener is registered only once
+    }
+
+    public synchronized void removeConfigurationListener(ConfigurationListener 
listener) {
+        configurationListeners.remove(listener); // has no effect when the listener is not registered
+    }
+
+    protected synchronized void notifyConfigurationChange(ConfigEventType 
type) {
+        for (ConfigurationListener listener : configurationListeners) { // every registered listener gets the same event type
+            listener.configurationChanged(type); // invoked on the calling thread while the monitor of this manager is held
+        }
+    }
+
     public Component getComponent() {
         return component;
     }

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/Constants.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/Constants.java b/config/src/main/java/org/apache/oodt/config/Constants.java
index 9cd217f..3824afb 100644
--- a/config/src/main/java/org/apache/oodt/config/Constants.java
+++ b/config/src/main/java/org/apache/oodt/config/Constants.java
@@ -93,5 +93,8 @@ public class Constants {
 
         /** Where other configuration files will be stored */
         public static final String CONFIGURATION_PATH_NAME = "configuration";
+
+        /** Path to be watched for configuration changes */
+        public static final String NOTIFICATIONS_PATH = "notifications";
     }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java b/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java
index ae8912a..6dfb24b 100644
--- a/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java
+++ b/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java
@@ -19,7 +19,10 @@ package org.apache.oodt.config.distributed;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.oodt.config.Component;
+import org.apache.oodt.config.ConfigEventType;
 import org.apache.oodt.config.ConfigurationManager;
 import org.apache.oodt.config.Constants;
 import org.apache.oodt.config.Constants.Properties;
@@ -38,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.oodt.config.Constants.Properties.ZK_CONNECT_STRING;
 import static org.apache.oodt.config.Constants.Properties.ZK_PROPERTIES_FILE;
+import static org.apache.oodt.config.Constants.Properties.ZK_STARTUP_TIMEOUT;
 import static 
org.apache.oodt.config.distributed.utils.ConfigUtils.getOODTProjectName;
 
 /**
@@ -57,12 +61,41 @@ public class DistributedConfigurationManager extends 
ConfigurationManager {
 
     private List<String> savedFiles = new ArrayList<>();
 
+    /** {@link NodeCache} watching the notifications ZNode for configuration change notifications */
+    private NodeCache nodeCache;
+    private NodeCacheListener nodeCacheListener = new NodeCacheListener() { // reacts to changes of the notifications ZNode
+        @Override
+        public void nodeChanged() throws Exception {
+            byte[] data = 
client.getData().forPath(zNodePaths.getNotificationsZNodePath()); // current content of the notifications ZNode
+            if (data == null) {
+                return;
+            }
+
+            String event = new String(data); // decoded with the default charset
+            ConfigEventType type = ConfigEventType.parse(event);
+            if (type != null) { // text which does not name a known event type is ignored
+                logger.info("Configuration changed event of type: '{}' 
received", type);
+                switch (type) {
+                    case PUBLISH:
+                        loadConfiguration(); // load the configuration again
+                        break;
+                    case CLEAR:
+                        clearConfiguration(); // drop the configuration held locally
+                        break;
+                }
+
+                notifyConfigurationChange(type); // listeners are informed only after the local update above
+            }
+        }
+    };
+
     public DistributedConfigurationManager(Component component) {
         super(component, getOODTProjectName());
         this.zNodePaths = new ZNodePaths(this.project, 
this.component.getName());
 
-        if (System.getProperty(ZK_PROPERTIES_FILE) == null && 
System.getProperty(Constants.Properties.ZK_CONNECT_STRING) == null) {
-            throw new IllegalArgumentException("Zookeeper requires system 
properties " + ZK_PROPERTIES_FILE + " or " + ZK_CONNECT_STRING + " to be set");
+        if (System.getProperty(ZK_PROPERTIES_FILE) == null && 
System.getProperty(ZK_CONNECT_STRING) == null) {
+            throw new IllegalArgumentException("Zookeeper requires system 
properties " + ZK_PROPERTIES_FILE + " or " +
+                    ZK_CONNECT_STRING + " to be set");
         }
 
         if (System.getProperty(ZK_PROPERTIES_FILE) != null) {
@@ -73,11 +106,11 @@ public class DistributedConfigurationManager extends 
ConfigurationManager {
             }
         }
 
-        if (System.getProperty(Constants.Properties.ZK_CONNECT_STRING) == 
null) {
+        if (System.getProperty(ZK_CONNECT_STRING) == null) {
             throw new IllegalArgumentException("Zookeeper requires a proper 
connect string to connect to zookeeper ensemble");
         }
 
-        connectString = 
System.getProperty(Constants.Properties.ZK_CONNECT_STRING);
+        connectString = System.getProperty(ZK_CONNECT_STRING);
         logger.info("Using zookeeper connect string : {}", connectString);
         startZookeeper();
     }
@@ -90,7 +123,8 @@ public class DistributedConfigurationManager extends 
ConfigurationManager {
         client = CuratorUtils.newCuratorFrameworkClient(connectString, logger);
         client.start();
         logger.info("Curator framework start operation invoked");
-        int startupTimeOutMs = 
Integer.parseInt(System.getProperty(Properties.ZK_STARTUP_TIMEOUT, "30000"));
+
+        int startupTimeOutMs = 
Integer.parseInt(System.getProperty(ZK_STARTUP_TIMEOUT, "30000"));
         try {
             logger.info("Waiting to connect to zookeeper, startupTimeout : 
{}", startupTimeOutMs);
             client.blockUntilConnected(startupTimeOutMs, 
TimeUnit.MILLISECONDS);
@@ -103,10 +137,21 @@ public class DistributedConfigurationManager extends 
ConfigurationManager {
         }
 
         logger.info("CuratorFramework client started successfully");
+
+        nodeCache = new NodeCache(client, 
zNodePaths.getNotificationsZNodePath());
+        nodeCache.getListenable().addListener(nodeCacheListener);
+        try {
+            logger.debug("Starting NodeCache to watch for configuration 
changes");
+            nodeCache.start(true);
+        } catch (Exception e) {
+            logger.error("Error occurred when start listening for 
configuration changes", e);
+            throw new IllegalStateException("Unable to start listening for 
configuration changes", e);
+        }
+        logger.info("NodeCache for watching configuration changes started 
successfully");
     }
 
     @Override
-    public void loadConfiguration() throws Exception {
+    public synchronized void loadConfiguration() throws Exception {
         logger.debug("Loading properties for : {}", component);
         loadProperties();
         logger.info("Properties loaded for : {}", component);
@@ -170,9 +215,8 @@ public class DistributedConfigurationManager extends 
ConfigurationManager {
     private void saveFile(String path, byte[] data) throws IOException {
         String localFilePath = ConfigUtils.fixForComponentHome(component, 
path);
         File localFile = new File(localFilePath);
-        if (localFile.exists()) {
-            logger.warn("Deleting already existing file at {} before writing 
new content", localFilePath);
-            localFile.delete();
+        if (localFile.exists() && localFile.delete()) {
+            logger.warn("Deleted already existing file at {} before writing 
new content", localFilePath);
         }
 
         logger.debug("Storing configuration in file: {}", localFilePath);
@@ -183,7 +227,7 @@ public class DistributedConfigurationManager extends 
ConfigurationManager {
 
     /** {@inheritDoc} */
     @Override
-    public void clearConfiguration() {
+    public synchronized void clearConfiguration() {
         for (String path : savedFiles) {
             logger.debug("Removing saved file {}", path);
             File file = new File(path);

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java b/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java
index 6229e96..9d3dd4a 100644
--- a/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java
+++ b/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java
@@ -20,6 +20,7 @@ package org.apache.oodt.config.distributed;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.oodt.config.Component;
+import org.apache.oodt.config.ConfigEventType;
 import org.apache.oodt.config.Constants;
 import org.apache.oodt.config.distributed.utils.CuratorUtils;
 import org.apache.zookeeper.data.Stat;
@@ -83,6 +84,14 @@ public class DistributedConfigurationPublisher {
         logger.info("Using zookeeper connect string : {}", connectString);
 
         startZookeeper();
+
+        try {
+            logger.debug("Creating ZNode paths");
+            zNodePaths.createZNodes(client);
+        } catch (Exception e) {
+            logger.error("Error occurred when creating initial ZNode paths", 
e);
+            throw new IllegalStateException("Unable to create ZNode paths", e);
+        }
     }
 
     /**
@@ -145,7 +154,8 @@ public class DistributedConfigurationPublisher {
      */
     public boolean verifyPublishedConfiguration() {
         try {
-            return verifyPublishedConfiguration(propertiesFiles, true) && 
verifyPublishedConfiguration(configFiles, false);
+            return verifyPublishedConfiguration(propertiesFiles, true) &&
+                    verifyPublishedConfiguration(configFiles, false);
         } catch (Exception e) {
             logger.error("Error occurred when checking published config", e);
             return false;
@@ -165,6 +175,17 @@ public class DistributedConfigurationPublisher {
         logger.info("Configuration cleared!");
     }
 
+    /**
+     * Notifies the watching {@link org.apache.oodt.config.ConfigurationManager}s about the configuration change by
+     * writing the textual form of the event type as data of the notifications ZNode of this component.
+     *
+     * @param type {@link ConfigEventType} of the change to announce
+     * @throws Exception if the data cannot be written to the notifications ZNode
+     */
+    public void notifyConfigEvent(ConfigEventType type) throws Exception {
+        logger.info("Notifying event: '{}' to configuration managers of {}", 
type, component);
+        client.setData().forPath(zNodePaths.getNotificationsZNodePath(), 
type.toString().getBytes()); // bytes are produced with the default charset
+    }
+
     private void publishConfiguration(Map<String, String> fileMapping, boolean 
isProperties) throws Exception {
         for (Map.Entry<String, String> entry : fileMapping.entrySet()) {
             String filePath = entry.getKey();
@@ -173,7 +194,8 @@ public class DistributedConfigurationPublisher {
 
             String content = getFileContent(filePath);
 
-            String zNodePath = isProperties ? 
zNodePaths.getPropertiesZNodePath(relativeZNodePath) : 
zNodePaths.getConfigurationZNodePath(relativeZNodePath);
+            String zNodePath = isProperties ? 
zNodePaths.getPropertiesZNodePath(relativeZNodePath) :
+                    zNodePaths.getConfigurationZNodePath(relativeZNodePath);
             if (client.checkExists().forPath(zNodePath) != null) {
                 byte[] bytes = client.getData().forPath(zNodePath);
                 String existingData = new String(bytes);

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java b/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java
index cf3ca00..6324ac1 100644
--- a/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java
+++ b/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java
@@ -17,9 +17,14 @@
 
 package org.apache.oodt.config.distributed;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.oodt.config.distributed.utils.CuratorUtils;
+import org.apache.zookeeper.CreateMode;
+
 import static org.apache.oodt.config.Constants.DEFAULT_PROJECT;
 import static org.apache.oodt.config.Constants.ZPaths.COMPONENTS_PATH_NAME;
 import static org.apache.oodt.config.Constants.ZPaths.CONFIGURATION_PATH_NAME;
+import static org.apache.oodt.config.Constants.ZPaths.NOTIFICATIONS_PATH;
 import static org.apache.oodt.config.Constants.ZPaths.PROJECTS_PATH_NAME;
 import static org.apache.oodt.config.Constants.ZPaths.PROPERTIES_PATH_NAME;
 import static org.apache.oodt.config.Constants.ZPaths.SEPARATOR;
@@ -43,6 +48,9 @@ public class ZNodePaths {
     private String configurationZNodePath;
     private String configurationZNodeRoot;
 
+    /** ZNode to be watched for configuration changes. /projects/${project}/components/${component}/notifications */
+    private String notificationsZNodePath;
+
     /**
     * Creates the ZNode path structure according to the <pre>componentName</pre> and <pre>propertiesFileNames</pre>
      * given.
@@ -70,6 +78,21 @@ public class ZNodePaths {
 
         configurationZNodePath = componentZNodeRoot + CONFIGURATION_PATH_NAME;
         configurationZNodeRoot = configurationZNodePath + SEPARATOR;
+
+        notificationsZNodePath = componentZNodeRoot + NOTIFICATIONS_PATH;
+    }
+
+    /**
+     * Creates the initial ZNode structure in zookeeper: the properties, configuration and notifications ZNodes of the
+     * component. Each one is created through {@link CuratorUtils#createZNodeIfNotExists} with
+     * {@link CreateMode#PERSISTENT} and a one byte placeholder as data. Supposed to be called by the
+     * {@link DistributedConfigurationPublisher}.
+     *
+     * @param client {@link CuratorFramework} instance used to create the ZNodes
+     * @throws Exception if creating any of the ZNodes fails
+     */
+    public void createZNodes(CuratorFramework client) throws Exception {
+        CuratorUtils.createZNodeIfNotExists(client, propertiesZNodePath, 
CreateMode.PERSISTENT, new byte[1]);
+        CuratorUtils.createZNodeIfNotExists(client, configurationZNodePath, 
CreateMode.PERSISTENT, new byte[1]);
+        CuratorUtils.createZNodeIfNotExists(client, notificationsZNodePath, 
CreateMode.PERSISTENT, new byte[1]);
     }
 
     public String getComponentZNodePath() {
@@ -99,4 +122,8 @@ public class ZNodePaths {
     public String getLocalPropertiesFilePath(String zNodePath) {
         return zNodePath.substring(propertiesZNodeRoot.length());
     }
+
+    public String getNotificationsZNodePath() {
+        return notificationsZNodePath;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java b/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java
index 1ab1b30..50e567a 100644
--- a/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java
+++ b/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java
@@ -19,6 +19,7 @@ package org.apache.oodt.config.distributed.cli;
 
 import org.apache.oodt.cas.cli.action.CmdLineAction;
 import org.apache.oodt.cas.cli.exception.CmdLineActionException;
+import org.apache.oodt.config.ConfigEventType;
 import org.apache.oodt.config.distributed.DistributedConfigurationPublisher;
 import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
@@ -36,12 +37,9 @@ import static 
org.apache.oodt.config.Constants.Properties.ZK_CONNECT_STRING;
  */
 public class CLIAction extends CmdLineAction {
 
-    public enum Action {
-        PUBLISH, VERIFY, CLEAR
-    }
-
     private String connectString;
     private String configFile = DEFAULT_CONFIG_PUBLISHER_XML;
+    private boolean notify = false;
 
     private Action action;
 
@@ -60,12 +58,18 @@ public class CLIAction extends CmdLineAction {
                 switch (action) {
                     case PUBLISH:
                         publish(publisher);
+                        if (notify) {
+                            
publisher.notifyConfigEvent(ConfigEventType.PUBLISH);
+                        }
                         break;
                     case VERIFY:
                         verify(publisher);
                         break;
                     case CLEAR:
                         clear(publisher);
+                        if (notify) {
+                            publisher.notifyConfigEvent(ConfigEventType.CLEAR);
+                        }
                         break;
                 }
                 publisher.destroy();
@@ -123,4 +127,16 @@ public class CLIAction extends CmdLineAction {
     public void setConfigFile(String configFile) {
         this.configFile = configFile;
     }
+
+    public boolean isNotify() {
+        return notify;
+    }
+
+    public void setNotify(boolean notify) {
+        this.notify = notify;
+    }
+
+    public enum Action {
+        PUBLISH, VERIFY, CLEAR
+    }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/resources/cmd-line-options.xml
----------------------------------------------------------------------
diff --git a/config/src/main/resources/cmd-line-options.xml b/config/src/main/resources/cmd-line-options.xml
index 698ad32..3aa506b 100644
--- a/config/src/main/resources/cmd-line-options.xml
+++ b/config/src/main/resources/cmd-line-options.xml
@@ -82,4 +82,35 @@
             </bean>
         </property>
     </bean>
+
+    <bean id="notifyConfigChange" 
class="org.apache.oodt.cas.cli.option.AdvancedCmdLineOption">
+        <property name="shortOption" value="n"/>
+        <property name="longOption" value="notify"/>
+        <property name="description" value="Notify the configuration managers' 
about the configuration change done"/>
+        <property name="hasArgs" value="false"/>
+        <property name="requirementRules">
+            <list>
+                <bean 
class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule"
+                      p:actionName="publish" p:relation="OPTIONAL"/>
+                <bean 
class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule"
+                      p:actionName="clear" p:relation="OPTIONAL"/>
+                <bean 
class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule"
+                      p:actionName="verify" p:relation="OPTIONAL"/>
+            </list>
+        </property>
+        <property name="handler">
+            <bean 
class="org.apache.oodt.cas.cli.option.handler.ApplyToActionHandler">
+                <property name="applyToActions">
+                    <list>
+                        <bean 
class="org.apache.oodt.cas.cli.option.handler.ApplyToAction"
+                              p:actionName="publish" p:methodName="setNotify"/>
+                        <bean 
class="org.apache.oodt.cas.cli.option.handler.ApplyToAction"
+                              p:actionName="verify" p:methodName="setNotify"/>
+                        <bean 
class="org.apache.oodt.cas.cli.option.handler.ApplyToAction"
+                              p:actionName="clear" p:methodName="setNotify"/>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java b/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java
index 0bf2dde..b553643 100644
--- a/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java
+++ b/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.oodt.config.distributed;
 
+import org.apache.oodt.config.Component;
 import org.apache.oodt.config.ConfigurationManager;
 import org.apache.oodt.config.distributed.cli.ConfigPublisher;
 import org.apache.oodt.config.distributed.utils.ConfigUtils;
@@ -32,6 +33,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -50,6 +52,7 @@ public class DistributedConfigurationManagerTest extends 
AbstractDistributedConf
     private static final String CONFIG_PUBLISHER_XML = "config-publisher.xml";
 
     private List<DistributedConfigurationPublisher> publishers;
+    private Map<Component, ConfigurationManager> configurationManagers;
 
     @Before
     public void setUpTest() throws Exception {
@@ -65,21 +68,24 @@ public class DistributedConfigurationManagerTest extends 
AbstractDistributedConf
         ApplicationContext applicationContext = new 
ClassPathXmlApplicationContext(CONFIG_PUBLISHER_XML);
         Map distributedConfigurationPublishers = 
applicationContext.getBeansOfType(DistributedConfigurationPublisher.class);
 
-        publishers = new 
ArrayList<>(distributedConfigurationPublishers.values().size());
+        publishers = new ArrayList<>();
+        configurationManagers = new HashMap<>();
         for (Object bean : distributedConfigurationPublishers.values()) {
             DistributedConfigurationPublisher publisher = 
(DistributedConfigurationPublisher) bean;
 
+            System.setProperty(OODT_PROJECT, publisher.getProject());
             System.setProperty(publisher.getComponent().getHome(), ".");
             publishers.add(publisher);
+            configurationManagers.put(publisher.getComponent(), new 
DistributedConfigurationManager(publisher.getComponent()));
+            System.clearProperty(OODT_PROJECT);
         }
     }
 
     @Test
     public void loadConfigurationTest() throws Exception {
         for (DistributedConfigurationPublisher publisher : publishers) {
-            System.setProperty(OODT_PROJECT, publisher.getProject());
 
-            ConfigurationManager configurationManager = new 
DistributedConfigurationManager(publisher.getComponent());
+            ConfigurationManager configurationManager = 
configurationManagers.get(publisher.getComponent());
             configurationManager.loadConfiguration();
 
             // Checking for configuration files
@@ -117,8 +123,63 @@ public class DistributedConfigurationManagerTest extends AbstractDistributedConf
                 File file = new File(localFile);
                 Assert.assertFalse(file.exists());
             }
+        }
+    }
 
-            System.clearProperty(OODT_PROJECT);
+    @Test
+    public void notifyConfigurationChangeTest() throws Exception {
+        // First publish config. Then check if config has downloaded locally.
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONFIG_PUBLISHER_XML,
+                "-notify",
+                "-a", "publish"
+        });
+        Thread.sleep(5000);
+        checkFileExistence(true);
+
+        // Now clear config. Then check if config has deleted locally.
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONFIG_PUBLISHER_XML,
+                "-notify",
+                "-a", "clear"
+        });
+        Thread.sleep(5000);
+        checkFileExistence(false);
+
+        // First publish config. Then check if config has downloaded locally.
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONFIG_PUBLISHER_XML,
+                "-notify",
+                "-a", "publish"
+        });
+        Thread.sleep(5000);
+        checkFileExistence(true);
+    }
+
+    private void checkFileExistence(boolean exists) {
+        for (DistributedConfigurationPublisher publisher : publishers) {
+            for (String fileName : publisher.getPropertiesFiles().values()) {
+                fileName = ConfigUtils.fixForComponentHome(publisher.getComponent(), fileName);
+                File file = new File(fileName);
+                if (exists) {
+                    Assert.assertTrue(file.exists());
+                } else {
+                    Assert.assertFalse(file.exists());
+                }
+            }
+
+            for (String fileName : publisher.getConfigFiles().values()) {
+                fileName = ConfigUtils.fixForComponentHome(publisher.getComponent(), fileName);
+                File file = new File(fileName);
+                if (exists) {
+                    Assert.assertTrue(file.exists());
+                } else {
+                    Assert.assertFalse(file.exists());
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 5e84d83..f6e0fc3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -244,7 +244,7 @@ the License.
       </dependency>
       <dependency>
         <groupId>org.apache.curator</groupId>
-        <artifactId>curator-framework</artifactId>
+        <artifactId>curator-recipes</artifactId>
         <version>3.3.0</version>
       </dependency>
       <dependency>

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java
----------------------------------------------------------------------
diff --git a/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java b/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java
index 4ac48bb..37d32d2 100644
--- a/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java
+++ b/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java
@@ -54,6 +54,8 @@ import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.metadata.exceptions.MetExtractionException;
 import org.apache.oodt.commons.date.DateUtils;
 import org.apache.oodt.config.Component;
+import org.apache.oodt.config.ConfigEventType;
+import org.apache.oodt.config.ConfigurationListener;
 import org.apache.oodt.config.ConfigurationManager;
 import org.apache.oodt.config.ConfigurationManagerFactory;
 import org.apache.xmlrpc.WebServer;
@@ -112,6 +114,18 @@ public class XmlRpcFileManager {
 
   /** Configuration Manager instance which will handle the configuration aspect in distributed/standalone manner */
   private ConfigurationManager configurationManager;
+  private ConfigurationListener configurationListener = new ConfigurationListener() {
+    @Override
+    public void configurationChanged(ConfigEventType type) {
+      switch (type) {
+        case PUBLISH:
+          refreshConfigAndPolicy();
+          break;
+        case CLEAR:
+          // TODO: 8/19/17 What should we do if the config has been cleared?
+      }
+    }
+  };
 
   /**
    * <p> Creates a new XmlRpcFileManager with the given metadata store factory, and the given data store factory, on the
@@ -135,8 +149,9 @@ public class XmlRpcFileManager {
     }
 
     configurationManager = ConfigurationManagerFactory.getConfigurationManager(Component.FILE_MANAGER, propertiesFiles);
-
+    configurationManager.addConfigurationListener(configurationListener);
     this.loadConfiguration();
+
     LOG.log(Level.INFO, "File Manager started by " + System.getProperty("user.name", "unknown"));
   }
 
@@ -1224,6 +1239,7 @@ public class XmlRpcFileManager {
   }
 
   public boolean shutdown() {
+    configurationManager.removeConfigurationListener(configurationListener);
     configurationManager.clearConfiguration();
     if (this.webServer != null) {
       this.webServer.shutdown();

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/pge/pom.xml
----------------------------------------------------------------------
diff --git a/pge/pom.xml b/pge/pom.xml
index 6fd4d72..72a33d9 100644
--- a/pge/pom.xml
+++ b/pge/pom.xml
@@ -27,7 +27,7 @@ the License.
   <artifactId>cas-pge</artifactId>
   <name>CAS PGE Adaptor Framework</name>
   <description>Allows data processing jobs not written in conformance with the
-        PCS PGE interface to be run within the PCS.</description>
+        PCS PGE (Production Generation Executive) interface to be run within the PCS.</description>
   <!-- All dependencies should be listed in core/pom.xml and be ordered alphabetically by package and artifact.
      Once the dependency is in the core pom, it can then be used in other modules without the version tags.
      For example, within core/pom.xml:

Reply via email to