http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c47284c..21b9dcf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.control.cc;
 
 import java.io.File;
 import java.io.FileReader;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
@@ -32,20 +33,22 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -66,9 +69,10 @@ import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
 import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
 import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
 import org.apache.hyracks.control.cc.work.TriggerNCWork;
+import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.logs.LogFile;
@@ -77,7 +81,6 @@ import org.apache.hyracks.control.common.work.WorkQueue;
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
-import org.ini4j.Ini;
 import org.xml.sax.InputSource;
 
 public class ClusterControllerService implements IControllerService {
@@ -85,6 +88,8 @@ public class ClusterControllerService implements IControllerService {
 
     private final CCConfig ccConfig;
 
+    private final ConfigManager configManager;
+
     private IPCSystem clusterIPC;
 
     private IPCSystem clientIPC;
@@ -127,11 +132,22 @@ public class ClusterControllerService implements IControllerService {
 
     private ShutdownRun shutdownCallback;
 
-    private ICCApplicationEntryPoint aep;
+    private final ICCApplicationEntryPoint aep;
+
+    public ClusterControllerService(final CCConfig config) throws Exception {
+        this(config, getApplicationEntryPoint(config));
+    }
 
-    public ClusterControllerService(final CCConfig ccConfig) throws Exception {
-        this.ccConfig = ccConfig;
-        File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
+    public ClusterControllerService(final CCConfig config,
+                                    final ICCApplicationEntryPoint aep) throws Exception {
+        this.ccConfig = config;
+        this.configManager = ccConfig.getConfigManager();
+        if (aep == null) {
+            throw new IllegalArgumentException("ICCApplicationEntryPoint cannot be null");
+        }
+        this.aep = aep;
+        configManager.processConfig();
+        File jobLogFolder = new File(ccConfig.getRootDir(), "logs/jobs");
         jobLog = new LogFile(jobLogFolder);
 
         // WorkQueue is in charge of heartbeat as well as other events.
@@ -140,7 +156,8 @@ public class ClusterControllerService implements IControllerService {
         final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
+        datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
+                ccConfig.getResultSweepThreshold());
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
@@ -151,10 +168,10 @@ public class ClusterControllerService implements IControllerService {
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
-        if (ccConfig.clusterTopologyDefinition == null) {
+        if (ccConfig.getClusterTopology() == null) {
             return null;
         }
-        FileReader fr = new FileReader(ccConfig.clusterTopologyDefinition);
+        FileReader fr = new FileReader(ccConfig.getClusterTopology());
         InputSource in = new InputSource(fr);
         try {
             return TopologyDefinitionParser.parse(in);
@@ -166,20 +183,21 @@ public class ClusterControllerService implements IControllerService {
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
-        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
+        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir()));
         IIPCI ccIPCI = new ClusterControllerIPCI(this);
-        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
         IIPCI ciIPCI = new ClientInterfaceIPCI(this);
-        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
-                new JavaSerializationBasedPayloadSerializerDeserializer());
-        webServer = new WebServer(this, ccConfig.httpPort);
+        clientIPC = new IPCSystem(
+                new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
+                ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+        webServer = new WebServer(this, ccConfig.getConsoleListenPort());
         clusterIPC.start();
         clientIPC.start();
         webServer.start();
-        info = new ClusterControllerInfo(ccConfig.clientNetIpAddress, ccConfig.clientNetPort,
+        info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
                 webServer.getListeningPort());
-        timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
+        timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriod());
         jobLog.open();
         startApplication();
 
@@ -194,84 +212,62 @@ public class ClusterControllerService implements IControllerService {
         appCtx = new CCApplicationContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
         appCtx.addJobLifecycleListener(datasetDirectoryService);
         executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
-        String className = ccConfig.appCCMainClass;
-
-        IJobCapacityController jobCapacityController = DefaultJobCapacityController.INSTANCE;
-        if (className != null) {
-            Class<?> c = Class.forName(className);
-            aep = (ICCApplicationEntryPoint) c.newInstance();
-            String[] args = ccConfig.appArgs == null ? null
-                    : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
-            aep.start(appCtx, args);
-            jobCapacityController = aep.getJobCapacityController();
-        }
+        aep.start(appCtx, ccConfig.getAppArgsArray());
+        IJobCapacityController jobCapacityController = aep.getJobCapacityController();
 
         // Job manager is in charge of job lifecycle management.
         try {
             Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
-                    .loadClass(ccConfig.jobManagerClassName)
+                    .loadClass(ccConfig.getJobManagerClass())
                     .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
             jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, "class " + ccConfig.jobManagerClassName + " could not be used: ", e);
+                LOGGER.log(Level.WARNING, "class " + ccConfig.getJobManagerClass() + " could not be used: ", e);
             }
             // Falls back to the default implementation if the user-provided class name is not valid.
             jobManager = new JobManager(ccConfig, this, jobCapacityController);
         }
     }
 
-    private void connectNCs() throws Exception {
-        Ini ini = ccConfig.getIni();
-        if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
-            return;
-        }
-        for (String section : ini.keySet()) {
-            if (!section.startsWith("nc/")) {
-                continue;
-            }
-            String ncid = section.substring(3);
-            String address = IniUtils.getString(ini, section, "address", null);
-            int port = IniUtils.getInt(ini, section, "port", 9090);
-            if (address == null) {
-                address = InetAddress.getLoopbackAddress().getHostAddress();
+    private Map<String, Pair<String, Integer>> getNCServices() throws IOException {
+        Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
+        for (String ncId : configManager.getNodeNames()) {
+            IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(ncId);
+            if (!ncConfig.getBoolean(NCConfig.Option.VIRTUAL_NC)) {
+                ncMap.put(ncId, Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
+                        ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT)));
             }
-            workQueue.schedule(new TriggerNCWork(this, address, port, ncid));
         }
+        return ncMap;
+    }
+
+    private void connectNCs() throws IOException {
+        getNCServices().entrySet().forEach(ncService -> {
+            final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
+                    ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey());
+            workQueue.schedule(triggerWork);
+        });
     }
 
     private void terminateNCServices() throws Exception {
-        Ini ini = ccConfig.getIni();
-        if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
-            return;
-        }
         List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
-        for (String section : ini.keySet()) {
-            if (!section.startsWith("nc/")) {
-                continue;
-            }
-            String ncid = section.substring(3);
-            String address = IniUtils.getString(ini, section, "address", null);
-            int port = IniUtils.getInt(ini, section, "port", 9090);
-            if (address == null) {
-                address = InetAddress.getLoopbackAddress().getHostAddress();
-            }
-            ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(address, port, ncid);
+        getNCServices().entrySet().forEach(ncService -> {
+            ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getValue().getLeft(),
+                    ncService.getValue().getRight(), ncService.getKey());
             workQueue.schedule(shutdownWork);
             shutdownNCServiceWorks.add(shutdownWork);
-        }
+        });
         for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
             shutdownWork.sync();
         }
     }
 
     private void notifyApplication() throws Exception {
-        if (aep != null) {
-            // Sometimes, there is no application entry point. Check hyracks-client project
-            aep.startupCompleted();
-        }
+        aep.startupCompleted();
     }
+
     public void stop(boolean terminateNCService) throws Exception {
         if (terminateNCService) {
             terminateNCServices();
@@ -294,9 +290,7 @@ public class ClusterControllerService implements IControllerService {
     }
 
     private void stopApplication() throws Exception {
-        if (aep != null) {
-            aep.stop();
-        }
+        aep.stop();
     }
 
     public ServerContext getServerContext() {
@@ -360,7 +354,7 @@ public class ClusterControllerService implements IControllerService {
     }
 
     public NetworkAddress getDatasetDirectoryServiceInfo() {
-        return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+        return new NetworkAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
     }
 
     private final class ClusterControllerContext implements ICCContext {
@@ -390,6 +384,7 @@ public class ClusterControllerService implements IControllerService {
         public ClusterTopology getClusterTopology() {
             return topology;
         }
+
     }
 
     private class DeadNodeSweeper extends TimerTask {
@@ -458,4 +453,14 @@ public class ClusterControllerService implements IControllerService {
     public ThreadDumpRun removeThreadDumpRun(String requestKey) {
         return threadDumpRunMap.remove(requestKey);
     }
+
+    private static ICCApplicationEntryPoint getApplicationEntryPoint(CCConfig ccConfig)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        if (ccConfig.getAppClass() != null) {
+            Class<?> c = Class.forName(ccConfig.getAppClass());
+            return (ICCApplicationEntryPoint) c.newInstance();
+        } else {
+            return CCApplicationEntryPoint.INSTANCE;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 955b7f2..8400a59 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -282,7 +282,7 @@ public class NodeControllerState {
     public synchronized ObjectNode toSummaryJSON()  {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
-        o.put("node-id", ncConfig.nodeId);
+        o.put("node-id", ncConfig.getNodeId());
         o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
         o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
 
@@ -293,7 +293,7 @@ public class NodeControllerState {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
 
-        o.put("node-id", ncConfig.nodeId);
+        o.put("node-id", ncConfig.getNodeId());
 
         if (includeConfig) {
             o.put("os-name", osName);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index 77b9b17..a8b03bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -27,9 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -105,7 +106,7 @@ public class CCApplicationContext extends ApplicationContext implements ICCAppli
         clusterLifecycleListeners.add(clusterLifecycleListener);
     }
 
-    public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException {
+    public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
         for (IClusterLifecycleListener l : clusterLifecycleListeners) {
             l.notifyNodeJoin(nodeId, ncConfiguration);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 354019c..d6d8bc4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -137,7 +137,7 @@ public class NodeManager implements INodeManager {
             Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
             String nodeId = entry.getKey();
             NodeControllerState state = entry.getValue();
-            if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+            if (state.incrementLastHeartbeatDuration() >= ccConfig.getHeartbeatMaxMisses()) {
                 deadNodes.add(nodeId);
                 affectedJobIds.addAll(state.getActiveJobIds());
                 // Removes the node from node map.
@@ -172,10 +172,7 @@ public class NodeManager implements INodeManager {
 
     // Retrieves the IP address for a given node.
     private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
-        String ipAddress = ncState.getNCConfig().dataIPAddress;
-        if (ncState.getNCConfig().dataPublicIPAddress != null) {
-            ipAddress = ncState.getNCConfig().dataPublicIPAddress;
-        }
+        String ipAddress = ncState.getNCConfig().getDataPublicAddress();
         try {
             return InetAddress.getByName(ipAddress);
         } catch (UnknownHostException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 031303b..b35de3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -68,13 +68,13 @@ public class JobManager implements IJobManager {
         this.ccs = ccs;
         this.jobCapacityController = jobCapacityController;
         try {
-            Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobQueueClassName)
+            Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
                     .getConstructor(IJobManager.class, IJobCapacityController.class);
             jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, "class " + ccConfig.jobQueueClassName + " could not be used: ", e);
+                LOGGER.log(Level.WARNING, "class " + ccConfig.getJobQueueClass() + " could not be used: ", e);
             }
             // Falls back to the default implementation if the user-provided class name is not valid.
             jobQueue = new FIFOJobQueue(this, jobCapacityController);
@@ -85,13 +85,13 @@ public class JobManager implements IJobManager {
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
-                return size() > ccConfig.jobHistorySize;
+                return size() > ccConfig.getJobHistorySize();
             }
         };
         runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
             private static final long serialVersionUID = 1L;
             /** history size + 1 is for the case when history size = 0 */
-            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+            private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index 0577002..3dec959 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -27,28 +27,26 @@ import java.lang.management.MemoryUsage;
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.utils.PidHelper;
 import org.apache.hyracks.control.common.work.IPCResponder;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.kohsuke.args4j.Option;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class GetNodeDetailsJSONWork extends SynchronizableWork {
-    private static final Logger LOGGER = Logger.getLogger(GetNodeDetailsJSONWork.class.getName());
+    private static final Section [] CC_SECTIONS = { Section.CC, Section.COMMON };
+    private static final Section [] NC_SECTIONS = { Section.NC, Section.COMMON };
+
     private final INodeManager nodeManager;
     private final CCConfig ccConfig;
     private final String nodeId;
@@ -59,7 +57,7 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork {
     private ObjectMapper om = new ObjectMapper();
 
     public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
-                                  boolean includeConfig, IPCResponder<String> callback) {
+            boolean includeConfig, IPCResponder<String> callback) {
         this.nodeManager = nodeManager;
         this.ccConfig = ccConfig;
         this.nodeId = nodeId;
@@ -69,7 +67,7 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork {
     }
 
     public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
-                                  boolean includeConfig) {
+            boolean includeConfig) {
         this(nodeManager, ccConfig, nodeId, includeStats, includeConfig, null);
     }
 
@@ -79,14 +77,18 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork {
             // null nodeId is a request for CC
             detail = getCCDetails();
             if (includeConfig) {
-                addIni(detail, ccConfig);
+                ConfigUtils.addConfigToJSON(detail, ccConfig.getAppConfig(), CC_SECTIONS);
+                detail.putPOJO("app.args", ccConfig.getAppArgs());
             }
         } else {
             NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId);
             if (ncs != null) {
                 detail = ncs.toDetailedJSON(includeStats, includeConfig);
                 if (includeConfig) {
-                    addIni(detail, ncs.getNCConfig());
+                    final NCConfig ncConfig = ncs.getNCConfig();
+                    ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId),
+                            NC_SECTIONS);
+                    detail.putPOJO("app.args", ncConfig.getAppArgs());
                 }
             }
         }
@@ -96,7 +98,7 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork {
         }
     }
 
-    private ObjectNode getCCDetails()  {
+    private ObjectNode getCCDetails() {
         ObjectNode o = om.createObjectNode();
         MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
         List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
@@ -151,33 +153,6 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork {
         return o;
     }
 
-    private static void addIni(ObjectNode o, Object configBean)  {
-        Map<String, Object> iniMap = new HashMap<>();
-        for (Field f : configBean.getClass().getFields()) {
-            Option option = f.getAnnotation(Option.class);
-            if (option == null) {
-                continue;
-            }
-            final String optionName = option.name();
-            Object value = null;
-            try {
-                value = f.get(configBean);
-            } catch (IllegalAccessException e) {
-                LOGGER.log(Level.WARNING, "Unable to access ini option " + optionName, e);
-            }
-            if (value != null) {
-                if ("--".equals(optionName)) {
-                    iniMap.put("app_args", value);
-                } else {
-                    iniMap.put(optionName.substring(1).replace('-', '_'),
-                            "-iodevices".equals(optionName)
-                            ? String.valueOf(value).split(",")
-                            : value);
-                }
-            }
-        }
-        o.putPOJO("ini", iniMap);
-    }
 
     public ObjectNode getDetail() {
         return detail;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index dc93515..e97950e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
@@ -50,19 +52,22 @@ public class RegisterNodeWork extends SynchronizableWork {
         String id = reg.getNodeId();
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
         CCNCFunctions.NodeRegistrationResult result;
-        Map<String, String> ncConfiguration = new HashMap<>();
+        Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
             INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
             NodeControllerState state = new NodeControllerState(nodeController, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
-            state.getNCConfig().toMap(ncConfiguration);
+            IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
+            for (IOption option : cfg.getOptions()) {
+                ncConfiguration.put(option, cfg.get(option));
+            }
             LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
             NodeParameters params = new NodeParameters();
             params.setClusterControllerInfo(ccs.getClusterControllerInfo());
             params.setDistributedState(ccs.getApplicationContext().getDistributedState());
-            params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
-            params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
+            params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
+            params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
             result = new CCNCFunctions.NodeRegistrationResult(null, e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
index a7bca25..ab526e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
@@ -27,7 +27,9 @@ import java.net.Socket;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.ini4j.Ini;
@@ -79,14 +81,18 @@ public class TriggerNCWork extends AbstractWork {
 
     /**
      * Given an Ini object, serialize it to String with some enhancements.
-     * @param ccini
+     * @param ccini the ini file to decorate and forward to NC
      */
-    String serializeIni(Ini ccini) throws IOException {
+    private String serializeIni(Ini ccini) throws IOException {
         StringWriter iniString = new StringWriter();
-        ccini.store(iniString);
+        ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_ADDRESS.ini(),
+                ccs.getCCConfig().getClusterPublicAddress());
+        ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_PORT.ini(),
+                String.valueOf(ccs.getCCConfig().getClusterPublicPort()));
         // Finally insert *this* NC's name into localnc section - this is a fixed
         // entry point so that NCs can determine where all their config is.
-        iniString.append("\n[localnc]\nid=").append(ncId).append("\n");
+        ccini.put(Section.LOCALNC.sectionName(), NCConfig.Option.NODE_ID.ini(), ncId);
+        ccini.store(iniString);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Returning Ini file:\n" + iniString.toString());
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index c742a4a..dde3bad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -46,8 +46,8 @@ public class NodeManagerTest {
     public void testNormal() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
         INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(false);
-        NodeControllerState ncState2 = mockNodeControllerState(false);
+        NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
+        NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
 
         // Verifies states after adding nodes.
         nodeManager.addNode(NODE1, ncState1);
@@ -71,7 +71,7 @@ public class NodeManagerTest {
     public void testException() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
         INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(true);
+        NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
 
         boolean invalidNetworkAddress = false;
         // Verifies states after a failure during adding nodes.
@@ -106,11 +106,11 @@ public class NodeManagerTest {
 
     private CCConfig makeCCConfig() {
         CCConfig ccConfig = new CCConfig();
-        ccConfig.maxHeartbeatLapsePeriods = 0;
+        ccConfig.setHeartbeatMaxMisses(0);
         return ccConfig;
     }
 
-    private NodeControllerState mockNodeControllerState(boolean invalidIpAddr) {
+    private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) {
         NodeControllerState ncState = mock(NodeControllerState.class);
         String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
         NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
@@ -120,8 +120,8 @@ public class NodeManagerTest {
         when(ncState.getDataPort()).thenReturn(dataAddr);
         when(ncState.getDatasetPort()).thenReturn(resultAddr);
         when(ncState.getMessagingPort()).thenReturn(msgAddr);
-        NCConfig ncConfig = new NCConfig();
-        ncConfig.dataIPAddress = ipAddr;
+        NCConfig ncConfig = new NCConfig(nodeId);
+        ncConfig.setDataPublicAddress(ipAddr);
         when(ncState.getNCConfig()).thenReturn(ncConfig);
         return ncState;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 3bb08bd..97b05e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -48,14 +49,23 @@ import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.logs.LogFile;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.kohsuke.args4j.CmdLineException;
 import org.mockito.Mockito;
 
 public class JobManagerTest {
 
+    private CCConfig ccConfig;
+
+    @Before
+    public void setup() throws IOException, CmdLineException {
+        ccConfig = new CCConfig();
+        ccConfig.getConfigManager().processConfig();
+    }
+
     @Test
-    public void test() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
+    public void test() throws IOException, CmdLineException {
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
 
@@ -114,7 +124,7 @@ public class JobManagerTest {
         }
         Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
 
         // Completes deferred jobs.
         for (JobRun run : deferredRuns) {
@@ -123,14 +133,13 @@ public class JobManagerTest {
         }
         Assert.assertTrue(jobManager.getRunningJobs().isEmpty());
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
         verify(jobManager, times(8192)).prepareComplete(any(), any(), any());
         verify(jobManager, times(8192)).finalComplete(any());
     }
 
     @Test
     public void testExceedMax() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
         boolean rejected = false;
@@ -154,7 +163,6 @@ public class JobManagerTest {
 
     @Test
     public void testAdmitThenReject() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
 
@@ -185,7 +193,6 @@ public class JobManagerTest {
 
     @Test
     public void testNullJob() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController);
         boolean invalidParameter = false;
@@ -249,7 +256,7 @@ public class JobManagerTest {
         }
 
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
         verify(jobManager, times(0)).prepareComplete(any(), any(), any());
         verify(jobManager, times(0)).finalComplete(any());
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 08783cc..bd6960a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -48,7 +48,6 @@
     <dependency>
       <groupId>args4j</groupId>
       <artifactId>args4j</artifactId>
-      <version>2.0.12</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
@@ -67,5 +66,14 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
index 06bcda3..42bc636 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.control.common.application;
 import java.io.Serializable;
 import java.util.concurrent.ThreadFactory;
 
-import org.apache.hyracks.api.application.IApplicationConfig;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.application.IApplicationContext;
 import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
 import org.apache.hyracks.api.job.JobSerializerDeserializerContainer;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java
new file mode 100644
index 0000000..92e90e7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.application;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+
+/**
+ * An implementation of IApplicationConfig which is backed by the Config Manager.
+ */
+public class ConfigManagerApplicationConfig implements IApplicationConfig, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ConfigManager configManager;
+
+    public ConfigManagerApplicationConfig(ConfigManager configManager) {
+        this.configManager = configManager;
+    }
+
+    @Override
+    public String getString(String section, String key) {
+        return (String)get(section, key);
+    }
+
+    @Override
+    public int getInt(String section, String key) {
+        return (int)get(section, key);
+    }
+
+    @Override
+    public long getLong(String section, String key) {
+        return (long)get(section, key);
+    }
+
+    @Override
+    public Set<String> getSectionNames() {
+        return configManager.getSectionNames();
+    }
+
+    @Override
+    public Set<Section> getSections() {
+        return configManager.getSections();
+    }
+
+    @Override
+    public Set<Section> getSections(Predicate<Section> predicate) {
+        return configManager.getSections(predicate);
+    }
+
+    @Override
+    public Set<String> getKeys(String section) {
+        return configManager.getOptionNames(section);
+    }
+
+    private Object get(String section, String key) {
+        return get(configManager.lookupOption(section, key));
+    }
+
+    @Override
+    public Object getStatic(IOption option) {
+        return configManager.get(option);
+    }
+
+    @Override
+    public List<String> getNCNames() {
+        return configManager.getNodeNames();
+    }
+
+    @Override
+    public IOption lookupOption(String sectionName, String propertyName) {
+        return configManager.lookupOption(sectionName, propertyName);
+    }
+
+    @Override
+    public Set<IOption> getOptions() {
+        return configManager.getOptions();
+    }
+
+    @Override
+    public Set<IOption> getOptions(Section section) {
+        return configManager.getOptions(section);
+    }
+
+    @Override
+    public IApplicationConfig getNCEffectiveConfig(String nodeId) {
+        return configManager.getNodeEffectiveConfig(nodeId);
+    }
+
+    public ConfigManager getConfigManager() {
+        return configManager;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
deleted file mode 100644
index 53db11c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.application;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.application.IApplicationConfig;
-import org.apache.hyracks.control.common.controllers.IniUtils;
-import org.ini4j.Ini;
-import org.ini4j.Profile.Section;
-
-/**
- * An implementation of IApplicationConfig which is backed by Ini4j.
- */
-public class IniApplicationConfig implements IApplicationConfig {
-    private final Ini ini;
-
-    public IniApplicationConfig(Ini ini) {
-        if (ini != null) {
-            this.ini = ini;
-        } else {
-            this.ini = new Ini();
-        }
-    }
-
-    @Override
-    public String getString(String section, String key) {
-        return IniUtils.getString(ini, section, key, null);
-    }
-
-    @Override
-    public String getString(String section, String key, String defaultValue) {
-        return IniUtils.getString(ini, section, key, defaultValue);
-    }
-
-    @Override
-    public String[] getStringArray(String section, String key) {
-        return IniUtils.getStringArray(ini, section, key);
-    }
-
-    @Override
-    public int getInt(String section, String key) {
-        return IniUtils.getInt(ini, section, key, 0);
-    }
-
-    @Override
-    public int getInt(String section, String key, int defaultValue) {
-        return IniUtils.getInt(ini, section, key, defaultValue);
-    }
-
-    @Override
-    public long getLong(String section, String key) {
-        return IniUtils.getLong(ini, section, key, 0);
-    }
-
-    @Override
-    public long getLong(String section, String key, long defaultValue) {
-        return IniUtils.getLong(ini, section, key, defaultValue);
-    }
-
-    @Override
-    public Set<String> getSections() {
-        return ini.keySet();
-    }
-
-    @Override
-    public Set<String> getKeys(String section) {
-        return ini.get(section).keySet();
-    }
-
-    @Override
-    public List<Set<Map.Entry<String, String>>> getMultiSections(String section) {
-        List<Set<Map.Entry<String, String>>> list = new ArrayList<>();
-        List<Section> secs = getMulti(section);
-        if (secs != null) {
-            for (Section sec : secs) {
-                list.add(sec.entrySet());
-            }
-        }
-        return list;
-    }
-
-    private List<Section> getMulti(String section) {
-        return ini.getAll(section);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java
new file mode 100644
index 0000000..de9b543
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.annotation.Annotation;
+
+import org.kohsuke.args4j.Argument;
+import org.kohsuke.args4j.spi.OptionHandler;
+import org.kohsuke.args4j.spi.StringOptionHandler;
+
+@SuppressWarnings("ClassExplicitlyAnnotation")
+public class Args4jArgument implements Argument {
+    @Override
+    public String usage() {
+        return "";
+    }
+
+    @Override
+    public String metaVar() {
+        return "";
+    }
+
+    @Override
+    public boolean required() {
+        return false;
+    }
+
+    @Override
+    public boolean hidden() {
+        return false;
+    }
+
+    @Override
+    public Class<? extends OptionHandler> handler() {
+        return StringOptionHandler.class;
+    }
+
+    @Override
+    public int index() {
+        return 0;
+    }
+
+    @Override
+    public boolean multiValued() {
+        return true;
+    }
+
+    @Override
+    public Class<? extends Annotation> annotationType() {
+        return Argument.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java
new file mode 100644
index 0000000..c904d0b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hyracks.api.config.IOption;
+import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler;
+import org.kohsuke.args4j.spi.IntOptionHandler;
+import org.kohsuke.args4j.spi.OptionHandler;
+import org.kohsuke.args4j.spi.StringOptionHandler;
+
+@SuppressWarnings("ClassExplicitlyAnnotation")
+class Args4jOption implements Option {
+    private final IOption option;
+    private final ConfigManager configManager;
+    private final Class targetType;
+
+    Args4jOption(IOption option, ConfigManager configManager, Class targetType) {
+        this.option = option;
+        this.targetType = targetType;
+        this.configManager = configManager;
+    }
+
+    @Override
+    public String name() {
+        return option.cmdline();
+    }
+
+    @Override
+    public String[] aliases() {
+        return new String[0];
+    }
+
+    @Override
+    public String usage() {
+        return configManager.getUsage(option);
+    }
+
+    @Override
+    public String metaVar() {
+        return "";
+    }
+
+    @Override
+    public boolean required() {
+        return false;
+    }
+
+    @Override
+    public boolean help() {
+        return false;
+    }
+
+    @Override
+    public boolean hidden() {
+        return option.hidden();
+    }
+
+    @Override
+    public Class<? extends OptionHandler> handler() {
+        if (targetType.equals(Boolean.class)) {
+            return ExplicitBooleanOptionHandler.class;
+        } else if (targetType.equals(Integer.class)) {
+            return IntOptionHandler.class;
+        } else {
+            return StringOptionHandler.class;
+        }
+    }
+
+    @Override
+    public String[] depends() {
+        return new String[0];
+    }
+
+    @Override
+    public String[] forbids() {
+        return new String[0];
+    }
+
+    @Override
+    public Class<? extends Annotation> annotationType() {
+        return Option.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java
new file mode 100644
index 0000000..1367ef0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.reflect.AnnotatedElement;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.hyracks.api.config.IOption;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.spi.FieldSetter;
+import org.kohsuke.args4j.spi.Setter;
+
+/**
+ * args4j {@link Setter} that forwards every value it receives to a callback. It is not backed by a field
+ * or an annotated element: {@link #asFieldSetter()} and {@link #asAnnotatedElement()} both return
+ * {@code null}.
+ */
+class Args4jSetter implements Setter {
+    private final IOption option;
+    // assigned exactly once per constructor, so it is final like the other fields
+    private final BiConsumer<IOption, Object> consumer;
+    private final boolean multiValued;
+    private final Class type;
+
+    /**
+     * Creates a setter bound to an option.
+     *
+     * @param option      option passed to the callback with each value; its {@code type().targetType()}
+     *                    is reported by {@link #getType()}
+     * @param consumer    callback invoked with the option and each value
+     * @param multiValued value reported by {@link #isMultiValued()}
+     */
+    Args4jSetter(IOption option, BiConsumer<IOption, Object> consumer, boolean multiValued) {
+        this.option = option;
+        this.consumer = consumer;
+        this.multiValued = multiValued;
+        this.type = option.type().targetType();
+    }
+
+    /**
+     * Creates a setter that is not bound to an option; the callback receives only the value.
+     *
+     * @param consumer    callback invoked with each value
+     * @param multiValued value reported by {@link #isMultiValued()}
+     * @param type        value type reported by {@link #getType()}
+     */
+    Args4jSetter(Consumer<Object> consumer, boolean multiValued, Class type) {
+        this.option = null;
+        // adapt the single-argument callback; the (null) option argument is dropped
+        this.consumer = (ignoredOption, value) -> consumer.accept(value);
+        this.multiValued = multiValued;
+        this.type = type;
+    }
+
+    /** Hands the value, together with the bound option (possibly {@code null}), to the callback. */
+    @Override
+    public void addValue(Object value) throws CmdLineException {
+        consumer.accept(option, value);
+    }
+
+    @Override
+    public Class getType() {
+        return type;
+    }
+
+    @Override
+    public boolean isMultiValued() {
+        return multiValued;
+    }
+
+    /** @return always {@code null}; this setter does not write to a field */
+    @Override
+    public FieldSetter asFieldSetter() {
+        return null;
+    }
+
+    /** @return always {@code null}; this setter has no annotated element */
+    @Override
+    public AnnotatedElement asAnnotatedElement() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
new file mode 100644
index 0000000..bfe759e
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.collections4.map.CompositeMap;
+import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.IConfigurator;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import 
org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
+import org.ini4j.Ini;
+import org.ini4j.Profile;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.OptionHandlerFilter;
+
+public class ConfigManager implements IConfigManager, Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(ConfigManager.class.getName());
+
+    // options accepted by register(IOption...)
+    private HashSet<IOption> registeredOptions = new HashSet<>();
+    // values stored by setters invoked with isDefault == false and no node
+    private HashMap<IOption, Object> definedMap = new HashMap<>();
+    // values stored by setters invoked with isDefault == true and no node
+    private HashMap<IOption, Object> defaultMap = new HashMap<>();
+    // composite view over definedMap and defaultMap; writes through the view are rejected by NoOpMapMutator
+    private CompositeMap<IOption, Object> configurationMap = new CompositeMap<>(definedMap, defaultMap,
+            new NoOpMapMutator());
+    // section -> (ini key -> option)
+    private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class);
+    // node id -> values stored for that node
+    private TreeMap<String, Map<IOption, Object>> nodeSpecificMap = new TreeMap<>();
+    // callbacks invoked whenever a value is set for an option
+    private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>();
+    // raw command-line arguments; may be null
+    private final String[] args;
+    private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this);
+    // names of all Section constants plus every section name encountered while parsing an INI file
+    private Set<String> allSections = new HashSet<>();
+    // receivers of the arguments collected by the command-line parser's argument handler
+    private transient Collection<Consumer<List<String>>> argListeners = new ArrayList<>();
+    // options whose value is a String path or URL of an INI file to load
+    private transient Collection<IOption> iniPointerOptions = new ArrayList<>();
+    // sections whose options are offered on the command line
+    private transient Collection<Section> cmdLineSections = new ArrayList<>();
+    private transient OptionHandlerFilter usageFilter;
+    // configuration steps keyed by metric; processConfig runs them in ascending key order
+    private transient SortedMap<Integer, List<IConfigurator>> configurators = new TreeMap<>();
+    // set once processConfig has run; register(IOption...) rejects new options afterwards
+    private boolean configured;
+
+    /** Creates a manager without command-line arguments ({@code args} is {@code null}). */
+    public ConfigManager() {
+        this(null);
+    }
+
+    /**
+     * Creates a manager for the given command-line arguments, records the names of all {@link Section}
+     * constants, and installs the four built-in configuration steps.
+     *
+     * @param args raw command-line arguments; may be {@code null}
+     */
+    public ConfigManager(String[] args) {
+        this.args = args;
+        Arrays.stream(Section.values()).map(Section::sectionName).forEach(allSections::add);
+        // built-in steps, each registered under its own metric
+        addConfigurator(PARSE_INI_POINTERS_METRIC, this::extractIniPointersFromCommandLine);
+        addConfigurator(PARSE_INI_METRIC, this::parseIni);
+        addConfigurator(PARSE_COMMAND_LINE_METRIC, this::processCommandLine);
+        addConfigurator(APPLY_DEFAULTS_METRIC, this::applyDefaults);
+    }
+
+    /**
+     * Adds a configuration step under the given metric. Steps sharing a metric are kept in insertion order.
+     *
+     * @param metric       key under which the step is stored in the sorted configurator map
+     * @param configurator step to run from {@code processConfig}
+     */
+    @Override
+    public void addConfigurator(int metric, IConfigurator configurator) {
+        final List<IConfigurator> sameMetric = configurators.computeIfAbsent(metric, key -> new ArrayList<>());
+        sameMetric.add(configurator);
+    }
+
+    /** Records the given options as pointers to an INI file to be loaded by {@code parseIni}. */
+    @Override
+    public void addIniParamOptions(IOption... options) {
+        Collections.addAll(iniPointerOptions, options);
+    }
+
+    /** Adds sections whose registered options are offered on the command line. */
+    @Override
+    public void addCmdLineSections(Section... sections) {
+        Collections.addAll(cmdLineSections, sections);
+    }
+
+    /** Sets the filter that is passed to {@code ConfigUtils.printUsage} when usage is printed. */
+    @Override
+    public void setUsageFilter(OptionHandlerFilter usageFilter) {
+        this.usageFilter = usageFilter;
+    }
+
+    /**
+     * Registers the given options. Options in the {@code VIRTUAL} section and options that are already
+     * registered are skipped silently.
+     *
+     * @param options options to register
+     * @throws IllegalStateException if the configuration has already been processed, or if a different
+     *                               option is already registered under the same section and ini key
+     */
+    @Override
+    public void register(IOption... options) {
+        for (IOption candidate : options) {
+            final boolean virtual = candidate.section() == Section.VIRTUAL;
+            if (virtual || registeredOptions.contains(candidate)) {
+                continue;
+            }
+            registerNewOption(candidate);
+        }
+    }
+
+    // Registers a single option that is neither virtual nor already known, and installs its setters.
+    private void registerNewOption(IOption option) {
+        if (configured) {
+            throw new IllegalStateException("configuration already processed");
+        }
+        LOGGER.fine("registering option: " + option.toIniString());
+        final Map<String, IOption> byIniKey = sectionMap.computeIfAbsent(option.section(), key -> new HashMap<>());
+        final IOption previous = byIniKey.put(option.ini(), option);
+        if (previous == null) {
+            registeredOptions.add(option);
+            // primary setter: store the value in the map selected by node / isDefault
+            optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
+            if (LOGGER.isLoggable(Level.FINE)) {
+                // secondary setter: trace every assignment
+                optionSetters.put(option, (node, value, isDefault) -> LOGGER
+                        .fine((isDefault ? "defaulting" : "setting ") + option.toIniString() + " to " + value));
+            }
+        } else if (previous != option) {
+            throw new IllegalStateException("An option cannot be defined multiple times: "
+                    + option.toIniString() + ": " + Arrays.asList(option.getClass(), previous.getClass()));
+        }
+    }
+
+    /**
+     * Selects the map a value must be written to: the node's own map (created on demand) when a node is
+     * given, otherwise the default map or the defined map depending on {@code isDefault}.
+     */
+    private Map<IOption, Object> correctedMap(String node, boolean isDefault) {
+        if (node != null) {
+            return nodeSpecificMap.computeIfAbsent(node, this::createNodeSpecificMap);
+        }
+        if (isDefault) {
+            return defaultMap;
+        }
+        return definedMap;
+    }
+
+    /**
+     * Ensures a node-specific map exists for {@code nodeId}, creating an empty one if necessary.
+     *
+     * @param nodeId id of the node to make known to this manager
+     */
+    public void registerVirtualNode(String nodeId) {
+        LOGGER.fine("registerVirtualNode: " + nodeId);
+        nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+    }
+
+    /** Factory for a node's value map; logs the node id at FINE level and returns a new empty map. */
+    private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
+        LOGGER.fine("createNodeSpecificMap: " + nodeId);
+        return new HashMap<>();
+    }
+
+    /**
+     * Registers the enum constants of each given option class by delegating to
+     * {@link #register(IOption...)}.
+     *
+     * @param optionClasses classes whose {@code getEnumConstants()} are registered
+     */
+    @Override
+    @SafeVarargs
+    public final void register(final Class<? extends IOption>... 
optionClasses) {
+        for (Class<? extends IOption> optionClass : optionClasses) {
+            register(optionClass.getEnumConstants());
+        }
+    }
+
+    /**
+     * Finds the option registered under the given section name and ini key.
+     *
+     * @param section section name, resolved with {@code Section.parseSectionName}
+     * @param key     ini key of the option
+     * @return the matching option, or {@code null} if none is registered
+     */
+    public IOption lookupOption(String section, String key) {
+        // getSectionOptionMap never returns null (it falls back to an empty map), so no null guard is needed
+        return getSectionOptionMap(Section.parseSectionName(section)).get(key);
+    }
+
+    /**
+     * Runs every registered configurator once, in ascending metric order, then marks this manager as
+     * configured. Subsequent calls do nothing.
+     *
+     * @throws CmdLineException if a configurator throws it
+     * @throws IOException      if a configurator throws it
+     */
+    public void processConfig() throws CmdLineException, IOException {
+        if (configured) {
+            return;
+        }
+        for (List<IConfigurator> sameMetric : configurators.values()) {
+            for (IConfigurator step : sameMetric) {
+                step.run();
+            }
+        }
+        configured = true;
+    }
+
+    private void processCommandLine() throws CmdLineException {
+        List<String> appArgs = processCommandLine(cmdLineSections, 
usageFilter, this::cmdLineSet);
+        // now propagate the app args to the listeners...
+        argListeners.forEach(l -> l.accept(appArgs));
+    }
+
+    private void extractIniPointersFromCommandLine() throws CmdLineException {
+        Map<IOption, Object> cmdLineOptions = new HashMap<>();
+        processCommandLine(cmdLineSections, usageFilter, cmdLineOptions::put);
+        for (IOption option : iniPointerOptions) {
+            if (cmdLineOptions.containsKey(option)) {
+                set(option, cmdLineOptions.get(option));
+            }
+        }
+    }
+
+    private void cmdLineSet(IOption option, Object value) {
+        invokeSetters(option, option.type().parse(String.valueOf(value)), 
null);
+    }
+
+    /** Calls every setter installed for the option with the given value, flagged as a non-default value. */
+    private void invokeSetters(IOption option, Object value, String nodeId) {
+        for (IConfigSetter setter : optionSetters.get(option)) {
+            setter.set(nodeId, value, false);
+        }
+    }
+
+    @SuppressWarnings({ "squid:S106", "squid:S1147" }) // use of System.err, 
System.exit()
+    private List<String> processCommandLine(Collection<Section> sections, 
OptionHandlerFilter usageFilter,
+            BiConsumer<IOption, Object> setAction)
+            throws CmdLineException {
+        final Args4jBean bean = new Args4jBean();
+        CmdLineParser cmdLineParser = new CmdLineParser(bean);
+        final List<String> appArgs = new ArrayList<>();
+        List<IOption> commandLineOptions = new ArrayList<>();
+        for (Map.Entry<Section, Map<String, IOption>> sectionMapEntry : 
sectionMap.entrySet()) {
+            if (!sections.contains(sectionMapEntry.getKey())) {
+                continue;
+            }
+            for (IOption option : sectionMapEntry.getValue().values()) {
+                if (option.section() != Section.VIRTUAL) {
+                    commandLineOptions.add(option);
+                }
+            }
+        }
+        commandLineOptions.sort(Comparator.comparing(IOption::cmdline));
+
+        commandLineOptions.forEach(option -> cmdLineParser.addOption(new 
Args4jSetter(option, setAction, false),
+                new Args4jOption(option, this, option.type().targetType())));
+
+        if (!argListeners.isEmpty()) {
+            cmdLineParser.addArgument(new Args4jSetter(o -> 
appArgs.add(String.valueOf(o)), true, String.class),
+                    new Args4jArgument());
+        }
+        LOGGER.fine("parsing cmdline: " + Arrays.toString(args));
+        try {
+            if (args == null || args.length == 0) {
+                LOGGER.info("no command line args supplied");
+                return appArgs;
+            }
+            cmdLineParser.parseArgument(args);
+            if (bean.help) {
+                ConfigUtils.printUsage(cmdLineParser, usageFilter, System.err);
+                System.exit(0);
+            }
+        } catch (CmdLineException e) {
+            if (bean.help) {
+                ConfigUtils.printUsage(cmdLineParser, usageFilter, System.err);
+                System.exit(0);
+            } else {
+                ConfigUtils.printUsage(e, usageFilter, System.err);
+                throw e;
+            }
+        }
+        return appArgs;
+    }
+
+    /**
+     * Loads the INI file referenced by the INI-pointer options (if any) and applies every entry through
+     * the option setters. Sections rooted at {@code NC} with a distinct simple name are applied to that
+     * node; extension sections are handed to {@code parseExtensionIniSection}.
+     *
+     * @throws IOException if the file cannot be loaded, or (as {@code HyracksException}) if the file
+     *                     contains an unknown section or option
+     */
+    private void parseIni() throws IOException {
+        final Ini ini = loadPointedIni();
+        if (ini == null) {
+            LOGGER.info("no INI file specified; skipping parsing");
+            return;
+        }
+        LOGGER.info("parsing INI file: " + ini);
+        for (Profile.Section iniSection : ini.values()) {
+            allSections.add(iniSection.getName());
+            final Profile.Section parent = iniSection.getParent();
+            final Section rootSection =
+                    Section.parseSectionName(parent == null ? iniSection.getName() : parent.getName());
+            if (rootSection == Section.EXTENSION) {
+                parseExtensionIniSection(iniSection);
+                continue;
+            }
+            final String node;
+            if (rootSection == Section.NC) {
+                // a name that differs from the simple name marks a node-scoped section
+                node = iniSection.getName().equals(iniSection.getSimpleName()) ? null : iniSection.getSimpleName();
+            } else if (Section.parseSectionName(iniSection.getName()) != null) {
+                node = null;
+            } else {
+                throw new HyracksException("Unknown section in ini: " + iniSection.getName());
+            }
+            final Map<String, IOption> knownOptions = getSectionOptionMap(rootSection);
+            for (Map.Entry<String, String> keyValue : iniSection.entrySet()) {
+                final String key = keyValue.getKey();
+                final IOption option = knownOptions == null ? null : knownOptions.get(key);
+                if (option == null) {
+                    handleUnknownOption(iniSection, key);
+                    return;
+                }
+                final String rawValue = keyValue.getValue();
+                LOGGER.fine("setting " + option.toIniString() + " to " + rawValue);
+                invokeSetters(option, option.type().parse(rawValue), node);
+            }
+        }
+    }
+
+    // Loads the INI referenced by the pointer options; when several are set, the last one loaded is returned.
+    private Ini loadPointedIni() throws IOException {
+        Ini loaded = null;
+        for (IOption pointerOption : iniPointerOptions) {
+            final Object pointer = get(pointerOption);
+            if (pointer instanceof String) {
+                loaded = ConfigUtils.loadINIFile((String) pointer);
+            } else if (pointer instanceof URL) {
+                loaded = ConfigUtils.loadINIFile((URL) pointer);
+            } else if (pointer != null) {
+                throw new IllegalArgumentException("config file pointer options must be of type String (for file) or "
+                        + "URL, instead of " + pointerOption.type().targetType());
+            }
+        }
+        return loaded;
+    }
+
+    /** Placeholder for extension sections: currently ignores the section entirely. */
+    private void parseExtensionIniSection(Profile.Section section) {
+        // TODO(mblow): parse extensions
+    }
+
+    private void handleUnknownOption(Profile.Section section, String name) 
throws HyracksException {
+        Set<String> matches = new HashSet<>();
+        for (IOption registeredOption : registeredOptions) {
+            if (registeredOption.ini().equals(name)) {
+                matches.add(registeredOption.section().sectionName());
+            }
+        }
+        if (!matches.isEmpty()) {
+            throw new HyracksException(
+                    "Section mismatch for [" + section.getName() + "] " + name 
+ ", expected section(s) " + matches);
+        } else {
+            throw new HyracksException("Unknown option in ini: [" + 
section.getName() + "] " + name);
+        }
+    }
+
+    /**
+     * Resolves the default of every registered option that has no value yet; {@code getOrDefault} stores
+     * each non-null result through the option's setters. Options of the {@code NC} section are
+     * additionally resolved once per known node, against that node's effective map.
+     */
+    private void applyDefaults() {
+        LOGGER.fine("applying defaults");
+        for (Map.Entry<Section, Map<String, IOption>> entry : 
sectionMap.entrySet()) {
+            if (entry.getKey() == Section.NC) {
+                // pass 1: option-major -- for each NC option, resolve it for every known node
+                entry.getValue().values().forEach(option -> getNodeNames()
+                        .forEach(node -> 
getOrDefault(getNodeEffectiveMap(node), option, node)));
+                // pass 2: node-major -- for each node, resolve every NC option against a composite of the
+                // node's map and definedMap
+                // NOTE(review): both passes appear to cover the same option/node pairs, only in a different
+                // iteration order -- confirm whether the second pass is still required
+                for (Map.Entry<String, Map<IOption, Object>> nodeMap : 
nodeSpecificMap.entrySet()) {
+                    entry.getValue().values()
+                            .forEach(option -> getOrDefault(
+                                    new CompositeMap<>(nodeMap.getValue(), 
definedMap, new NoOpMapMutator()), option,
+                                    nodeMap.getKey()));
+                }
+                // the statement after this block also pushes the NC defaults to the shared map: if the CC
+                // requests NC properties, it should receive the defaults
+                // TODO (mblow): seems lame, should log warning on access
+            }
+            // resolve against the shared configuration map, without a node scope (applies to every section)
+            entry.getValue().values().forEach(option -> 
getOrDefault(configurationMap, option, null));
+        }
+    }
+
+    private Object getOrDefault(Map<IOption, Object> map, IOption option, 
String nodeId) {
+        if (map.containsKey(option)) {
+            return map.get(option);
+        } else {
+            Object value = resolveDefault(option, new 
ConfigManagerApplicationConfig(this) {
+                @Override
+                public Object getStatic(IOption option) {
+                    return getOrDefault(map, option, nodeId);
+                }
+            });
+            if (value != null && optionSetters != null) {
+                optionSetters.get(option).forEach(setter -> setter.set(nodeId, 
value, true));
+            }
+            return value;
+        }
+    }
+
+    /**
+     * Evaluates an option's declared default. A default that is itself an {@link IOption} is looked up in
+     * the given config, a {@link Supplier} is invoked, a {@link Function} is applied to the config, and
+     * any other value is returned unchanged.
+     *
+     * @param option            option whose default is wanted
+     * @param applicationConfig config used for option references and function defaults
+     * @return the evaluated default, possibly {@code null}
+     */
+    public Object resolveDefault(IOption option, IApplicationConfig applicationConfig) {
+        final Object declared = option.defaultValue();
+        if (declared instanceof IOption) {
+            return applicationConfig.get((IOption) declared);
+        }
+        if (declared instanceof Supplier) {
+            //noinspection unchecked
+            return ((Supplier<?>) declared).get();
+        }
+        if (declared instanceof Function) {
+            //noinspection unchecked
+            return ((Function<IApplicationConfig, ?>) declared).apply(applicationConfig);
+        }
+        return declared;
+    }
+
+    /** Returns the {@link Section} constants accepted by the predicate. */
+    @Override
+    public Set<Section> getSections(Predicate<Section> predicate) {
+        final Set<Section> selected = new HashSet<>();
+        for (Section candidate : Section.values()) {
+            if (predicate.test(candidate)) {
+                selected.add(candidate);
+            }
+        }
+        return selected;
+    }
+
+    /** Returns all {@link Section} constants (the predicate overload with an always-true predicate). */
+    @Override
+    public Set<Section> getSections() {
+        return getSections(section -> true);
+    }
+
+    /** Returns an unmodifiable view of all section names known to this manager. */
+    public Set<String> getSectionNames() {
+        return Collections.unmodifiableSet(allSections);
+    }
+
+    /**
+     * Returns the ini keys of the options registered under the named section.
+     *
+     * @param sectionName section name, resolved with {@code Section.parseSectionName}
+     * @return a new set of ini keys; empty when nothing is registered for the section
+     */
+    public Set<String> getOptionNames(String sectionName) {
+        final Section section = Section.parseSectionName(sectionName);
+        return getSectionOptionMap(section).values().stream().map(IOption::ini)
+                .collect(Collectors.toCollection(HashSet::new));
+    }
+
+    /** Returns a new set holding the options registered under the given section. */
+    @Override
+    public Set<IOption> getOptions(Section section) {
+        final Set<IOption> sectionOptions = new HashSet<>();
+        for (IOption option : getSectionOptionMap(section).values()) {
+            sectionOptions.add(option);
+        }
+        return sectionOptions;
+    }
+
+    /** Returns the ini-key-to-option map of a section, or an empty map when the section has none. */
+    private Map<String, IOption> getSectionOptionMap(Section section) {
+        final Map<String, IOption> registered = sectionMap.get(section);
+        if (registered == null) {
+            return Collections.emptyMap();
+        }
+        return registered;
+    }
+
+    /**
+     * Returns the ids of all nodes that have a node-specific map.
+     *
+     * @return an unmodifiable snapshot of the node ids, in the key order of the underlying sorted map
+     */
+    public List<String> getNodeNames() {
+        // diamond instead of the raw ArrayList type, which compiled only with an unchecked warning
+        return Collections.unmodifiableList(new ArrayList<>(nodeSpecificMap.keySet()));
+    }
+
+    /**
+     * Returns a configuration view for one node, backed by the node's effective map (the composite built
+     * by {@code getNodeEffectiveMap}). A node-specific map is created for {@code nodeId} if none exists.
+     *
+     * @param nodeId id of the node whose configuration is requested
+     * @return a config whose {@code getStatic} resolves and caches missing options on first access
+     */
+    public IApplicationConfig getNodeEffectiveConfig(String nodeId) {
+        final Map<IOption, Object> nodeMap = 
nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+        Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId);
+        return new ConfigManagerApplicationConfig(this) {
+            @Override
+            public Object getStatic(IOption option) {
+                if (!nodeEffectiveMap.containsKey(option)) {
+                    // we need to calculate the default in the context of the node specific map...
+                    nodeMap.put(option, getOrDefault(nodeEffectiveMap, option, 
nodeId));
+                }
+                return nodeEffectiveMap.get(option);
+            }
+        };
+    }
+
+    /**
+     * Builds a composite view over the node's own map and {@code definedMap}; writes through the view are
+     * rejected by {@code NoOpMapMutator}.
+     */
+    // NOTE(review): nodeSpecificMap.get(nodeId) is null for an unknown node -- callers presumably ensure the
+    // node map exists first; confirm
+    private CompositeMap<IOption, Object> getNodeEffectiveMap(String nodeId) {
+        return new CompositeMap<>(nodeSpecificMap.get(nodeId), definedMap, new 
NoOpMapMutator());
+    }
+
+    /**
+     * Serializes the configuration to an {@link Ini}. Global values go under their option's section;
+     * node-specific values go under {@code <NC section name>/<node id>}. Null values are skipped.
+     *
+     * @param includeDefaults {@code true} to export the combined configuration map, {@code false} to
+     *                        export only the defined map
+     * @return a new {@code Ini} holding the serialized values
+     */
+    public Ini toIni(boolean includeDefaults) {
+        final Ini ini = new Ini();
+        final Map<IOption, Object> globalValues = includeDefaults ? configurationMap : definedMap;
+        for (Map.Entry<IOption, Object> global : globalValues.entrySet()) {
+            final Object value = global.getValue();
+            if (value == null) {
+                continue;
+            }
+            final IOption option = global.getKey();
+            ini.add(option.section().sectionName(), option.ini(), option.type().serializeToIni(value));
+        }
+        for (Map.Entry<String, Map<IOption, Object>> perNode : nodeSpecificMap.entrySet()) {
+            final String nodeSection = Section.NC.sectionName() + "/" + perNode.getKey();
+            for (Map.Entry<IOption, Object> nodeValue : perNode.getValue().entrySet()) {
+                final Object value = nodeValue.getValue();
+                if (value == null) {
+                    continue;
+                }
+                final IOption option = nodeValue.getKey();
+                ini.add(nodeSection, option.ini(), option.type().serializeToIni(value));
+            }
+        }
+        return ini;
+    }
+
+    /** Sets a value for the option without a node scope (delegates with a {@code null} node id). */
+    public void set(IOption option, Object value) {
+        set(null, option, value);
+    }
+
+    /**
+     * Sets a value for the option by invoking the option's setters.
+     *
+     * @param nodeId node the value applies to, or {@code null} for no node scope
+     * @param option option to set
+     * @param value  value handed to the setters as-is
+     */
+    public void set(String nodeId, IOption option, Object value) {
+        invokeSetters(option, value, nodeId);
+    }
+
+    /**
+     * Returns the option's value from the shared configuration map, resolving its default if necessary.
+     * Reading an {@code NC} option this way logs a warning.
+     *
+     * @param option a registered option
+     * @return the stored value or resolved default (possibly {@code null})
+     * @throws IllegalStateException if the option has not been registered
+     */
+    public Object get(IOption option) {
+        if (!registeredOptions.contains(option)) {
+            throw new IllegalStateException("Option not registered with ConfigManager: " + option.toIniString() + "("
+                    + option.getClass() + "." + option + ")");
+        }
+        if (option.section() == Section.NC) {
+            LOGGER.warning("NC option " + option.toIniString() + " being accessed outside of NC-scoped configuration.");
+        }
+        return getOrDefault(configurationMap, option, null);
+    }
+
+    /** Returns an unmodifiable view of all registered options. */
+    public Set<IOption> getOptions() {
+        return Collections.unmodifiableSet(registeredOptions);
+    }
+
+    /** Returns the application config created for this manager at construction time. */
+    @Override
+    public IApplicationConfig getAppConfig() {
+        return appConfig;
+    }
+
+    /**
+     * Adds a listener that receives the list of collected application arguments after the command line
+     * has been processed.
+     */
+    public void registerArgsListener(Consumer<List<String>> argListener) {
+        argListeners.add(argListener);
+    }
+
+    /**
+     * Builds the usage text of an option: its description (when present) followed by
+     * {@code (default: ...)}. A missing or empty description is logged as a warning.
+     */
+    String getUsage(IOption option) {
+        final String description = option.description();
+        final StringBuilder usage = new StringBuilder();
+        final boolean described = description != null && !description.isEmpty();
+        if (described) {
+            usage.append(description).append(" ");
+        } else {
+            // log the class name without its package prefix
+            final String className = option.getClass().getName();
+            LOGGER.warning("missing description for option: "
+                    + className.substring(className.lastIndexOf(".") + 1) + "." + option.name());
+        }
+        return usage.append("(default: ").append(defaultTextForUsage(option, IOption::cmdline)).append(")")
+                .toString();
+    }
+
+    /**
+     * Describes an option's default for usage output. An override supplied by the option wins; otherwise
+     * an option-valued default is shown as {@code same as <option>}, a function default as
+     * {@code <function>}, and anything else as its serialized resolved value.
+     *
+     * @param option        option whose default is described
+     * @param optionPrinter renders a referenced option
+     * @return the text to show as the default
+     */
+    public String defaultTextForUsage(IOption option, Function<IOption, String> optionPrinter) {
+        final String override = option.usageDefaultOverride(appConfig, optionPrinter);
+        if (override != null) {
+            return override;
+        }
+        // TODO(mblow): defer usage calculation to enable inclusion of evaluated actual default
+        final Object declared = option.defaultValue();
+        if (declared instanceof IOption) {
+            return "same as " + optionPrinter.apply((IOption) declared);
+        }
+        if (declared instanceof Function) {
+            // TODO(mblow): defer usage calculation to enable evaluation of function
+            return "<function>";
+        }
+        // go through a builder so that a null serialization is still rendered as text
+        return new StringBuilder().append(option.type().serializeToString(resolveDefault(option, appConfig)))
+                .toString();
+    }
+
+    /**
+     * {@link CompositeMap.MapMutator} that rejects every write made through the composite view and
+     * ignores key collisions between the composited maps.
+     */
+    private static class NoOpMapMutator implements CompositeMap.MapMutator<IOption, Object> {
+        /** Always throws: the composite view is not writable. */
+        @Override
+        public Object put(CompositeMap<IOption, Object> view, Map<IOption, Object>[] composited, IOption key,
+                Object value) {
+            throw new UnsupportedOperationException("mutations are not allowed");
+        }
+
+        /** Always throws: the composite view is not writable. */
+        @Override
+        public void putAll(CompositeMap<IOption, Object> view, Map<IOption, Object>[] composited,
+                Map<? extends IOption, ?> entries) {
+            throw new UnsupportedOperationException("mutations are not allowed");
+        }
+
+        /** Accepts overlapping keys without taking any action. */
+        @Override
+        public void resolveCollision(CompositeMap<IOption, Object> view, Map<IOption, Object> existing,
+                Map<IOption, Object> added, Collection<IOption> intersect) {
+            // no-op
+        }
+    }
+
+    /** Bean handed to the args4j parser; it only captures whether {@code -help} was given. */
+    private static class Args4jBean {
+        // set by args4j when -help is present on the command line
+        @Option(name = "-help", help = true)
+        boolean help;
+    }
+}
\ No newline at end of file

Reply via email to