http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index c47284c..21b9dcf 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -20,6 +20,7 @@ package org.apache.hyracks.control.cc; import java.io.File; import java.io.FileReader; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; @@ -32,20 +33,22 @@ import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.TreeMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.application.ICCApplicationEntryPoint; import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.context.ICCContext; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; -import 
org.apache.hyracks.api.job.resource.DefaultJobCapacityController; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.api.topology.ClusterTopology; @@ -66,9 +69,10 @@ import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun; import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork; import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork; import org.apache.hyracks.control.cc.work.TriggerNCWork; +import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.context.ServerContext; import org.apache.hyracks.control.common.controllers.CCConfig; -import org.apache.hyracks.control.common.controllers.IniUtils; +import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.common.deployment.DeploymentRun; import org.apache.hyracks.control.common.ipc.CCNCFunctions; import org.apache.hyracks.control.common.logs.LogFile; @@ -77,7 +81,6 @@ import org.apache.hyracks.control.common.work.WorkQueue; import org.apache.hyracks.ipc.api.IIPCI; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; -import org.ini4j.Ini; import org.xml.sax.InputSource; public class ClusterControllerService implements IControllerService { @@ -85,6 +88,8 @@ public class ClusterControllerService implements IControllerService { private final CCConfig ccConfig; + private final ConfigManager configManager; + private IPCSystem clusterIPC; private IPCSystem clientIPC; @@ -127,11 +132,22 @@ public class ClusterControllerService implements IControllerService { private ShutdownRun shutdownCallback; - private ICCApplicationEntryPoint aep; + private final ICCApplicationEntryPoint aep; + + public ClusterControllerService(final CCConfig config) throws Exception { + this(config, getApplicationEntryPoint(config)); + } - public 
ClusterControllerService(final CCConfig ccConfig) throws Exception { - this.ccConfig = ccConfig; - File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs"); + public ClusterControllerService(final CCConfig config, + final ICCApplicationEntryPoint aep) throws Exception { + this.ccConfig = config; + this.configManager = ccConfig.getConfigManager(); + if (aep == null) { + throw new IllegalArgumentException("ICCApplicationEntryPoint cannot be null"); + } + this.aep = aep; + configManager.processConfig(); + File jobLogFolder = new File(ccConfig.getRootDir(), "logs/jobs"); jobLog = new LogFile(jobLogFolder); // WorkQueue is in charge of heartbeat as well as other events. @@ -140,7 +156,8 @@ public class ClusterControllerService implements IControllerService { final ClusterTopology topology = computeClusterTopology(ccConfig); ccContext = new ClusterControllerContext(topology); sweeper = new DeadNodeSweeper(); - datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold); + datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(), + ccConfig.getResultSweepThreshold()); deploymentRunMap = new HashMap<>(); stateDumpRunMap = new HashMap<>(); @@ -151,10 +168,10 @@ public class ClusterControllerService implements IControllerService { } private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception { - if (ccConfig.clusterTopologyDefinition == null) { + if (ccConfig.getClusterTopology() == null) { return null; } - FileReader fr = new FileReader(ccConfig.clusterTopologyDefinition); + FileReader fr = new FileReader(ccConfig.getClusterTopology()); InputSource in = new InputSource(fr); try { return TopologyDefinitionParser.parse(in); @@ -166,20 +183,21 @@ public class ClusterControllerService implements IControllerService { @Override public void start() throws Exception { LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this); - serverCtx = new 
ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot)); + serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir())); IIPCI ccIPCI = new ClusterControllerIPCI(this); - clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI, + clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI, new CCNCFunctions.SerializerDeserializer()); IIPCI ciIPCI = new ClientInterfaceIPCI(this); - clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI, - new JavaSerializationBasedPayloadSerializerDeserializer()); - webServer = new WebServer(this, ccConfig.httpPort); + clientIPC = new IPCSystem( + new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()), + ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer()); + webServer = new WebServer(this, ccConfig.getConsoleListenPort()); clusterIPC.start(); clientIPC.start(); webServer.start(); - info = new ClusterControllerInfo(ccConfig.clientNetIpAddress, ccConfig.clientNetPort, + info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(), webServer.getListeningPort()); - timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod); + timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriod()); jobLog.open(); startApplication(); @@ -194,84 +212,62 @@ public class ClusterControllerService implements IControllerService { appCtx = new CCApplicationContext(this, serverCtx, ccContext, ccConfig.getAppConfig()); appCtx.addJobLifecycleListener(datasetDirectoryService); executor = Executors.newCachedThreadPool(appCtx.getThreadFactory()); - String className = ccConfig.appCCMainClass; - - IJobCapacityController jobCapacityController = DefaultJobCapacityController.INSTANCE; - if (className != null) { - Class<?> c = Class.forName(className); - aep = (ICCApplicationEntryPoint) c.newInstance(); - 
String[] args = ccConfig.appArgs == null ? null - : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]); - aep.start(appCtx, args); - jobCapacityController = aep.getJobCapacityController(); - } + aep.start(appCtx, ccConfig.getAppArgsArray()); + IJobCapacityController jobCapacityController = aep.getJobCapacityController(); // Job manager is in charge of job lifecycle management. try { Constructor<?> jobManagerConstructor = this.getClass().getClassLoader() - .loadClass(ccConfig.jobManagerClassName) + .loadClass(ccConfig.getJobManagerClass()) .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class); jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.log(Level.WARNING, "class " + ccConfig.jobManagerClassName + " could not be used: ", e); + LOGGER.log(Level.WARNING, "class " + ccConfig.getJobManagerClass() + " could not be used: ", e); } // Falls back to the default implementation if the user-provided class name is not valid. 
jobManager = new JobManager(ccConfig, this, jobCapacityController); } } - private void connectNCs() throws Exception { - Ini ini = ccConfig.getIni(); - if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) { - return; - } - for (String section : ini.keySet()) { - if (!section.startsWith("nc/")) { - continue; - } - String ncid = section.substring(3); - String address = IniUtils.getString(ini, section, "address", null); - int port = IniUtils.getInt(ini, section, "port", 9090); - if (address == null) { - address = InetAddress.getLoopbackAddress().getHostAddress(); + private Map<String, Pair<String, Integer>> getNCServices() throws IOException { + Map<String, Pair<String, Integer>> ncMap = new TreeMap<>(); + for (String ncId : configManager.getNodeNames()) { + IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(ncId); + if (!ncConfig.getBoolean(NCConfig.Option.VIRTUAL_NC)) { + ncMap.put(ncId, Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS), + ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT))); } - workQueue.schedule(new TriggerNCWork(this, address, port, ncid)); } + return ncMap; + } + + private void connectNCs() throws IOException { + getNCServices().entrySet().forEach(ncService -> { + final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, + ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey()); + workQueue.schedule(triggerWork); + }); } private void terminateNCServices() throws Exception { - Ini ini = ccConfig.getIni(); - if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) { - return; - } List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>(); - for (String section : ini.keySet()) { - if (!section.startsWith("nc/")) { - continue; - } - String ncid = section.substring(3); - String address = IniUtils.getString(ini, section, "address", null); - int port = IniUtils.getInt(ini, section, "port", 9090); - if (address == null) { - 
address = InetAddress.getLoopbackAddress().getHostAddress(); - } - ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(address, port, ncid); + getNCServices().entrySet().forEach(ncService -> { + ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getValue().getLeft(), + ncService.getValue().getRight(), ncService.getKey()); workQueue.schedule(shutdownWork); shutdownNCServiceWorks.add(shutdownWork); - } + }); for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) { shutdownWork.sync(); } } private void notifyApplication() throws Exception { - if (aep != null) { - // Sometimes, there is no application entry point. Check hyracks-client project - aep.startupCompleted(); - } + aep.startupCompleted(); } + public void stop(boolean terminateNCService) throws Exception { if (terminateNCService) { terminateNCServices(); @@ -294,9 +290,7 @@ public class ClusterControllerService implements IControllerService { } private void stopApplication() throws Exception { - if (aep != null) { - aep.stop(); - } + aep.stop(); } public ServerContext getServerContext() { @@ -360,7 +354,7 @@ public class ClusterControllerService implements IControllerService { } public NetworkAddress getDatasetDirectoryServiceInfo() { - return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort); + return new NetworkAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()); } private final class ClusterControllerContext implements ICCContext { @@ -390,6 +384,7 @@ public class ClusterControllerService implements IControllerService { public ClusterTopology getClusterTopology() { return topology; } + } private class DeadNodeSweeper extends TimerTask { @@ -458,4 +453,14 @@ public class ClusterControllerService implements IControllerService { public ThreadDumpRun removeThreadDumpRun(String requestKey) { return threadDumpRunMap.remove(requestKey); } + + private static ICCApplicationEntryPoint getApplicationEntryPoint(CCConfig ccConfig) + 
throws ClassNotFoundException, IllegalAccessException, InstantiationException { + if (ccConfig.getAppClass() != null) { + Class<?> c = Class.forName(ccConfig.getAppClass()); + return (ICCApplicationEntryPoint) c.newInstance(); + } else { + return CCApplicationEntryPoint.INSTANCE; + } + } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java index 955b7f2..8400a59 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java @@ -282,7 +282,7 @@ public class NodeControllerState { public synchronized ObjectNode toSummaryJSON() { ObjectMapper om = new ObjectMapper(); ObjectNode o = om.createObjectNode(); - o.put("node-id", ncConfig.nodeId); + o.put("node-id", ncConfig.getNodeId()); o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]); o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]); @@ -293,7 +293,7 @@ public class NodeControllerState { ObjectMapper om = new ObjectMapper(); ObjectNode o = om.createObjectNode(); - o.put("node-id", ncConfig.nodeId); + o.put("node-id", ncConfig.getNodeId()); if (includeConfig) { o.put("os-name", osName); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java index 77b9b17..a8b03bc 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java @@ -27,9 +27,10 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.hyracks.api.application.IApplicationConfig; import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.application.IClusterLifecycleListener; +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.context.ICCContext; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.IJobLifecycleListener; @@ -105,7 +106,7 @@ public class CCApplicationContext extends ApplicationContext implements ICCAppli clusterLifecycleListeners.add(clusterLifecycleListener); } - public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException { + public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException { for (IClusterLifecycleListener l : clusterLifecycleListeners) { l.notifyNodeJoin(nodeId, ncConfiguration); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 354019c..d6d8bc4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -137,7 +137,7 @@ public class NodeManager implements INodeManager { Map.Entry<String, NodeControllerState> entry = nodeIterator.next(); String nodeId = entry.getKey(); NodeControllerState state = entry.getValue(); - if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) { + if (state.incrementLastHeartbeatDuration() >= ccConfig.getHeartbeatMaxMisses()) { deadNodes.add(nodeId); affectedJobIds.addAll(state.getActiveJobIds()); // Removes the node from node map. @@ -172,10 +172,7 @@ public class NodeManager implements INodeManager { // Retrieves the IP address for a given node. 
private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException { - String ipAddress = ncState.getNCConfig().dataIPAddress; - if (ncState.getNCConfig().dataPublicIPAddress != null) { - ipAddress = ncState.getNCConfig().dataPublicIPAddress; - } + String ipAddress = ncState.getNCConfig().getDataPublicAddress(); try { return InetAddress.getByName(ipAddress); } catch (UnknownHostException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 031303b..b35de3d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -68,13 +68,13 @@ public class JobManager implements IJobManager { this.ccs = ccs; this.jobCapacityController = jobCapacityController; try { - Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobQueueClassName) + Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass()) .getConstructor(IJobManager.class, IJobCapacityController.class); jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.log(Level.WARNING, "class " + ccConfig.jobQueueClassName + " 
could not be used: ", e); + LOGGER.log(Level.WARNING, "class " + ccConfig.getJobQueueClass() + " could not be used: ", e); } // Falls back to the default implementation if the user-provided class name is not valid. jobQueue = new FIFOJobQueue(this, jobCapacityController); @@ -85,13 +85,13 @@ public class JobManager implements IJobManager { @Override protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) { - return size() > ccConfig.jobHistorySize; + return size() > ccConfig.getJobHistorySize(); } }; runMapHistory = new LinkedHashMap<JobId, List<Exception>>() { private static final long serialVersionUID = 1L; /** history size + 1 is for the case when history size = 0 */ - private int allowedSize = 100 * (ccConfig.jobHistorySize + 1); + private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1); @Override protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java index 0577002..3dec959 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java @@ -27,28 +27,26 @@ import java.lang.management.MemoryUsage; import java.lang.management.OperatingSystemMXBean; import java.lang.management.RuntimeMXBean; import java.lang.management.ThreadMXBean; 
-import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.hyracks.api.config.Section; import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.cluster.INodeManager; +import org.apache.hyracks.control.common.config.ConfigUtils; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.common.utils.PidHelper; import org.apache.hyracks.control.common.work.IPCResponder; import org.apache.hyracks.control.common.work.SynchronizableWork; -import org.kohsuke.args4j.Option; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; public class GetNodeDetailsJSONWork extends SynchronizableWork { - private static final Logger LOGGER = Logger.getLogger(GetNodeDetailsJSONWork.class.getName()); + private static final Section [] CC_SECTIONS = { Section.CC, Section.COMMON }; + private static final Section [] NC_SECTIONS = { Section.NC, Section.COMMON }; + private final INodeManager nodeManager; private final CCConfig ccConfig; private final String nodeId; @@ -59,7 +57,7 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork { private ObjectMapper om = new ObjectMapper(); public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats, - boolean includeConfig, IPCResponder<String> callback) { + boolean includeConfig, IPCResponder<String> callback) { this.nodeManager = nodeManager; this.ccConfig = ccConfig; this.nodeId = nodeId; @@ -69,7 +67,7 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork { } public 
GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats, - boolean includeConfig) { + boolean includeConfig) { this(nodeManager, ccConfig, nodeId, includeStats, includeConfig, null); } @@ -79,14 +77,18 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork { // null nodeId is a request for CC detail = getCCDetails(); if (includeConfig) { - addIni(detail, ccConfig); + ConfigUtils.addConfigToJSON(detail, ccConfig.getAppConfig(), CC_SECTIONS); + detail.putPOJO("app.args", ccConfig.getAppArgs()); } } else { NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId); if (ncs != null) { detail = ncs.toDetailedJSON(includeStats, includeConfig); if (includeConfig) { - addIni(detail, ncs.getNCConfig()); + final NCConfig ncConfig = ncs.getNCConfig(); + ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId), + NC_SECTIONS); + detail.putPOJO("app.args", ncConfig.getAppArgs()); } } } @@ -96,7 +98,7 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork { } } - private ObjectNode getCCDetails() { + private ObjectNode getCCDetails() { ObjectNode o = om.createObjectNode(); MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans(); @@ -151,33 +153,6 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork { return o; } - private static void addIni(ObjectNode o, Object configBean) { - Map<String, Object> iniMap = new HashMap<>(); - for (Field f : configBean.getClass().getFields()) { - Option option = f.getAnnotation(Option.class); - if (option == null) { - continue; - } - final String optionName = option.name(); - Object value = null; - try { - value = f.get(configBean); - } catch (IllegalAccessException e) { - LOGGER.log(Level.WARNING, "Unable to access ini option " + optionName, e); - } - if (value != null) { - if ("--".equals(optionName)) { - 
iniMap.put("app_args", value); - } else { - iniMap.put(optionName.substring(1).replace('-', '_'), - "-iodevices".equals(optionName) - ? String.valueOf(value).split(",") - : value); - } - } - } - o.putPOJO("ini", iniMap); - } public ObjectNode getDetail() { return detail; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index dc93515..e97950e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.cluster.INodeManager; @@ -50,19 +52,22 @@ public class RegisterNodeWork extends SynchronizableWork { String id = reg.getNodeId(); IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress()); CCNCFunctions.NodeRegistrationResult result; - Map<String, String> ncConfiguration = new HashMap<>(); + Map<IOption, Object> ncConfiguration = new HashMap<>(); try { INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle); NodeControllerState state = new 
NodeControllerState(nodeController, reg); INodeManager nodeManager = ccs.getNodeManager(); nodeManager.addNode(id, state); - state.getNCConfig().toMap(ncConfiguration); + IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id); + for (IOption option : cfg.getOptions()) { + ncConfiguration.put(option, cfg.get(option)); + } LOGGER.log(Level.INFO, "Registered INodeController: id = " + id); NodeParameters params = new NodeParameters(); params.setClusterControllerInfo(ccs.getClusterControllerInfo()); params.setDistributedState(ccs.getApplicationContext().getDistributedState()); - params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod); - params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod); + params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod()); + params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod()); result = new CCNCFunctions.NodeRegistrationResult(params, null); } catch (Exception e) { result = new CCNCFunctions.NodeRegistrationResult(null, e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java index a7bca25..ab526e8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java @@ -27,7 +27,9 @@ import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; +import 
org.apache.hyracks.api.config.Section; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand; import org.apache.hyracks.control.common.work.AbstractWork; import org.ini4j.Ini; @@ -79,14 +81,18 @@ public class TriggerNCWork extends AbstractWork { /** * Given an Ini object, serialize it to String with some enhancements. - * @param ccini + * @param ccini the ini file to decorate and forward to NC */ - String serializeIni(Ini ccini) throws IOException { + private String serializeIni(Ini ccini) throws IOException { StringWriter iniString = new StringWriter(); - ccini.store(iniString); + ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_ADDRESS.ini(), + ccs.getCCConfig().getClusterPublicAddress()); + ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_PORT.ini(), + String.valueOf(ccs.getCCConfig().getClusterPublicPort())); // Finally insert *this* NC's name into localnc section - this is a fixed // entry point so that NCs can determine where all their config is. 
- iniString.append("\n[localnc]\nid=").append(ncId).append("\n"); + ccini.put(Section.LOCALNC.sectionName(), NCConfig.Option.NODE_ID.ini(), ncId); + ccini.store(iniString); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Returning Ini file:\n" + iniString.toString()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java index c742a4a..dde3bad 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java @@ -46,8 +46,8 @@ public class NodeManagerTest { public void testNormal() throws HyracksException { IResourceManager resourceManager = new ResourceManager(); INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager); - NodeControllerState ncState1 = mockNodeControllerState(false); - NodeControllerState ncState2 = mockNodeControllerState(false); + NodeControllerState ncState1 = mockNodeControllerState(NODE1, false); + NodeControllerState ncState2 = mockNodeControllerState(NODE2, false); // Verifies states after adding nodes. 
nodeManager.addNode(NODE1, ncState1); @@ -71,7 +71,7 @@ public class NodeManagerTest { public void testException() throws HyracksException { IResourceManager resourceManager = new ResourceManager(); INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager); - NodeControllerState ncState1 = mockNodeControllerState(true); + NodeControllerState ncState1 = mockNodeControllerState(NODE1, true); boolean invalidNetworkAddress = false; // Verifies states after a failure during adding nodes. @@ -106,11 +106,11 @@ public class NodeManagerTest { private CCConfig makeCCConfig() { CCConfig ccConfig = new CCConfig(); - ccConfig.maxHeartbeatLapsePeriods = 0; + ccConfig.setHeartbeatMaxMisses(0); return ccConfig; } - private NodeControllerState mockNodeControllerState(boolean invalidIpAddr) { + private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) { NodeControllerState ncState = mock(NodeControllerState.class); String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2"; NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001); @@ -120,8 +120,8 @@ public class NodeManagerTest { when(ncState.getDataPort()).thenReturn(dataAddr); when(ncState.getDatasetPort()).thenReturn(resultAddr); when(ncState.getMessagingPort()).thenReturn(msgAddr); - NCConfig ncConfig = new NCConfig(); - ncConfig.dataIPAddress = ipAddr; + NCConfig ncConfig = new NCConfig(nodeId); + ncConfig.setDataPublicAddress(ipAddr); when(ncState.getNCConfig()).thenReturn(ncConfig); return ncState; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java index 3bb08bd..97b05e7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -48,14 +49,23 @@ import org.apache.hyracks.control.common.base.INodeController; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.logs.LogFile; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.kohsuke.args4j.CmdLineException; import org.mockito.Mockito; public class JobManagerTest { + private CCConfig ccConfig; + + @Before + public void setup() throws IOException, CmdLineException { + ccConfig = new CCConfig(); + ccConfig.getConfigManager().processConfig(); + } + @Test - public void test() throws HyracksException { - CCConfig ccConfig = new CCConfig(); + public void test() throws IOException, CmdLineException { IJobCapacityController jobCapacityController = mock(IJobCapacityController.class); IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController)); @@ -114,7 +124,7 @@ public class JobManagerTest { } Assert.assertTrue(jobManager.getRunningJobs().size() == 4096); Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); - Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize); + Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize()); // Completes deferred jobs. 
for (JobRun run : deferredRuns) { @@ -123,14 +133,13 @@ public class JobManagerTest { } Assert.assertTrue(jobManager.getRunningJobs().isEmpty()); Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); - Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize); + Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize()); verify(jobManager, times(8192)).prepareComplete(any(), any(), any()); verify(jobManager, times(8192)).finalComplete(any()); } @Test public void testExceedMax() throws HyracksException { - CCConfig ccConfig = new CCConfig(); IJobCapacityController jobCapacityController = mock(IJobCapacityController.class); IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController)); boolean rejected = false; @@ -154,7 +163,6 @@ public class JobManagerTest { @Test public void testAdmitThenReject() throws HyracksException { - CCConfig ccConfig = new CCConfig(); IJobCapacityController jobCapacityController = mock(IJobCapacityController.class); IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController)); @@ -185,7 +193,6 @@ public class JobManagerTest { @Test public void testNullJob() throws HyracksException { - CCConfig ccConfig = new CCConfig(); IJobCapacityController jobCapacityController = mock(IJobCapacityController.class); IJobManager jobManager = new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController); boolean invalidParameter = false; @@ -249,7 +256,7 @@ public class JobManagerTest { } Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); - Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize); + Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize()); verify(jobManager, times(0)).prepareComplete(any(), any(), any()); verify(jobManager, times(0)).finalComplete(any()); } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml index 08783cc..bd6960a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml @@ -48,7 +48,6 @@ <dependency> <groupId>args4j</groupId> <artifactId>args4j</artifactId> - <version>2.0.12</version> </dependency> <dependency> <groupId>org.apache.hyracks</groupId> @@ -67,5 +66,14 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java index 06bcda3..42bc636 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java @@ -21,7 +21,7 @@ package org.apache.hyracks.control.common.application; import java.io.Serializable; import java.util.concurrent.ThreadFactory; -import org.apache.hyracks.api.application.IApplicationConfig; +import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.application.IApplicationContext; import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer; import org.apache.hyracks.api.job.JobSerializerDeserializerContainer; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java new file mode 100644 index 0000000..92e90e7 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.application; + +import java.io.Serializable; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; + +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.control.common.config.ConfigManager; + +/** + * An implementation of IApplicationConfig which is backed by the Config Manager. + */ +public class ConfigManagerApplicationConfig implements IApplicationConfig, Serializable { + private static final long serialVersionUID = 1L; + + private final ConfigManager configManager; + + public ConfigManagerApplicationConfig(ConfigManager configManager) { + this.configManager = configManager; + } + + @Override + public String getString(String section, String key) { + return (String)get(section, key); + } + + @Override + public int getInt(String section, String key) { + return (int)get(section, key); + } + + @Override + public long getLong(String section, String key) { + return (long)get(section, key); + } + + @Override + public Set<String> getSectionNames() { + return configManager.getSectionNames(); + } + + @Override + public Set<Section> getSections() { + return configManager.getSections(); + } + + @Override + public Set<Section> getSections(Predicate<Section> predicate) { + return configManager.getSections(predicate); + } + + @Override + public Set<String> getKeys(String section) { + return configManager.getOptionNames(section); + } + + private Object 
get(String section, String key) { + return get(configManager.lookupOption(section, key)); + } + + @Override + public Object getStatic(IOption option) { + return configManager.get(option); + } + + @Override + public List<String> getNCNames() { + return configManager.getNodeNames(); + } + + @Override + public IOption lookupOption(String sectionName, String propertyName) { + return configManager.lookupOption(sectionName, propertyName); + } + + @Override + public Set<IOption> getOptions() { + return configManager.getOptions(); + } + + @Override + public Set<IOption> getOptions(Section section) { + return configManager.getOptions(section); + } + + @Override + public IApplicationConfig getNCEffectiveConfig(String nodeId) { + return configManager.getNodeEffectiveConfig(nodeId); + } + + public ConfigManager getConfigManager() { + return configManager; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java deleted file mode 100644 index 53db11c..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.common.application; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hyracks.api.application.IApplicationConfig; -import org.apache.hyracks.control.common.controllers.IniUtils; -import org.ini4j.Ini; -import org.ini4j.Profile.Section; - -/** - * An implementation of IApplicationConfig which is backed by Ini4j. 
- */ -public class IniApplicationConfig implements IApplicationConfig { - private final Ini ini; - - public IniApplicationConfig(Ini ini) { - if (ini != null) { - this.ini = ini; - } else { - this.ini = new Ini(); - } - } - - @Override - public String getString(String section, String key) { - return IniUtils.getString(ini, section, key, null); - } - - @Override - public String getString(String section, String key, String defaultValue) { - return IniUtils.getString(ini, section, key, defaultValue); - } - - @Override - public String[] getStringArray(String section, String key) { - return IniUtils.getStringArray(ini, section, key); - } - - @Override - public int getInt(String section, String key) { - return IniUtils.getInt(ini, section, key, 0); - } - - @Override - public int getInt(String section, String key, int defaultValue) { - return IniUtils.getInt(ini, section, key, defaultValue); - } - - @Override - public long getLong(String section, String key) { - return IniUtils.getLong(ini, section, key, 0); - } - - @Override - public long getLong(String section, String key, long defaultValue) { - return IniUtils.getLong(ini, section, key, defaultValue); - } - - @Override - public Set<String> getSections() { - return ini.keySet(); - } - - @Override - public Set<String> getKeys(String section) { - return ini.get(section).keySet(); - } - - @Override - public List<Set<Map.Entry<String, String>>> getMultiSections(String section) { - List<Set<Map.Entry<String, String>>> list = new ArrayList<>(); - List<Section> secs = getMulti(section); - if (secs != null) { - for (Section sec : secs) { - list.add(sec.entrySet()); - } - } - return list; - } - - private List<Section> getMulti(String section) { - return ini.getAll(section); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java new file mode 100644 index 0000000..de9b543 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.common.config; + +import java.lang.annotation.Annotation; + +import org.kohsuke.args4j.Argument; +import org.kohsuke.args4j.spi.OptionHandler; +import org.kohsuke.args4j.spi.StringOptionHandler; + +@SuppressWarnings("ClassExplicitlyAnnotation") +public class Args4jArgument implements Argument { + @Override + public String usage() { + return ""; + } + + @Override + public String metaVar() { + return ""; + } + + @Override + public boolean required() { + return false; + } + + @Override + public boolean hidden() { + return false; + } + + @Override + public Class<? extends OptionHandler> handler() { + return StringOptionHandler.class; + } + + @Override + public int index() { + return 0; + } + + @Override + public boolean multiValued() { + return true; + } + + @Override + public Class<? extends Annotation> annotationType() { + return Argument.class; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java new file mode 100644 index 0000000..c904d0b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.config; + +import java.lang.annotation.Annotation; + +import org.apache.hyracks.api.config.IOption; +import org.kohsuke.args4j.Option; +import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler; +import org.kohsuke.args4j.spi.IntOptionHandler; +import org.kohsuke.args4j.spi.OptionHandler; +import org.kohsuke.args4j.spi.StringOptionHandler; + +@SuppressWarnings("ClassExplicitlyAnnotation") +class Args4jOption implements Option { + private final IOption option; + private final ConfigManager configManager; + private final Class targetType; + + Args4jOption(IOption option, ConfigManager configManager, Class targetType) { + this.option = option; + this.targetType = targetType; + this.configManager = configManager; + } + + @Override + public String name() { + return option.cmdline(); + } + + @Override + public String[] aliases() { + return new String[0]; + } + + @Override + public String usage() { + return configManager.getUsage(option); + } + + @Override + public String metaVar() { + return ""; + } + + @Override + public boolean required() { + return false; + } + + @Override + public boolean help() { + return false; + } + + @Override + public boolean hidden() { + return option.hidden(); + } + + @Override + public Class<? 
extends OptionHandler> handler() { + if (targetType.equals(Boolean.class)) { + return ExplicitBooleanOptionHandler.class; + } else if (targetType.equals(Integer.class)) { + return IntOptionHandler.class; + } else { + return StringOptionHandler.class; + } + } + + @Override + public String[] depends() { + return new String[0]; + } + + @Override + public String[] forbids() { + return new String[0]; + } + + @Override + public Class<? extends Annotation> annotationType() { + return Option.class; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java new file mode 100644 index 0000000..1367ef0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.config; + +import java.lang.reflect.AnnotatedElement; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.hyracks.api.config.IOption; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.spi.FieldSetter; +import org.kohsuke.args4j.spi.Setter; + +class Args4jSetter implements Setter { + private final IOption option; + private BiConsumer<IOption, Object> consumer; + private final boolean multiValued; + private final Class type; + + Args4jSetter(IOption option, BiConsumer<IOption, Object> consumer, boolean multiValued) { + this.option = option; + this.consumer = consumer; + this.multiValued = multiValued; + this.type = option.type().targetType(); + } + + Args4jSetter(Consumer<Object> consumer, boolean multiValued, Class type) { + this.option = null; + this.consumer = (o, value) -> consumer.accept(value); + this.multiValued = multiValued; + this.type = type; + } + + @Override + public void addValue(Object value) throws CmdLineException { + consumer.accept(option, value); + } + + @Override + public Class getType() { + return type; + } + + @Override + public boolean isMultiValued() { + return multiValued; + } + + @Override + public FieldSetter asFieldSetter() { + return null; + } + + @Override + public AnnotatedElement asAnnotatedElement() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java new file mode 100644 index 0000000..bfe759e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.common.config; + +import java.io.IOException; +import java.io.Serializable; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.collections4.map.CompositeMap; +import org.apache.commons.collections4.multimap.ArrayListValuedHashMap; +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.config.IConfigManager; +import org.apache.hyracks.api.config.IConfigurator; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig; +import org.ini4j.Ini; +import org.ini4j.Profile; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.Option; +import org.kohsuke.args4j.OptionHandlerFilter; + +public class ConfigManager implements IConfigManager, Serializable { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(ConfigManager.class.getName()); + + private HashSet<IOption> registeredOptions = new HashSet<>(); + private HashMap<IOption, Object> definedMap = new HashMap<>(); + private HashMap<IOption, Object> defaultMap = new HashMap<>(); + private 
CompositeMap<IOption, Object> configurationMap = new CompositeMap<>(definedMap, defaultMap,
            new NoOpMapMutator()); // read-only view: explicitly defined values shadow defaults

    /** Registered options, grouped by section and keyed by their ini name. */
    private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class);
    /** Per-node option values, keyed by node id. */
    private TreeMap<String, Map<IOption, Object>> nodeSpecificMap = new TreeMap<>();
    /** Callbacks invoked whenever a value (explicit or default) is assigned to an option. */
    private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>();
    private final String[] args;
    private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this);
    /** Names of all known sections: every {@link Section} value plus any section seen in a parsed ini file. */
    private Set<String> allSections = new HashSet<>();
    private transient Collection<Consumer<List<String>>> argListeners = new ArrayList<>();
    private transient Collection<IOption> iniPointerOptions = new ArrayList<>();
    private transient Collection<Section> cmdLineSections = new ArrayList<>();
    private transient OptionHandlerFilter usageFilter;
    /** Configurators grouped by metric; the sorted map makes them run in ascending metric order. */
    private transient SortedMap<Integer, List<IConfigurator>> configurators = new TreeMap<>();
    private boolean configured;

    /**
     * Creates a manager without command line arguments.
     */
    public ConfigManager() {
        this(null);
    }

    /**
     * Creates a manager for the supplied command line arguments and installs the built-in configurators
     * (ini pointer extraction, ini parsing, command line processing and default application).
     *
     * @param args
     *            the raw command line arguments; may be {@code null}
     */
    public ConfigManager(String[] args) {
        this.args = args;
        for (Section section : Section.values()) {
            allSections.add(section.sectionName());
        }
        addConfigurator(PARSE_INI_POINTERS_METRIC, this::extractIniPointersFromCommandLine);
        addConfigurator(PARSE_INI_METRIC, this::parseIni);
        addConfigurator(PARSE_COMMAND_LINE_METRIC, this::processCommandLine);
        addConfigurator(APPLY_DEFAULTS_METRIC, this::applyDefaults);
    }

    /**
     * Adds a configurator to be run by {@link #processConfig()}. Configurators sharing a metric run in the
     * order in which they were added.
     */
    @Override
    public void addConfigurator(int metric, IConfigurator configurator) {
        configurators.computeIfAbsent(metric, metric1 -> new ArrayList<>()).add(configurator);
    }

    /**
     * Declares the options whose values point at the ini file to load (see {@link #parseIni()}).
     */
    @Override
    public void addIniParamOptions(IOption... options) {
        Stream.of(options).forEach(iniPointerOptions::add);
    }

    /**
     * Declares the sections whose options are accepted on the command line.
     */
    @Override
    public void addCmdLineSections(Section... sections) {
        Stream.of(sections).forEach(cmdLineSections::add);
    }

    @Override
    public void setUsageFilter(OptionHandlerFilter usageFilter) {
        this.usageFilter = usageFilter;
    }

    /**
     * Registers the supplied options. Options in {@link Section#VIRTUAL} and options that are already
     * registered are skipped.
     *
     * @throws IllegalStateException
     *             if the configuration has already been processed, or if a different option with the same
     *             section and ini name has been registered before
     */
    @Override
    public void register(IOption... options) {
        for (IOption option : options) {
            if (option.section() == Section.VIRTUAL || registeredOptions.contains(option)) {
                continue;
            }
            if (configured) {
                throw new IllegalStateException("configuration already processed");
            }
            LOGGER.fine("registering option: " + option.toIniString());
            Map<String, IOption> optionMap = sectionMap.computeIfAbsent(option.section(), section -> new HashMap<>());
            IOption prev = optionMap.put(option.ini(), option);
            if (prev != null) {
                if (prev != option) {
                    throw new IllegalStateException("An option cannot be defined multiple times: "
                            + option.toIniString() + ": " + Arrays.asList(option.getClass(), prev.getClass()));
                }
            } else {
                registeredOptions.add(option);
                // the first setter stores the value in the map matching the node / default-ness of the value
                optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
                if (LOGGER.isLoggable(Level.FINE)) {
                    // the logging setter is only installed when FINE is enabled at registration time
                    optionSetters.put(option, (node, value, isDefault) -> LOGGER
                            .fine((isDefault ? "defaulting " : "setting ") + option.toIniString() + " to " + value));
                }
            }
        }
    }

    /**
     * Selects the map a value must be written to: the default or defined map for cluster-wide values
     * ({@code node == null}), otherwise the map of the given node, which is created on demand. Note that
     * node-specific defaults and node-specific defined values share the same map.
     */
    private Map<IOption, Object> correctedMap(String node, boolean isDefault) {
        return node == null ? (isDefault ? defaultMap : definedMap)
                : nodeSpecificMap.computeIfAbsent(node, this::createNodeSpecificMap);
    }

    /**
     * Ensures a node-specific map exists for the given node id.
     */
    public void registerVirtualNode(String nodeId) {
        LOGGER.fine("registerVirtualNode: " + nodeId);
        nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
    }

    private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
        LOGGER.fine("createNodeSpecificMap: " + nodeId);
        return new HashMap<>();
    }

    /**
     * Registers every enum constant of each of the supplied option classes.
     */
    @Override
    @SafeVarargs
    public final void register(final Class<? extends IOption>... optionClasses) {
        for (Class<? extends IOption> optionClass : optionClasses) {
            register(optionClass.getEnumConstants());
        }
    }

    /**
     * Looks up a registered option by section name and ini key.
     *
     * @return the option, or {@code null} if no such option is registered
     */
    public IOption lookupOption(String section, String key) {
        // getSectionOptionMap never returns null; an unknown section yields an empty map
        return getSectionOptionMap(Section.parseSectionName(section)).get(key);
    }

    /**
     * Runs all configurators in ascending metric order. Only the first successful invocation does any work;
     * once it has completed, further calls return immediately.
     */
    public void processConfig() throws CmdLineException, IOException {
        if (!configured) {
            for (List<IConfigurator> configuratorList : configurators.values()) {
                for (IConfigurator configurator : configuratorList) {
                    configurator.run();
                }
            }
            configured = true;
        }
    }

    private void processCommandLine() throws CmdLineException {
        List<String> appArgs = processCommandLine(cmdLineSections, usageFilter, this::cmdLineSet);
        // now propagate the app args to the listeners...
        argListeners.forEach(l -> l.accept(appArgs));
    }

    /**
     * Parses the command line into a scratch map and applies only the ini pointer options found there, so
     * the ini file location is known before the ini file is parsed.
     */
    private void extractIniPointersFromCommandLine() throws CmdLineException {
        Map<IOption, Object> cmdLineOptions = new HashMap<>();
        processCommandLine(cmdLineSections, usageFilter, cmdLineOptions::put);
        for (IOption option : iniPointerOptions) {
            if (cmdLineOptions.containsKey(option)) {
                set(option, cmdLineOptions.get(option));
            }
        }
    }

    private void cmdLineSet(IOption option, Object value) {
        invokeSetters(option, option.type().parse(String.valueOf(value)), null);
    }

    private void invokeSetters(IOption option, Object value, String nodeId) {
        optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, false));
    }

    /**
     * Parses {@link #args} against the registered, non-virtual options of the given sections.
     * <p>
     * When the help flag is present, usage is printed to {@code System.err} and the JVM exits with status 0.
     * On any other parse failure usage is printed and the exception is rethrown.
     *
     * @param sections
     *            the sections whose options are accepted
     * @param usageFilter
     *            filter applied when printing usage
     * @param setAction
     *            receives each option / value pair found on the command line
     * @return the positional (application) arguments; these are only collected when at least one argument
     *         listener is registered, and the list is empty when no arguments were supplied
     */
    @SuppressWarnings({ "squid:S106", "squid:S1147" }) // use of System.err, System.exit()
    private List<String> processCommandLine(Collection<Section> sections, OptionHandlerFilter usageFilter,
            BiConsumer<IOption, Object> setAction) throws CmdLineException {
        final Args4jBean bean = new Args4jBean();
        CmdLineParser cmdLineParser = new CmdLineParser(bean);
        final List<String> appArgs = new ArrayList<>();
        List<IOption> commandLineOptions = new ArrayList<>();
        for (Map.Entry<Section, Map<String, IOption>> sectionMapEntry : sectionMap.entrySet()) {
            if (!sections.contains(sectionMapEntry.getKey())) {
                continue;
            }
            for (IOption option : sectionMapEntry.getValue().values()) {
                if (option.section() != Section.VIRTUAL) {
                    commandLineOptions.add(option);
                }
            }
        }
        commandLineOptions.sort(Comparator.comparing(IOption::cmdline));

        commandLineOptions.forEach(option -> cmdLineParser.addOption(new Args4jSetter(option, setAction, false),
                new Args4jOption(option, this, option.type().targetType())));

        if (!argListeners.isEmpty()) {
            cmdLineParser.addArgument(new Args4jSetter(o -> appArgs.add(String.valueOf(o)), true, String.class),
                    new Args4jArgument());
        }
        LOGGER.fine("parsing cmdline: " + Arrays.toString(args));
        try {
            if (args == null || args.length == 0) {
                LOGGER.info("no command line args supplied");
                return appArgs;
            }
            cmdLineParser.parseArgument(args);
            if (bean.help) {
                ConfigUtils.printUsage(cmdLineParser, usageFilter, System.err);
                System.exit(0);
            }
        } catch (CmdLineException e) {
            if (bean.help) {
                // a help request wins over parse errors
                ConfigUtils.printUsage(cmdLineParser, usageFilter, System.err);
                System.exit(0);
            } else {
                ConfigUtils.printUsage(e, usageFilter, System.err);
                throw e;
            }
        }
        return appArgs;
    }

    /**
     * Loads the ini file referenced by the ini pointer options (if any) and applies every entry through the
     * registered setters. If several pointer options are set, the last one examined determines the file.
     * Sections rooted at {@link Section#NC} whose full name differs from their simple name are treated as
     * node-specific, using the simple name as node id.
     *
     * @throws IOException
     *             if the file cannot be loaded, or if it contains an unknown section or option
     * @throws IllegalArgumentException
     *             if a pointer option holds a value that is neither a {@code String} nor a {@code URL}
     */
    private void parseIni() throws IOException {
        Ini ini = null;
        for (IOption option : iniPointerOptions) {
            Object pointer = get(option);
            if (pointer instanceof String) {
                ini = ConfigUtils.loadINIFile((String) pointer);
            } else if (pointer instanceof URL) {
                ini = ConfigUtils.loadINIFile((URL) pointer);
            } else if (pointer != null) {
                throw new IllegalArgumentException("config file pointer options must be of type String (for file) or "
                        + "URL, instead of " + option.type().targetType());
            }
        }
        if (ini == null) {
            LOGGER.info("no INI file specified; skipping parsing");
            return;
        }
        LOGGER.info("parsing INI file: " + ini);
        for (Profile.Section section : ini.values()) {
            allSections.add(section.getName());
            final Section rootSection = Section
                    .parseSectionName(section.getParent() == null ? section.getName() : section.getParent().getName());
            String node;
            if (rootSection == Section.EXTENSION) {
                parseExtensionIniSection(section);
                continue;
            } else if (rootSection == Section.NC) {
                node = section.getName().equals(section.getSimpleName()) ? null : section.getSimpleName();
            } else if (Section.parseSectionName(section.getName()) != null) {
                node = null;
            } else {
                throw new HyracksException("Unknown section in ini: " + section.getName());
            }
            Map<String, IOption> optionMap = getSectionOptionMap(rootSection);
            for (Map.Entry<String, String> iniOption : section.entrySet()) {
                String name = iniOption.getKey();
                final IOption option = optionMap.get(name);
                if (option == null) {
                    // handleUnknownOption always throws; the return only guards the dereference below
                    handleUnknownOption(section, name);
                    return;
                }
                final String value = iniOption.getValue();
                LOGGER.fine("setting " + option.toIniString() + " to " + value);
                final Object parsed = option.type().parse(value);
                invokeSetters(option, parsed, node);
            }
        }
    }

    private void parseExtensionIniSection(Profile.Section section) {
        // TODO(mblow): parse extensions
    }

    /**
     * Always throws: reports either a section mismatch (the option name is registered, but under other
     * sections) or a completely unknown option.
     */
    private void handleUnknownOption(Profile.Section section, String name) throws HyracksException {
        Set<String> matches = new HashSet<>();
        for (IOption registeredOption : registeredOptions) {
            if (registeredOption.ini().equals(name)) {
                matches.add(registeredOption.section().sectionName());
            }
        }
        if (!matches.isEmpty()) {
            throw new HyracksException(
                    "Section mismatch for [" + section.getName() + "] " + name + ", expected section(s) " + matches);
        } else {
            throw new HyracksException("Unknown option in ini: [" + section.getName() + "] " + name);
        }
    }

    /**
     * Resolves and stores the default of every registered option that has no value yet. Options of the
     * {@link Section#NC} section are additionally defaulted once per known node.
     */
    private void applyDefaults() {
        LOGGER.fine("applying defaults");
        for (Map.Entry<Section, Map<String, IOption>> entry : sectionMap.entrySet()) {
            if (entry.getKey() == Section.NC) {
                entry.getValue().values().forEach(option -> getNodeNames()
                        .forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node)));
                // NOTE(review): this second pass appears to cover the same node/option pairs as the one
                // above -- confirm before removing either of them
                for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificMap.entrySet()) {
                    entry.getValue().values()
                            .forEach(option -> getOrDefault(
                                    new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option,
                                    nodeMap.getKey()));
                }
                // also push the defaults to the shared map, if the CC requests NC properties, they should receive the
                // defaults -- TODO (mblow): seems lame, should log warning on access
            }
            entry.getValue().values().forEach(option -> getOrDefault(configurationMap, option, null));
        }
    }

    /**
     * Returns the value of {@code option} from {@code map} or, when the map has no entry for it, resolves
     * the option's default. A non-null resolved default is pushed through the option's setters (flagged as
     * default) for the given node. Defaults that refer to other options are resolved against the same map.
     */
    private Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) {
        if (map.containsKey(option)) {
            return map.get(option);
        } else {
            Object value = resolveDefault(option, new ConfigManagerApplicationConfig(this) {
                @Override
                public Object getStatic(IOption option) {
                    return getOrDefault(map, option, nodeId);
                }
            });
            if (value != null && optionSetters != null) {
                optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, true));
            }
            return value;
        }
    }

    /**
     * Resolves the default value of an option. A default may be another option (looked up in
     * {@code applicationConfig}), a {@link Supplier}, a {@link Function} of the application config, or a
     * plain value.
     */
    public Object resolveDefault(IOption option, IApplicationConfig applicationConfig) {
        final Object value = option.defaultValue();
        if (value instanceof IOption) {
            return applicationConfig.get((IOption) value);
        } else if (value instanceof Supplier) {
            //noinspection unchecked
            return ((Supplier<?>) value).get();
        } else if (value instanceof Function) {
            //noinspection unchecked
            return ((Function<IApplicationConfig, ?>) value).apply(applicationConfig);
        } else {
            return value;
        }
    }

    @Override
    public Set<Section> getSections(Predicate<Section> predicate) {
        return Arrays.stream(Section.values()).filter(predicate).collect(Collectors.toSet());
    }

    @Override
    public Set<Section> getSections() {
        return getSections(section -> true);
    }

    /**
     * @return an unmodifiable view of all known section names
     */
    public Set<String> getSectionNames() {
        return Collections.unmodifiableSet(allSections);
    }

    /**
     * @return the ini names of all options registered under the named section; empty if there are none
     */
    public Set<String> getOptionNames(String sectionName) {
        Set<String> optionNames = new HashSet<>();
        Section section = Section.parseSectionName(sectionName);
        for (IOption option : getSectionOptionMap(section).values()) {
            optionNames.add(option.ini());
        }
        return optionNames;
    }

    @Override
    public Set<IOption> getOptions(Section section) {
        return new HashSet<>(getSectionOptionMap(section).values());
    }

    /**
     * @return the options registered for the section keyed by ini name; never {@code null}
     */
    private Map<String, IOption> getSectionOptionMap(Section section) {
        final Map<String, IOption> map = sectionMap.get(section);
        return map != null ? map : Collections.emptyMap();
    }

    /**
     * @return an unmodifiable snapshot of the ids of all nodes that have a node-specific map
     */
    public List<String> getNodeNames() {
        return Collections.unmodifiableList(new ArrayList<>(nodeSpecificMap.keySet()));
    }

    /**
     * Returns a configuration view for the given node, in which node-specific values shadow the cluster-wide
     * defined values. The node-specific map is created if it does not exist yet.
     */
    public IApplicationConfig getNodeEffectiveConfig(String nodeId) {
        final Map<IOption, Object> nodeMap = nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
        Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId);
        return new ConfigManagerApplicationConfig(this) {
            @Override
            public Object getStatic(IOption option) {
                if (!nodeEffectiveMap.containsKey(option)) {
                    // we need to calculate the default in the context of the node specific map...
                    nodeMap.put(option, getOrDefault(nodeEffectiveMap, option, nodeId));
                }
                return nodeEffectiveMap.get(option);
            }
        };
    }

    private CompositeMap<IOption, Object> getNodeEffectiveMap(String nodeId) {
        return new CompositeMap<>(nodeSpecificMap.get(nodeId), definedMap, new NoOpMapMutator());
    }

    /**
     * Serializes the configuration to an {@link Ini}. Entries with {@code null} values are omitted.
     * Node-specific values are written to sections named after the NC section name followed by
     * {@code /<nodeId>}.
     *
     * @param includeDefaults
     *            whether cluster-wide defaults are written in addition to the explicitly defined values
     */
    public Ini toIni(boolean includeDefaults) {
        Ini ini = new Ini();
        for (Map.Entry<IOption, Object> entry : (includeDefaults ? configurationMap : definedMap).entrySet()) {
            if (entry.getValue() != null) {
                final IOption option = entry.getKey();
                ini.add(option.section().sectionName(), option.ini(), option.type().serializeToIni(entry.getValue()));
            }
        }
        for (Map.Entry<String, Map<IOption, Object>> nodeMapEntry : nodeSpecificMap.entrySet()) {
            String section = Section.NC.sectionName() + "/" + nodeMapEntry.getKey();
            for (Map.Entry<IOption, Object> entry : nodeMapEntry.getValue().entrySet()) {
                if (entry.getValue() != null) {
                    final IOption option = entry.getKey();
                    ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue()));
                }
            }
        }
        return ini;
    }

    /**
     * Sets a cluster-wide (non node-specific) value for the option.
     */
    public void set(IOption option, Object value) {
        set(null, option, value);
    }

    /**
     * Sets a value for the option; a {@code null} node id denotes a cluster-wide value.
     */
    public void set(String nodeId, IOption option, Object value) {
        invokeSetters(option, value, nodeId);
    }

    /**
     * Returns the cluster-wide value of the option, resolving its default if no value is present. Accessing
     * an NC option this way logs a warning.
     *
     * @throws IllegalStateException
     *             if the option has not been registered
     */
    public Object get(IOption option) {
        if (!registeredOptions.contains(option)) {
            throw new IllegalStateException("Option not registered with ConfigManager: " + option.toIniString() + "("
                    + option.getClass() + "." + option + ")");
        } else if (option.section() == Section.NC) {
            LOGGER.warning("NC option " + option.toIniString() + " being accessed outside of NC-scoped configuration.");
        }
        return getOrDefault(configurationMap, option, null);
    }

    /**
     * @return an unmodifiable view of all registered options
     */
    public Set<IOption> getOptions() {
        return Collections.unmodifiableSet(registeredOptions);
    }

    @Override
    public IApplicationConfig getAppConfig() {
        return appConfig;
    }

    /**
     * Registers a listener that receives the positional command line arguments after the command line has
     * been processed.
     */
    public void registerArgsListener(Consumer<List<String>> argListener) {
        argListeners.add(argListener);
    }

    /**
     * Builds the usage text of an option: its description (a warning is logged if it is missing or empty)
     * followed by its default in parentheses.
     */
    String getUsage(IOption option) {
        final String description = option.description();
        StringBuilder usage = new StringBuilder();
        if (description != null && !description.isEmpty()) {
            usage.append(description).append(" ");
        } else {
            LOGGER.warning("missing description for option: "
                    + option.getClass().getName().substring(option.getClass().getName().lastIndexOf(".") + 1) + "."
                    + option.name());
        }
        usage.append("(default: ");
        usage.append(defaultTextForUsage(option, IOption::cmdline));
        usage.append(")");
        return usage.toString();
    }

    /**
     * Renders the default of an option for usage output. An override supplied by the option takes
     * precedence; otherwise defaults referring to other options are rendered as "same as ...", function
     * defaults as a placeholder, and everything else as the serialized resolved default.
     *
     * @param optionPrinter
     *            renders a referenced option (e.g. as its command line or ini name)
     */
    public String defaultTextForUsage(IOption option, Function<IOption, String> optionPrinter) {
        StringBuilder buf = new StringBuilder();
        String override = option.usageDefaultOverride(appConfig, optionPrinter);
        if (override != null) {
            buf.append(override);
        } else {
            final Object value = option.defaultValue();
            if (value instanceof IOption) {
                buf.append("same as ").append(optionPrinter.apply((IOption) value));
            } else if (value instanceof Function) {
                // TODO(mblow): defer usage calculation to enable evaluation of function
                buf.append("<function>");
            } else {
                buf.append(option.type().serializeToString(resolveDefault(option, appConfig)));
            }
            // TODO(mblow): defer usage calculation to enable inclusion of evaluated actual default
        }
        return buf.toString();
    }

    /**
     * Mutator that rejects all writes through a {@link CompositeMap}, making the composite view read-only,
     * and ignores key collisions between the composited maps.
     */
    private static class NoOpMapMutator implements CompositeMap.MapMutator<IOption, Object> {
        @Override
        public Object put(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object>[] maps, IOption iOption,
                Object o) {
            throw new UnsupportedOperationException("mutations are not allowed");
        }

        @Override
        public void putAll(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object>[] maps,
                Map<? extends IOption, ?> map) {
            throw new UnsupportedOperationException("mutations are not allowed");
        }

        @Override
        public void resolveCollision(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object> map,
                Map<IOption, Object> map1, Collection<IOption> collection) {
            // no-op
        }
    }

    /** args4j bean that captures the built-in help flag. */
    private static class Args4jBean {
        @Option(name = "-help", help = true)
        boolean help;
    }
}
