http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java new file mode 100644 index 0000000..adf1774 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.config; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Predicate; + +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.control.common.controllers.ControllerConfig; +import org.ini4j.Ini; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.OptionHandlerFilter; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * Some utility functions for reading Ini4j objects with default values. + * For all getXxx() methods: if the 'section' contains a slash, and the 'key' + * is not found in that section, we will search for the key in the section named + * by stripping the leaf of the section name (final slash and anything following). + * eg. getInt(ini, "nc/red", "dir", null) will first look for the key "dir" in + * the section "nc/red", but if it is not found, will look in the section "nc". + */ +public class ConfigUtils { + private static final int USAGE_WIDTH = 120; + + private ConfigUtils() { + } + + private static <T> T getIniValue(Ini ini, String section, String key, T defaultValue, Class<T> clazz) { + T value; + while (true) { + value = ini.get(section, key, clazz); + if (value == null) { + int idx = section.lastIndexOf('/'); + if (idx > -1) { + section = section.substring(0, idx); + continue; + } + } + return (value != null) ? value : defaultValue; + } + } + + public static String getString(Ini ini, String section, String key, String defaultValue) { + return getIniValue(ini, section, key, defaultValue, String.class); + } + + public static int getInt(Ini ini, String section, String key, int defaultValue) { + return getIniValue(ini, section, key, defaultValue, Integer.class); + } + + public static long getLong(Ini ini, String section, String key, long defaultValue) { + return getIniValue(ini, section, key, defaultValue, Long.class); + } + + public static Ini loadINIFile(String configFile) throws IOException { + Ini ini = new Ini(); + File conffile = new File(configFile); + if (!conffile.exists()) { + throw new FileNotFoundException(configFile); + } + ini.load(conffile); + return ini; + } + + public static Ini loadINIFile(URL configURL) throws IOException { + Ini ini = new Ini(); + ini.load(configURL); + return ini; + } + + public static Field[] getFields(final Class beanClass, Predicate<Field> predicate) { + List<Field> fields = new ArrayList<>(); + for (Class clazz = beanClass; clazz != null && clazz.getClassLoader() != null + && clazz.getClassLoader().getParent() != null; clazz = clazz.getSuperclass()) { + for (Field f : clazz.getDeclaredFields()) { + if (predicate.test(f)) { + fields.add(f); + } + } + } + return fields.toArray(new Field[fields.size()]); + } + + public static void printUsage(CmdLineParser parser, OptionHandlerFilter filter, PrintStream out) { + parser.getProperties().withUsageWidth(USAGE_WIDTH); + parser.printUsage(new OutputStreamWriter(out), null, filter); + } + + public static void printUsage(CmdLineException e, OptionHandlerFilter filter, PrintStream out) { + out.println("ERROR: " + e.getMessage()); + printUsage(e.getParser(), filter, out); + } + + private static String getOptionValue(String[] args, String optionName) { + for (int i = 0; i < (args.length - 1); i++) { + if (args[i].equals(optionName)) { + return args[i + 1]; + } + } + return null; + } + + public static String getOptionValue(String[] args, IOption option) throws IOException { + String value = getOptionValue(args, option.cmdline()); + if (value == null) { + Ini iniFile = null; + String configFile = getOptionValue(args, ControllerConfig.Option.CONFIG_FILE.cmdline()); + String configFileUrl = getOptionValue(args, ControllerConfig.Option.CONFIG_FILE_URL.cmdline()); + if (configFile != null) { + iniFile = loadINIFile(configFile); + } else if (configFileUrl != null) { + iniFile = loadINIFile(new URL(configFileUrl)); + } + if (iniFile != null) { + value = iniFile.get(option.section().sectionName(), option.ini()); + } + } + return value; + } + + public static String getString(Ini ini, org.apache.hyracks.api.config.Section section, + IOption option, String defaultValue) { + return getString(ini, section.sectionName(), option.ini(), defaultValue); + } + + public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg, + org.apache.hyracks.api.config.Section... sections) { + ArrayNode configArray = o.putArray("config"); + for (org.apache.hyracks.api.config.Section section : cfg.getSections(Arrays.asList(sections)::contains)) { + ObjectNode sectionNode = configArray.addObject(); + Map<String, Object> sectionConfig = getSectionOptionsForJSON(cfg, section, option -> true); + sectionNode.put("section", section.sectionName()) + .putPOJO("properties", sectionConfig); + } + } + + public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg, + org.apache.hyracks.api.config.Section section, + Predicate<IOption> selector) { + Map<String, Object> sectionConfig = new TreeMap<>(); + for (IOption option : cfg.getOptions(section)) { + if (selector.test(option)) { + sectionConfig.put(option.ini(), option.type().serializeToJSON(cfg.get(option))); + } + } + return sectionConfig; + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java new file mode 100644 index 0000000..2234cca --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.config; + +@FunctionalInterface +public interface IConfigSetter { + void set(String nodeId, Object value, boolean isDefault) throws SetException; + + class SetException extends RuntimeException { + public SetException(Exception e) { + super(e); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java new file mode 100644 index 0000000..fc26b5e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.config; + +import java.net.MalformedURLException; +import java.util.logging.Level; + +import org.apache.hyracks.api.config.IOptionType; +import org.apache.hyracks.util.StorageUtil; + +public class OptionTypes { + + public static final IOptionType<Integer> INTEGER_BYTE_UNIT = new IOptionType<Integer>() { + @Override + public Integer parse(String s) { + long result1 = StorageUtil.getByteValue(s); + if (result1 > Integer.MAX_VALUE || result1 < Integer.MIN_VALUE) { + throw new IllegalArgumentException( + "The given value: " + result1 + " is not within the int range."); + } + return (int) result1; + } + + @Override + public Class<Integer> targetType() { + return Integer.class; + } + }; + + public static final IOptionType<Long> LONG_BYTE_UNIT = new IOptionType<Long>() { + @Override + public Long parse(String s) { + return StorageUtil.getByteValue(s); + } + + @Override + public Class<Long> targetType() { + return Long.class; + } + }; + + public static final IOptionType<Integer> INTEGER = new IOptionType<Integer>() { + @Override + public Integer parse(String s) { + return Integer.parseInt(s); + } + + @Override + public Class<Integer> targetType() { + return Integer.class; + } + }; + + public static final IOptionType<Double> DOUBLE = new IOptionType<Double>() { + @Override + public Double parse(String s) { + return Double.parseDouble(s); + } + + @Override + public Class<Double> targetType() { + return Double.class; + } + }; + + public static final IOptionType<String> STRING = new IOptionType<String>() { + @Override + public String parse(String s) { + return s; + } + + @Override + public Class<String> targetType() { + return String.class; + } + }; + + public static final IOptionType<Long> LONG = new IOptionType<Long>() { + @Override + public Long parse(String s) { + return Long.parseLong(s); + } + + @Override + public Class<Long> targetType() { + return Long.class; + } + }; + + public static final IOptionType<Boolean> BOOLEAN = new IOptionType<Boolean>() { + @Override + public Boolean parse(String s) { + return Boolean.parseBoolean(s); + } + + @Override + public Class<Boolean> targetType() { + return Boolean.class; + } + }; + + public static final IOptionType<Level> LEVEL = new IOptionType<Level>() { + @Override + public Level parse(String s) { + return Level.parse(s); + } + + @Override + public Class<Level> targetType() { + return Level.class; + } + + @Override + public Object serializeToJSON(Object value) { + return ((Level)value).getName(); + } + + @Override + public String serializeToIni(Object value) { + return ((Level)value).getName(); + } + }; + + public static final IOptionType<String []> STRING_ARRAY = new IOptionType<String []>() { + @Override + public String [] parse(String s) { + return s.split("\\s*,\\s*"); + } + + @Override + public Class<String []> targetType() { + return String [].class; + } + + @Override + public String serializeToIni(Object value) { + return String.join(",", (String [])value); + } + }; + + public static final IOptionType<java.net.URL> URL = new IOptionType<java.net.URL>() { + @Override + public java.net.URL parse(String s) { + try { + return new java.net.URL(s); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Class<java.net.URL> targetType() { + return java.net.URL.class; + } + }; + + + private OptionTypes() { + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java index d4dc054..ff037f0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java @@ -29,7 +29,7 @@ public class ServerContext { private final ServerType type; private final File baseDir; - public ServerContext(ServerType type, File baseDir) throws Exception { + public ServerContext(ServerType type, File baseDir) { this.type = type; this.baseDir = baseDir; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java index b636a09..fbde58c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java @@ -18,156 +18,160 @@ */ package org.apache.hyracks.control.common.controllers; +import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; +import static org.apache.hyracks.control.common.config.OptionTypes.LONG; +import static org.apache.hyracks.control.common.config.OptionTypes.STRING; + import java.io.File; -import java.io.IOException; import java.net.InetAddress; -import java.net.URL; +import java.util.ArrayList; import java.util.List; +import java.util.function.Supplier; -import org.apache.hyracks.api.application.IApplicationConfig; -import org.apache.hyracks.control.common.application.IniApplicationConfig; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.config.IOptionType; +import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.control.common.config.ConfigManager; +import org.apache.hyracks.util.file.FileUtil; import org.ini4j.Ini; -import org.kohsuke.args4j.Argument; -import org.kohsuke.args4j.Option; -import org.kohsuke.args4j.spi.StopOptionHandler; -public class CCConfig { - @Option(name = "-address", usage = "IP Address for CC (default: localhost)", required = false) - public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress(); +public class CCConfig extends ControllerConfig { - @Option(name = "-client-net-ip-address", - usage = "Sets the IP Address to listen for connections from clients (default: same as -address)", - required = false) - public String clientNetIpAddress; + public static String defaultDir = System.getProperty("java.io.tmpdir"); - @Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)") - public int clientNetPort = 1098; + public enum Option implements IOption { + APP_CLASS(STRING), + ADDRESS(STRING, InetAddress.getLoopbackAddress().getHostAddress()), + CLUSTER_LISTEN_ADDRESS(STRING, ADDRESS), + CLUSTER_LISTEN_PORT(INTEGER, 1099), + CLUSTER_PUBLIC_ADDRESS(STRING, CLUSTER_LISTEN_ADDRESS), + CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT), + CLIENT_LISTEN_ADDRESS(STRING, ADDRESS), + CLIENT_LISTEN_PORT(INTEGER, 1098), + CONSOLE_LISTEN_ADDRESS(STRING, ADDRESS), + CONSOLE_LISTEN_PORT(INTEGER, 16001), + HEARTBEAT_PERIOD(INTEGER, 10000), // TODO (mblow): add time unit + HEARTBEAT_MAX_MISSES(INTEGER, 5), + PROFILE_DUMP_PERIOD(INTEGER, 0), + JOB_HISTORY_SIZE(INTEGER, 10), + RESULT_TTL(LONG, 86400000L), // TODO(mblow): add time unit + RESULT_SWEEP_THRESHOLD(LONG, 60000L), // TODO(mblow): add time unit + @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA + ROOT_DIR(STRING, (Supplier<String>)() -> FileUtil.joinPath(defaultDir, "ClusterControllerService")), + CLUSTER_TOPOLOGY(STRING), + JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"), + JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"); - // QQQ Note that clusterNetIpAddress is *not directly used* yet. Both - // the cluster listener and the web server listen on "all interfaces". - // This IP address is only used to instruct the NC on which IP to call in. - @Option(name = "-cluster-net-ip-address", - usage = "Sets the IP Address to listen for connections from NCs (default: same as -address)", - required = false) - public String clusterNetIpAddress; - - @Option(name = "-cluster-net-port", - usage = "Sets the port to listen for connections from node controllers (default 1099)") - public int clusterNetPort = 1099; - - @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 16001)") - public int httpPort = 16001; - - @Option(name = "-heartbeat-period", - usage = "Sets the time duration between two heartbeats from each node controller in milliseconds" + - " (default: 10000)") - public int heartbeatPeriod = 10000; - - @Option(name = "-max-heartbeat-lapse-periods", - usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)") - public int maxHeartbeatLapsePeriods = 5; - - @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node " + - "controller in milliseconds. 0 to disable. (default: 0)") - public int profileDumpPeriod = 0; - - @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not " + - "specified in the job specification. (default: 5)") - public int defaultMaxJobAttempts = 5; - - @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to " + - "the specified value. (default: 10)") - public int jobHistorySize = 10; - - @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should " + - "be retained by the system in milliseconds. (default: 24 hours)") - public long resultTTL = 86400000; - - @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup " + - "should be invoked in milliseconds. (default: 1 minute)") - public long resultSweepThreshold = 60000; - - @Option(name = "-cc-root", - usage = "Sets the root folder used for file operations. (default: ClusterControllerService)") - public String ccRoot = "ClusterControllerService"; - - @Option(name = "-cluster-topology", required = false, - usage = "Sets the XML file that defines the cluster topology. (default: null)") - public File clusterTopologyDefinition = null; - - @Option(name = "-app-cc-main-class", required = false, usage = "Application CC Main Class") - public String appCCMainClass = null; - - @Option(name = "-config-file", - usage = "Specify path to master configuration file (default: none)", required = false) - public String configFile = null; - - @Option(name = "-job-queue-class-name", usage = "Specify the implementation class name for the job queue. (default:" - + " org.apache.hyracks.control.cc.scheduler.FIFOJobQueue)", - required = false) - public String jobQueueClassName = "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"; - - @Option(name = "-job-manager-class-name", usage = "Specify the implementation class name for the job manager. " - + "(default: org.apache.hyracks.control.cc.job.JobManager)", required = false) - public String jobManagerClassName = "org.apache.hyracks.control.cc.job.JobManager"; - - @Argument - @Option(name = "--", handler = StopOptionHandler.class) - public List<String> appArgs; - - public URL configFileUrl = null; - - private Ini ini = null; - - private void loadINIFile() throws IOException { - // This method simply maps from the ini parameters to the CCConfig's fields. - // It does not apply defaults or any logic. - if (configFile != null) { - ini = IniUtils.loadINIFile(configFile); - } else if (configFileUrl != null) { - ini = IniUtils.loadINIFile(configFileUrl); - } else { - return; + private final IOptionType parser; + private final Object defaultValue; + + <T> Option(IOptionType<T> parser) { + this(parser, (T)null); } - ipAddress = IniUtils.getString(ini, "cc", "address", ipAddress); - clientNetIpAddress = IniUtils.getString(ini, "cc", "client.address", clientNetIpAddress); - clientNetPort = IniUtils.getInt(ini, "cc", "client.port", clientNetPort); - clusterNetIpAddress = IniUtils.getString(ini, "cc", "cluster.address", clusterNetIpAddress); - clusterNetPort = IniUtils.getInt(ini, "cc", "cluster.port", clusterNetPort); - httpPort = IniUtils.getInt(ini, "cc", "http.port", httpPort); - heartbeatPeriod = IniUtils.getInt(ini, "cc", "heartbeat.period", heartbeatPeriod); - maxHeartbeatLapsePeriods = IniUtils.getInt(ini, "cc", "heartbeat.maxlapse", maxHeartbeatLapsePeriods); - profileDumpPeriod = IniUtils.getInt(ini, "cc", "profiledump.period", profileDumpPeriod); - defaultMaxJobAttempts = IniUtils.getInt(ini, "cc", "job.defaultattempts", defaultMaxJobAttempts); - jobHistorySize = IniUtils.getInt(ini, "cc", "job.historysize", jobHistorySize); - resultTTL = IniUtils.getLong(ini, "cc", "results.ttl", resultTTL); - resultSweepThreshold = IniUtils.getLong(ini, "cc", "results.sweepthreshold", resultSweepThreshold); - ccRoot = IniUtils.getString(ini, "cc", "rootfolder", ccRoot); - // QQQ clusterTopologyDefinition is a "File"; should support verifying that the file - // exists, as @Option likely does - appCCMainClass = IniUtils.getString(ini, "cc", "app.class", appCCMainClass); - } + <T> Option(IOptionType<T> parser, Option defaultOption) { + this.parser = parser; + this.defaultValue = defaultOption; + } - /** - * Once all @Option fields have been loaded from command-line or otherwise - * specified programmatically, call this method to: - * 1. Load options from a config file (as specified by -config-file) - * 2. Set default values for certain derived values, such as setting - * clusterNetIpAddress to ipAddress - */ - public void loadConfigAndApplyDefaults() throws IOException { - loadINIFile(); - if (ini != null) { - // QQQ This way of passing overridden/defaulted values back into - // the ini feels clunky, and it's clearly incomplete - ini.add("cc", "cluster.address", clusterNetIpAddress); - ini.add("cc", "client.address", clientNetIpAddress); + <T> Option(IOptionType<T> parser, T defaultValue) { + this.parser = parser; + this.defaultValue = defaultValue; + } + + <T> Option(IOptionType<T> parser, Supplier<T> defaultValue) { + this.parser = parser; + this.defaultValue = defaultValue; + } + + @Override + public Section section() { + return Section.CC; + } + + @Override + public IOptionType type() { + return parser; + } + + @Override + public Object defaultValue() { + return defaultValue; } - // "address" is the default for all IP addresses - clusterNetIpAddress = clusterNetIpAddress == null ? ipAddress : clusterNetIpAddress; - clientNetIpAddress = clientNetIpAddress == null ? ipAddress : clientNetIpAddress; + @Override + public String description() { + switch (this) { + case APP_CLASS: + return "Application CC main class"; + case ADDRESS: + return "Default bind address for all services on this cluster controller"; + case CLUSTER_LISTEN_ADDRESS: + return "Sets the IP Address to listen for connections from NCs"; + case CLUSTER_LISTEN_PORT: + return "Sets the port to listen for connections from node controllers"; + case CLUSTER_PUBLIC_ADDRESS: + return "Address that NCs should use to contact this CC"; + case CLUSTER_PUBLIC_PORT: + return "Port that NCs should use to contact this CC"; + case CLIENT_LISTEN_ADDRESS: + return "Sets the IP Address to listen for connections from clients"; + case CLIENT_LISTEN_PORT: + return "Sets the port to listen for connections from clients"; + case CONSOLE_LISTEN_ADDRESS: + return "Sets the listen address for the Cluster Controller"; + case CONSOLE_LISTEN_PORT: + return "Sets the http port for the Cluster Controller)"; + case HEARTBEAT_PERIOD: + return "Sets the time duration between two heartbeats from each node controller in milliseconds"; + case HEARTBEAT_MAX_MISSES: + return "Sets the maximum number of missed heartbeats before a node is marked as dead"; + case PROFILE_DUMP_PERIOD: + return "Sets the time duration between two profile dumps from each node controller in " + + "milliseconds; 0 to disable"; + case JOB_HISTORY_SIZE: + return "Limits the number of historical jobs remembered by the system to the specified value"; + case RESULT_TTL: + return "Limits the amount of time results for asynchronous jobs should be retained by the system " + + "in milliseconds"; + case RESULT_SWEEP_THRESHOLD: + return "The duration within which an instance of the result cleanup should be invoked in " + + "milliseconds"; + case ROOT_DIR: + return "Sets the root folder used for file operations"; + case CLUSTER_TOPOLOGY: + return "Sets the XML file that defines the cluster topology"; + case JOB_QUEUE_CLASS: + return "Specify the implementation class name for the job queue"; + case JOB_MANAGER_CLASS: + return "Specify the implementation class name for the job manager"; + default: + throw new IllegalStateException("NYI: " + this); + } + } + } + + private final ConfigManager configManager; + + private List<String> appArgs = new ArrayList<>(); + + public CCConfig() { + this(new ConfigManager()); + } + + public CCConfig(ConfigManager configManager) { + super(configManager); + this.configManager = configManager; + configManager.register(Option.class); + configManager.registerArgsListener(appArgs::addAll); + } + + public List<String> getAppArgs() { + return appArgs; + } + + public String[] getAppArgsArray() { + return appArgs.toArray(new String[appArgs.size()]); } /** @@ -175,15 +179,158 @@ public class CCConfig { * if -config-file wasn't specified. */ public Ini getIni() { - return ini; + return configManager.toIni(false); } - /** - * @return An IApplicationConfig representing this NCConfig. - * Note: Currently this only includes the values from the configuration - * file, not anything specified on the command-line. QQQ - */ - public IApplicationConfig getAppConfig() { - return new IniApplicationConfig(ini); + public ConfigManager getConfigManager() { + return configManager; + } + + // QQQ Note that clusterListenAddress is *not directly used* yet. Both + // the cluster listener and the web server listen on "all interfaces". + // This IP address is only used to instruct the NC on which IP to call in. + public String getClusterListenAddress() { + return getAppConfig().getString(Option.CLUSTER_LISTEN_ADDRESS); + } + + public void setClusterListenAddress(String clusterListenAddress) { + configManager.set(Option.CLUSTER_LISTEN_ADDRESS, clusterListenAddress); + } + + public int getClusterListenPort() { + return getAppConfig().getInt(Option.CLUSTER_LISTEN_PORT); + } + + public void setClusterListenPort(int clusterListenPort) { + configManager.set(Option.CLUSTER_LISTEN_PORT, clusterListenPort); + } + + public String getClusterPublicAddress() { + return getAppConfig().getString(Option.CLUSTER_PUBLIC_ADDRESS); + } + + public void setClusterPublicAddress(String clusterPublicAddress) { + configManager.set(Option.CLUSTER_PUBLIC_ADDRESS, clusterPublicAddress); + } + + public int getClusterPublicPort() { + return getAppConfig().getInt(Option.CLUSTER_PUBLIC_PORT); + } + + public void setClusterPublicPort(int clusterPublicPort) { + configManager.set(Option.CLUSTER_PUBLIC_PORT, clusterPublicPort); + } + + public String getClientListenAddress() { + return getAppConfig().getString(Option.CLIENT_LISTEN_ADDRESS); + } + + public void setClientListenAddress(String clientListenAddress) { + configManager.set(Option.CLIENT_LISTEN_ADDRESS, clientListenAddress); + } + + public int getClientListenPort() { + return getAppConfig().getInt(Option.CLIENT_LISTEN_PORT); + } + + public void setClientListenPort(int clientListenPort) { + configManager.set(Option.CLIENT_LISTEN_PORT, clientListenPort); + } + + public int getConsoleListenPort() { + return getAppConfig().getInt(Option.CONSOLE_LISTEN_PORT); + } + + public void setConsoleListenPort(int consoleListenPort) { + configManager.set(Option.CONSOLE_LISTEN_PORT, consoleListenPort); + } + + public int getHeartbeatPeriod() { + return getAppConfig().getInt(Option.HEARTBEAT_PERIOD); + } + + public void setHeartbeatPeriod(int heartbeatPeriod) { + configManager.set(Option.HEARTBEAT_PERIOD, heartbeatPeriod); + } + + public int getHeartbeatMaxMisses() { + return getAppConfig().getInt(Option.HEARTBEAT_MAX_MISSES); + } + + public void setHeartbeatMaxMisses(int heartbeatMaxMisses) { + configManager.set(Option.HEARTBEAT_MAX_MISSES, heartbeatMaxMisses); + } + + public int getProfileDumpPeriod() { + return getAppConfig().getInt(Option.PROFILE_DUMP_PERIOD); + } + + public void setProfileDumpPeriod(int profileDumpPeriod) { + configManager.set(Option.PROFILE_DUMP_PERIOD, profileDumpPeriod); + } + + public int getJobHistorySize() { + return getAppConfig().getInt(Option.JOB_HISTORY_SIZE); + } + + public void setJobHistorySize(int jobHistorySize) { + configManager.set(Option.JOB_HISTORY_SIZE, jobHistorySize); + } + + public long getResultTTL() { + return getAppConfig().getLong(Option.RESULT_TTL); + } + + public void setResultTTL(long resultTTL) { + configManager.set(Option.RESULT_TTL, resultTTL); + } + + public long getResultSweepThreshold() { + return getAppConfig().getLong(Option.RESULT_SWEEP_THRESHOLD); + } + + public void setResultSweepThreshold(long resultSweepThreshold) { + configManager.set(Option.RESULT_SWEEP_THRESHOLD, resultSweepThreshold); + } + + public String getRootDir() { + return getAppConfig().getString(Option.ROOT_DIR); + } + + public void setRootDir(String rootDir) { + configManager.set(Option.ROOT_DIR, rootDir); + } + + public File getClusterTopology() { + return getAppConfig().getString(Option.CLUSTER_TOPOLOGY) == null ? null + : new File(getAppConfig().getString(Option.CLUSTER_TOPOLOGY)); + } + + public void setClusterTopology(File clusterTopology) { + configManager.set(Option.CLUSTER_TOPOLOGY, clusterTopology); + } + + public String getAppClass() { + return getAppConfig().getString(Option.APP_CLASS); + } + + public void setAppClass(String appClass) { + configManager.set(Option.APP_CLASS, appClass); + } + + public String getJobQueueClass() { + return getAppConfig().getString(Option.JOB_QUEUE_CLASS); + } + + public void setJobQueueClass(String jobQueueClass) { + configManager.set(Option.JOB_QUEUE_CLASS, jobQueueClass); + } + + public String getJobManagerClass() { + return getAppConfig().getString(Option.JOB_MANAGER_CLASS); + } + + public void setJobManagerClass(String jobManagerClass) { + configManager.set(Option.JOB_MANAGER_CLASS, jobManagerClass); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java new file mode 100644 index 0000000..4aae6df --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.controllers; + +import java.io.Serializable; +import java.net.URL; + +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.config.IOptionType; +import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.control.common.config.ConfigManager; +import org.apache.hyracks.control.common.config.OptionTypes; + +public class ControllerConfig implements Serializable { + + public enum Option implements IOption { + CONFIG_FILE(OptionTypes.STRING, "Specify path to master configuration file"), + CONFIG_FILE_URL(OptionTypes.URL, "Specify URL to master configuration file"); + + private final IOptionType type; + private final String description; + + + Option(IOptionType type, String description) { + this.type = type; + this.description = description; + } + + @Override + public Section section() { + return Section.COMMON; + } + + @Override + public String description() { + return description; + } + + @Override + public IOptionType type() { + return type; + } + + @Override + public Object defaultValue() { + return null; + } + } + + protected final ConfigManager configManager; + + protected ControllerConfig(ConfigManager configManager) { + this.configManager = configManager; + } + + public IApplicationConfig getAppConfig() { + return configManager.getAppConfig(); + } + + public String getConfigFile() { + return getAppConfig().getString(ControllerConfig.Option.CONFIG_FILE); + } + + public void setConfigFile(String configFile) { + configManager.set(ControllerConfig.Option.CONFIG_FILE, configFile); + } + + public URL getConfigFileUrl() { + return (URL) getAppConfig().get(ControllerConfig.Option.CONFIG_FILE_URL); + } + + public void setConfigFileUrl(URL configFileUrl) { + configManager.set(ControllerConfig.Option.CONFIG_FILE_URL, configFileUrl); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java deleted file mode 100644 index 451421d..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.common.controllers; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.lang.reflect.Array; -import java.net.URL; - -import org.ini4j.Ini; -import org.ini4j.Profile.Section; - -/** - * Some utility functions for reading Ini4j objects with default values. - * For all getXxx() methods: if the 'section' contains a slash, and the 'key' - * is not found in that section, we will search for the key in the section named - * by stripping the leaf of the section name (final slash and anything following). - * eg. getInt(ini, "nc/red", "dir", null) will first look for the key "dir" in - * the section "nc/red", but if it is not found, will look in the section "nc". - */ -public class IniUtils { - - private IniUtils() { - } - - private static <T> T getIniValue(Ini ini, String section, String key, T defaultValue, Class<T> clazz) { - T value; - while (true) { - value = ini.get(section, key, clazz); - if (value == null) { - int idx = section.lastIndexOf('/'); - if (idx > -1) { - section = section.substring(0, idx); - continue; - } - } - break; - } - return (value != null) ? value : defaultValue; - } - - @SuppressWarnings("unchecked") - private static <T> T getIniArray(Ini ini, String section, String key, Class<T> clazz) { - Section sec = ini.get(section); - if (clazz.getComponentType() == null) { - return null; - } - if (sec == null) { - return (T) Array.newInstance(clazz.getComponentType(), 0); - } else { - return sec.getAll(key, clazz); - } - } - - public static String getString(Ini ini, String section, String key, String defaultValue) { - return getIniValue(ini, section, key, defaultValue, String.class); - } - - public static String[] getStringArray(Ini ini, String section, String key) { - return getIniArray(ini, section, key, String[].class); - } - - public static int getInt(Ini ini, String section, String key, int defaultValue) { - return getIniValue(ini, section, key, defaultValue, Integer.class); - } - - public static long getLong(Ini ini, String section, String key, long defaultValue) { - return getIniValue(ini, section, key, defaultValue, Long.class); - } - - public static Ini loadINIFile(String configFile) throws IOException { - Ini ini = new Ini(); - File conffile = new File(configFile); - if (!conffile.exists()) { - throw new FileNotFoundException(configFile); - } - ini.load(conffile); - return ini; - } - - public static Ini loadINIFile(URL configURL) throws IOException { - Ini ini = new Ini(); - ini.load(configURL); - return ini; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index fa7d76a..7906b52 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -18,287 +18,485 @@ */ package org.apache.hyracks.control.common.controllers; -import java.io.IOException; -import java.io.Serializable; +import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN; +import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; +import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT; +import static org.apache.hyracks.control.common.config.OptionTypes.LONG; +import static org.apache.hyracks.control.common.config.OptionTypes.STRING; +import static org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY; + import java.net.InetAddress; -import java.net.URL; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.function.Supplier; + +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.config.IOptionType; +import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.control.common.config.ConfigManager; +import org.apache.hyracks.util.file.FileUtil; + +public class NCConfig extends ControllerConfig { + private static final long serialVersionUID = 3L; + + public static String defaultDir = System.getProperty("java.io.tmpdir"); + public static String defaultAppClass = null; + + public enum Option implements IOption { + ADDRESS(STRING, InetAddress.getLoopbackAddress().getHostAddress()), + PUBLIC_ADDRESS(STRING, ADDRESS), + CLUSTER_LISTEN_ADDRESS(STRING, ADDRESS), + CLUSTER_LISTEN_PORT(INTEGER, 0), + NCSERVICE_ADDRESS(STRING, PUBLIC_ADDRESS), + NCSERVICE_PORT(INTEGER, 9090), + CLUSTER_ADDRESS(STRING, (String)null), + CLUSTER_PORT(INTEGER, 1099), + CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS), + CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT), + NODE_ID(STRING, (String)null), + DATA_LISTEN_ADDRESS(STRING, ADDRESS), + DATA_LISTEN_PORT(INTEGER, 0), + DATA_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS), + DATA_PUBLIC_PORT(INTEGER, DATA_LISTEN_PORT), + RESULT_LISTEN_ADDRESS(STRING, ADDRESS), + RESULT_LISTEN_PORT(INTEGER, 0), + RESULT_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS), + RESULT_PUBLIC_PORT(INTEGER, RESULT_LISTEN_PORT), + MESSAGING_LISTEN_ADDRESS(STRING, ADDRESS), + MESSAGING_LISTEN_PORT(INTEGER, 0), + MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS), + MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT), + CLUSTER_CONNECT_RETRIES(INTEGER, 5), + @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA + IODEVICES(STRING_ARRAY, (Supplier<String []>)() -> new String [] { FileUtil.joinPath(defaultDir, "iodevice") }), + NET_THREAD_COUNT(INTEGER, 1), + NET_BUFFER_COUNT(INTEGER, 1), + RESULT_TTL(LONG, 86400000L), + RESULT_SWEEP_THRESHOLD(LONG, 60000L), + RESULT_MANAGER_MEMORY(INTEGER_BYTE_UNIT, -1), + @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA + APP_CLASS(STRING, (Supplier<String>)() -> defaultAppClass), + NCSERVICE_PID(INTEGER, -1), + COMMAND(STRING, "hyracksnc"), + JVM_ARGS(STRING, (String)null), + VIRTUAL_NC(BOOLEAN, false); + + private final IOptionType parser; + private final Object defaultValue; + + <T> Option(IOptionType<T> parser, Option defaultOption) { + this.parser = parser; + this.defaultValue = defaultOption; + } -import org.apache.hyracks.api.application.IApplicationConfig; -import org.apache.hyracks.control.common.application.IniApplicationConfig; -import org.ini4j.Ini; -import org.kohsuke.args4j.Argument; -import org.kohsuke.args4j.Option; -import org.kohsuke.args4j.spi.StopOptionHandler; + <T> Option(IOptionType<T> parser, T defaultValue) { + this.parser = parser; + this.defaultValue = defaultValue; + } -public class NCConfig implements Serializable { - private static final long serialVersionUID = 2L; + <T> Option(IOptionType<T> parser, Supplier<T> defaultValue) { + this.parser = parser; + this.defaultValue = defaultValue; + } - @Option(name = "-cc-host", usage = "Cluster Controller host name (required unless specified in config file)", - required = false) - public String ccHost = null; + @Override + public Section section() { + switch (this) { + case NODE_ID: + return Section.LOCALNC; + default: + return Section.NC; + } + } - @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)", required = false) - public int ccPort = 1099; + @Override + public String description() { + switch (this) { + case ADDRESS: + return "Default IP Address to bind listeners on this NC. All services will bind on this address " + + "unless a service-specific listen address is supplied."; + case CLUSTER_LISTEN_ADDRESS: + return "IP Address to bind cluster listener on this NC"; + case PUBLIC_ADDRESS: + return "Default public address that other processes should use to contact this NC. All services " + + "will advertise this address unless a service-specific public address is supplied."; + case NCSERVICE_ADDRESS: + return "Address the CC should use to contact the NCService associated with this NC"; + case NCSERVICE_PORT: + return "Port the CC should use to contact the NCService associated with this NC"; + case CLUSTER_ADDRESS: + return "Cluster Controller address (required unless specified in config file)"; + case CLUSTER_PORT: + return "Cluster Controller port"; + case CLUSTER_LISTEN_PORT: + return "IP port to bind cluster listener"; + case CLUSTER_PUBLIC_ADDRESS: + return "Public IP Address to announce cluster listener"; + case CLUSTER_PUBLIC_PORT: + return "Public IP port to announce cluster listener"; + case NODE_ID: + return "Logical name of node controller unique within the cluster (required unless specified in " + + "config file)"; + case DATA_LISTEN_ADDRESS: + return "IP Address to bind data listener"; + case DATA_LISTEN_PORT: + return "IP port to bind data listener"; + case DATA_PUBLIC_ADDRESS: + return "Public IP Address to announce data listener"; + case DATA_PUBLIC_PORT: + return "Public IP port to announce data listener"; + case RESULT_LISTEN_ADDRESS: + return "IP Address to bind dataset result distribution listener"; + case RESULT_LISTEN_PORT: + return "IP port to bind dataset result distribution listener"; + case RESULT_PUBLIC_ADDRESS: + return "Public IP Address to announce dataset result distribution listener"; + case RESULT_PUBLIC_PORT: + return "Public IP port to announce dataset result distribution listener"; + case MESSAGING_LISTEN_ADDRESS: + return "IP Address to bind messaging listener"; + case MESSAGING_LISTEN_PORT: + return "IP port to bind messaging listener"; + case MESSAGING_PUBLIC_ADDRESS: + return "Public IP Address to announce messaging listener"; + case MESSAGING_PUBLIC_PORT: + return "Public IP port to announce messaging listener"; + case CLUSTER_CONNECT_RETRIES: + return "Number of attempts to contact CC before giving up"; + case IODEVICES: + return "Comma separated list of IO Device mount points"; + case NET_THREAD_COUNT: + return "Number of threads to use for Network I/O"; + case NET_BUFFER_COUNT: + return "Number of network buffers per input/output channel"; + case RESULT_TTL: + return "Limits the amount of time results for asynchronous jobs should be retained by the system " + + "in milliseconds"; + case RESULT_SWEEP_THRESHOLD: + return "The duration within which an instance of the result cleanup should be invoked in " + + "milliseconds"; + case RESULT_MANAGER_MEMORY: + return "Memory usable for result caching at this Node Controller in bytes"; + case APP_CLASS: + return "Application NC Main Class"; + case NCSERVICE_PID: + return "PID of the NCService which launched this NCDriver"; + case COMMAND: + return "Command NCService should invoke to start the NCDriver"; + case JVM_ARGS: + return "JVM args to pass to the NCDriver"; + case VIRTUAL_NC: + return "A flag indicating if this NC is running on virtual cluster"; + default: + throw new IllegalStateException("NYI: " + this); + } + } - @Option(name = "-address", usage = "IP Address for NC (default: localhost)", required = false) - public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress(); - @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener (default: same as -address)", - required = false) - public String clusterNetIPAddress; + @Override + public IOptionType type() { + return parser; + } - @Option(name = "-cluster-net-port", usage = "IP port to bind cluster listener (default: random port)", - required = false) - public int clusterNetPort = 0; + @Override + public Object defaultValue() { + return defaultValue; + } - @Option(name = "-cluster-net-public-ip-address", - usage = "Public IP Address to announce cluster listener (default: same as -cluster-net-ip-address)", - required = false) - public String clusterNetPublicIPAddress; + @Override + public boolean hidden() { + return this == VIRTUAL_NC; + } + } - @Option(name = "-cluster-net-public-port", - usage = "Public IP port to announce cluster listener (default: same as -cluster-net-port; " + - "must set -cluster-net-public-ip-address also)", required = false) - public int clusterNetPublicPort = 0; + private List<String> appArgs = new ArrayList<>(); - @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster (required unless " + - "specified in config file)", required = false) - public String nodeId = null; + private final IApplicationConfig appConfig; + private final String nodeId; - @Option(name = "-data-ip-address", usage = "IP Address to bind data listener (default: same as -address)", - required = false) - public String dataIPAddress; + public NCConfig(String nodeId) { + this(nodeId, new ConfigManager(null)); + } - @Option(name = "-data-port", usage = "IP port to bind data listener (default: random port)", required = false) - public int dataPort = 0; + public NCConfig(String nodeId, ConfigManager configManager) { + super(configManager); + this.appConfig = configManager.getNodeEffectiveConfig(nodeId); + configManager.register(Option.class); + setNodeId(nodeId); + this.nodeId = nodeId; + configManager.registerArgsListener(appArgs::addAll); + } - @Option(name = "-data-public-ip-address", - usage = "Public IP Address to announce data listener (default: same as -data-ip-address)", required = false) - public String dataPublicIPAddress; + public List<String> getAppArgs() { + return appArgs; + } - @Option(name = "-data-public-port", - usage = "Public IP port to announce data listener (default: same as -data-port; must set " + - "-data-public-ip-address also)", required = false) - public int dataPublicPort = 0; + public String[] getAppArgsArray() { + return appArgs.toArray(new String[appArgs.size()]); + } - @Option(name = "-result-ip-address", - usage = "IP Address to bind dataset result distribution listener (default: same as -address)", - required = false) - public String resultIPAddress; + public ConfigManager getConfigManager() { + return configManager; + } - @Option(name = "-result-port", - usage = "IP port to bind dataset result distribution listener (default: random port)", - required = false) - public int resultPort = 0; + public IApplicationConfig getAppConfig() { + return appConfig; + } - @Option(name = "-result-public-ip-address", - usage = "Public IP Address to announce dataset result distribution listener (default: same as " + - "-result-ip-address)", required = false) - public String resultPublicIPAddress; + public String getPublicAddress() { + return appConfig.getString(Option.PUBLIC_ADDRESS); + } + + public void setPublicAddress(String publicAddress) { + configManager.set(nodeId, Option.PUBLIC_ADDRESS, publicAddress); + } + + public String getNCServiceAddress() { + return appConfig.getString(Option.NCSERVICE_ADDRESS); + } + + public void setNCServiceAddress(String ncserviceAddress) { + configManager.set(nodeId, Option.NCSERVICE_ADDRESS, ncserviceAddress); + } + + public int getNCServicePort() { + return appConfig.getInt(Option.NCSERVICE_PORT); + } + + public void setNCServicePort(int ncservicePort) { + configManager.set(nodeId, Option.NCSERVICE_PORT, ncservicePort); + } + + public String getClusterAddress() { + return appConfig.getString(Option.CLUSTER_ADDRESS); + } + + public void setClusterAddress(String clusterAddress) { + configManager.set(nodeId, Option.CLUSTER_ADDRESS, clusterAddress); + } - @Option(name = "-result-public-port", usage = "Public IP port to announce dataset result distribution listener " + - "(default: same as -result-port; must set -result-public-ip-address also)", required = false) - public int resultPublicPort = 0; + public int getClusterPort() { + return appConfig.getInt(Option.CLUSTER_PORT); + } - @Option(name = "-retries", usage = "Number of attempts to contact CC before giving up (default: 5)") - public int retries = 5; + public void setClusterPort(int clusterPort) { + configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort); + } - @Option(name = "-iodevices", - usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", - required = false) - public String ioDevices = System.getProperty("java.io.tmpdir"); + public String getClusterListenAddress() { + return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS); + } - @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)") - public int nNetThreads = 1; + public void setClusterListenAddress(String clusterListenAddress) { + configManager.set(nodeId, Option.CLUSTER_LISTEN_ADDRESS, clusterListenAddress); + } - @Option(name = "-net-buffer-count", usage = "Number of network buffers per input/output channel (default: 1)", - required = false) - public int nNetBuffers = 1; + public int getClusterListenPort() { + return appConfig.getInt(Option.CLUSTER_LISTEN_PORT); + } - @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)") - public int maxMemory = -1; + public void setClusterListenPort(int clusterListenPort) { + configManager.set(nodeId, Option.CLUSTER_LISTEN_PORT, clusterListenPort); + } - @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should " + - "be retained by the system in milliseconds. (default: 24 hours)") - public long resultTTL = 86400000; + public String getClusterPublicAddress() { + return appConfig.getString(Option.CLUSTER_PUBLIC_ADDRESS); + } - @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup " + - "should be invoked in milliseconds. (default: 1 minute)") - public long resultSweepThreshold = 60000; + public void setClusterPublicAddress(String clusterPublicAddress) { + configManager.set(nodeId, Option.CLUSTER_PUBLIC_ADDRESS, clusterPublicAddress); + } - @Option(name = "-result-manager-memory", - usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)") - public int resultManagerMemory = -1; + public int getClusterPublicPort() { + return appConfig.getInt(Option.CLUSTER_PUBLIC_PORT); + } - @Option(name = "-app-nc-main-class", usage = "Application NC Main Class") - public String appNCMainClass; + public void setClusterPublicPort(int clusterPublicPort) { + configManager.set(nodeId, Option.CLUSTER_PUBLIC_PORT, clusterPublicPort); + } - @Option(name = "-config-file", usage = "Specify path to local configuration file (default: no local config)", - required = false) - public String configFile = null; + public String getNodeId() { + return appConfig.getString(Option.NODE_ID); + } - @Option(name = "-messaging-ip-address", usage = "IP Address to bind messaging " - + "listener (default: same as -address)", required = false) - public String messagingIPAddress; + public void setNodeId(String nodeId) { + configManager.set(nodeId, Option.NODE_ID, nodeId); + } - @Option(name = "-messaging-port", usage = "IP port to bind messaging listener " - + "(default: random port)", required = false) - public int messagingPort = 0; + public String getDataListenAddress() { + return appConfig.getString(Option.DATA_LISTEN_ADDRESS); + } - @Option(name = "-messaging-public-ip-address", usage = "Public IP Address to announce messaging" - + " listener (default: same as -messaging-ip-address)", required = false) - public String messagingPublicIPAddress; + public void setDataListenAddress(String dataListenAddress) { + configManager.set(nodeId, Option.DATA_LISTEN_ADDRESS, dataListenAddress); + } - @Option(name = "-messaging-public-port", usage = "Public IP port to announce messaging listener" - + " (default: same as -messaging-port; must set -messaging-public-port also)", required = false) - public int messagingPublicPort = 0; + public int getDataListenPort() { + return appConfig.getInt(Option.DATA_LISTEN_PORT); + } - @Option(name = "-ncservice-pid", usage = "PID of the NCService which launched this NCDriver", required = false) - public int ncservicePid = -1; + public void setDataListenPort(int dataListenPort) { + configManager.set(nodeId, Option.DATA_LISTEN_PORT, dataListenPort); + } - @Argument - @Option(name = "--", handler = StopOptionHandler.class) - public List<String> appArgs; + public String getDataPublicAddress() { + return appConfig.getString(Option.DATA_PUBLIC_ADDRESS); + } - public URL configFileUrl = null; + public void setDataPublicAddress(String dataPublicAddress) { + configManager.set(nodeId, Option.DATA_PUBLIC_ADDRESS, dataPublicAddress); + } - private transient Ini ini = null; + public int getDataPublicPort() { + return appConfig.getInt(Option.DATA_PUBLIC_PORT); + } - private void loadINIFile() throws IOException { - if (configFile != null) { - ini = IniUtils.loadINIFile(configFile); - } else if (configFileUrl != null) { - ini = IniUtils.loadINIFile(configFileUrl); - } else { - return; - } + public void setDataPublicPort(int dataPublicPort) { + configManager.set(nodeId, Option.DATA_PUBLIC_PORT, dataPublicPort); + } - // QQQ This should default to cc/address if cluster.address not set, but - // that logic really should be handled by the ini file sent from the CC - ccHost = IniUtils.getString(ini, "cc", "cluster.address", ccHost); - ccPort = IniUtils.getInt(ini, "cc", "cluster.port", ccPort); - - // Get ID of *this* NC - nodeId = IniUtils.getString(ini, "localnc", "id", nodeId); - String nodeSection = "nc/" + nodeId; - - // Network ports - ipAddress = IniUtils.getString(ini, nodeSection, "address", ipAddress); - - clusterNetIPAddress = IniUtils.getString(ini, nodeSection, "cluster.address", clusterNetIPAddress); - clusterNetPort = IniUtils.getInt(ini, nodeSection, "cluster.port", clusterNetPort); - dataIPAddress = IniUtils.getString(ini, nodeSection, "data.address", dataIPAddress); - dataPort = IniUtils.getInt(ini, nodeSection, "data.port", dataPort); - resultIPAddress = IniUtils.getString(ini, nodeSection, "result.address", resultIPAddress); - resultPort = IniUtils.getInt(ini, nodeSection, "result.port", resultPort); - - clusterNetPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.cluster.address", - clusterNetPublicIPAddress); - clusterNetPublicPort = IniUtils.getInt(ini, nodeSection, "public.cluster.port", clusterNetPublicPort); - dataPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.data.address", dataPublicIPAddress); - dataPublicPort = IniUtils.getInt(ini, nodeSection, "public.data.port", dataPublicPort); - resultPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.result.address", resultPublicIPAddress); - resultPublicPort = IniUtils.getInt(ini, nodeSection, "public.result.port", resultPublicPort); - - messagingIPAddress = IniUtils.getString(ini, nodeSection, "messaging.address", messagingIPAddress); - messagingPort = IniUtils.getInt(ini, nodeSection, "messaging.port", messagingPort); - messagingPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.messaging.address", - messagingPublicIPAddress); - messagingPublicPort = IniUtils.getInt(ini, nodeSection, "public.messaging.port", messagingPublicPort); - - retries = IniUtils.getInt(ini, nodeSection, "retries", retries); - - // Directories - ioDevices = IniUtils.getString(ini, nodeSection, "iodevices", ioDevices); - - // Hyracks client entrypoint - appNCMainClass = IniUtils.getString(ini, nodeSection, "app.class", appNCMainClass); - } - - /* - * Once all @Option fields have been loaded from command-line or otherwise - * specified programmatically, call this method to: - * 1. Load options from a config file (as specified by -config-file) - * 2. Set default values for certain derived values, such as setting - * clusterNetIpAddress to ipAddress - */ - public void loadConfigAndApplyDefaults() throws IOException { - loadINIFile(); - - // "address" is the default for all IP addresses - if (clusterNetIPAddress == null) { - clusterNetIPAddress = ipAddress; - } - if (dataIPAddress == null) { - dataIPAddress = ipAddress; - } - if (resultIPAddress == null) { - resultIPAddress = ipAddress; - } + public String getResultListenAddress() { + return appConfig.getString(Option.RESULT_LISTEN_ADDRESS); + } - // All "public" options default to their "non-public" versions - if (clusterNetPublicIPAddress == null) { - clusterNetPublicIPAddress = clusterNetIPAddress; - } - if (clusterNetPublicPort == 0) { - clusterNetPublicPort = clusterNetPort; - } - if (dataPublicIPAddress == null) { - dataPublicIPAddress = dataIPAddress; - } - if (dataPublicPort == 0) { - dataPublicPort = dataPort; - } - if (resultPublicIPAddress == null) { - resultPublicIPAddress = resultIPAddress; - } - if (resultPublicPort == 0) { - resultPublicPort = resultPort; - } + public void setResultListenAddress(String resultListenAddress) { + configManager.set(nodeId, Option.RESULT_LISTEN_ADDRESS, resultListenAddress); } - /** - * @return An IApplicationConfig representing this NCConfig. - * Note: Currently this only includes the values from the configuration - * file, not anything specified on the command-line. QQQ - */ - public IApplicationConfig getAppConfig() { - return new IniApplicationConfig(ini); - } - - public void toMap(Map<String, String> configuration) { - configuration.put("cc-host", ccHost); - configuration.put("cc-port", (String.valueOf(ccPort))); - configuration.put("cluster-net-ip-address", clusterNetIPAddress); - configuration.put("cluster-net-port", String.valueOf(clusterNetPort)); - configuration.put("cluster-net-public-ip-address", clusterNetPublicIPAddress); - configuration.put("cluster-net-public-port", String.valueOf(clusterNetPublicPort)); - configuration.put("node-id", nodeId); - configuration.put("data-ip-address", dataIPAddress); - configuration.put("data-port", String.valueOf(dataPort)); - configuration.put("data-public-ip-address", dataPublicIPAddress); - configuration.put("data-public-port", String.valueOf(dataPublicPort)); - configuration.put("result-ip-address", resultIPAddress); - configuration.put("result-port", String.valueOf(resultPort)); - configuration.put("result-public-ip-address", resultPublicIPAddress); - configuration.put("result-public-port", String.valueOf(resultPublicPort)); - configuration.put("retries", String.valueOf(retries)); - configuration.put("iodevices", ioDevices); - configuration.put("net-thread-count", String.valueOf(nNetThreads)); - configuration.put("net-buffer-count", String.valueOf(nNetBuffers)); - configuration.put("max-memory", String.valueOf(maxMemory)); - configuration.put("result-time-to-live", String.valueOf(resultTTL)); - configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold)); - configuration.put("result-manager-memory", String.valueOf(resultManagerMemory)); - configuration.put("messaging-ip-address", messagingIPAddress); - configuration.put("messaging-port", String.valueOf(messagingPort)); - configuration.put("messaging-public-ip-address", messagingPublicIPAddress); - configuration.put("messaging-public-port", String.valueOf(messagingPublicPort)); - configuration.put("ncservice-pid", String.valueOf(ncservicePid)); - if (appNCMainClass != null) { - configuration.put("app-nc-main-class", appNCMainClass); - } + public int getResultListenPort() { + return appConfig.getInt(Option.RESULT_LISTEN_PORT); + } + + public void setResultListenPort(int resultListenPort) { + configManager.set(nodeId, Option.RESULT_LISTEN_PORT, resultListenPort); + } + + public String getResultPublicAddress() { + return appConfig.getString(Option.RESULT_PUBLIC_ADDRESS); + } + + public void setResultPublicAddress(String resultPublicAddress) { + configManager.set(nodeId, Option.RESULT_PUBLIC_ADDRESS, resultPublicAddress); + } + + public int getResultPublicPort() { + return appConfig.getInt(Option.RESULT_PUBLIC_PORT); + } + + public void setResultPublicPort(int resultPublicPort) { + configManager.set(nodeId, Option.RESULT_PUBLIC_PORT, resultPublicPort); + } + + public String getMessagingListenAddress() { + return appConfig.getString(Option.MESSAGING_LISTEN_ADDRESS); + } + + public void setMessagingListenAddress(String messagingListenAddress) { + configManager.set(nodeId, Option.MESSAGING_LISTEN_ADDRESS, messagingListenAddress); + } + + public int getMessagingListenPort() { + return appConfig.getInt(Option.MESSAGING_LISTEN_PORT); + } + + public void setMessagingListenPort(int messagingListenPort) { + configManager.set(nodeId, Option.MESSAGING_LISTEN_PORT, messagingListenPort); + } + + public String getMessagingPublicAddress() { + return appConfig.getString(Option.MESSAGING_PUBLIC_ADDRESS); + } + + public void setMessagingPublicAddress(String messagingPublicAddress) { + configManager.set(nodeId, Option.MESSAGING_PUBLIC_ADDRESS, messagingPublicAddress); + } + + public int getMessagingPublicPort() { + return appConfig.getInt(Option.MESSAGING_PUBLIC_PORT); + } + + public void setMessagingPublicPort(int messagingPublicPort) { + configManager.set(nodeId, Option.MESSAGING_PUBLIC_PORT, messagingPublicPort); + } + + public int getClusterConnectRetries() { + return appConfig.getInt(Option.CLUSTER_CONNECT_RETRIES); + } + + public void setClusterConnectRetries(int clusterConnectRetries) { + configManager.set(nodeId, Option.CLUSTER_CONNECT_RETRIES, clusterConnectRetries); + } + + public String[] getIODevices() { + return appConfig.getStringArray(Option.IODEVICES); + } + + public void setIODevices(String[] iodevices) { + configManager.set(nodeId, Option.IODEVICES, iodevices); + } + + public int getNetThreadCount() { + return appConfig.getInt(Option.NET_THREAD_COUNT); + } + + public void setNetThreadCount(int netThreadCount) { + configManager.set(nodeId, Option.NET_THREAD_COUNT, netThreadCount); + } + + public int getNetBufferCount() { + return appConfig.getInt(Option.NET_BUFFER_COUNT); + } + + public void setNetBufferCount(int netBufferCount) { + configManager.set(nodeId, Option.NET_BUFFER_COUNT, netBufferCount); + } + + public long getResultTTL() { + return appConfig.getLong(Option.RESULT_TTL); + } + + public void setResultTTL(long resultTTL) { + configManager.set(nodeId, Option.RESULT_TTL, resultTTL); + } + + public long getResultSweepThreshold() { + return appConfig.getLong(Option.RESULT_SWEEP_THRESHOLD); + } + + public void setResultSweepThreshold(long resultSweepThreshold) { + configManager.set(nodeId, Option.RESULT_SWEEP_THRESHOLD, resultSweepThreshold); + } + + public int getResultManagerMemory() { + return appConfig.getInt(Option.RESULT_MANAGER_MEMORY); + } + + public void setResultManagerMemory(int resultManagerMemory) { + configManager.set(nodeId, Option.RESULT_MANAGER_MEMORY, resultManagerMemory); + } + + public String getAppClass() { + return appConfig.getString(Option.APP_CLASS); + } + + public void setAppClass(String appClass) { + configManager.set(nodeId, Option.APP_CLASS, appClass); + } + + public int getNCServicePid() { + return appConfig.getInt(Option.NCSERVICE_PID); + } + + public void setNCServicePid(int ncservicePid) { + configManager.set(nodeId, Option.NCSERVICE_PID, ncservicePid); + } + + public boolean getVirtualNC() { + return appConfig.getBoolean(Option.VIRTUAL_NC); + } + + public void setVirtualNC(boolean virtualNC) { + configManager.set(nodeId, Option.VIRTUAL_NC, virtualNC); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml index 148cf18..a7e3fa9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml @@ -60,7 +60,6 @@ <dependency> <groupId>args4j</groupId> <artifactId>args4j</artifactId> - <version>2.0.12</version> </dependency> <dependency> <groupId>org.apache.hyracks</groupId> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java new file mode 100644 index 0000000..d4e67fd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc; + +import java.lang.management.ManagementFactory; +import java.util.Arrays; + +import org.apache.hyracks.api.application.INCApplicationContext; +import org.apache.hyracks.api.application.INCApplicationEntryPoint; +import org.apache.hyracks.api.config.IConfigManager; +import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.api.job.resource.NodeCapacity; +import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.controllers.ControllerConfig; +import org.apache.hyracks.control.common.controllers.NCConfig; + +public class NCApplicationEntryPoint implements INCApplicationEntryPoint { + public static final NCApplicationEntryPoint INSTANCE = new NCApplicationEntryPoint(); + + protected NCApplicationEntryPoint() { + } + + @Override + public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception { + if (args.length > 0) { + throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args)); + } + } + + @Override + public void notifyStartupComplete() throws Exception { + // no-op + } + + @Override + public void stop() throws Exception { + // no-op + } + + @Override + public NodeCapacity getCapacity() { + int allCores = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors(); + return new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1 ? allCores - 1 : allCores); + } + + @Override + public void registerConfigOptions(IConfigManager configManager) { + configManager.addIniParamOptions(ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL); + configManager.addCmdLineSections(Section.NC, Section.COMMON, Section.LOCALNC); + configManager.setUsageFilter(getUsageFilter()); + configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4c7b5bfa/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java index 2323d71..b52064e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java @@ -18,28 +18,32 @@ */ package org.apache.hyracks.control.nc; +import java.io.IOException; +import java.util.Arrays; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.hyracks.api.application.INCApplicationEntryPoint; +import org.apache.hyracks.control.common.config.ConfigManager; +import org.apache.hyracks.control.common.config.ConfigUtils; import org.apache.hyracks.control.common.controllers.NCConfig; -import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.CmdLineException; +@SuppressWarnings("InfiniteLoopStatement") public class NCDriver { private static final Logger LOGGER = Logger.getLogger(NCDriver.class.getName()); - public static void main(String args[]) throws Exception { + private NCDriver() { + } + + public static void main(String[] args) { try { - NCConfig ncConfig = new NCConfig(); - CmdLineParser cp = new CmdLineParser(ncConfig); - try { - cp.parseArgument(args); - } catch (Exception e) { - e.printStackTrace(); - cp.printUsage(System.err); - System.exit(1); - } - ncConfig.loadConfigAndApplyDefaults(); - final NodeControllerService ncService = new NodeControllerService(ncConfig); + final String nodeId = ConfigUtils.getOptionValue(args, NCConfig.Option.NODE_ID); + final ConfigManager configManager = new ConfigManager(args); + INCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args); + appEntryPoint.registerConfigOptions(configManager); + NCConfig ncConfig = new NCConfig(nodeId, configManager); + final NodeControllerService ncService = new NodeControllerService(ncConfig, appEntryPoint); if (LOGGER.isLoggable(Level.SEVERE)) { LOGGER.severe("Setting uncaught exception handler " + ncService.getLifeCycleComponentManager()); } @@ -49,9 +53,20 @@ public class NCDriver { while (true) { Thread.sleep(10000); } + } catch (CmdLineException e) { + LOGGER.log(Level.FINE, "Exception parsing command line: " + Arrays.toString(args), e); + System.exit(2); } catch (Exception e) { - e.printStackTrace(); + LOGGER.log(Level.SEVERE, "Exiting NCDriver due to exception", e); System.exit(1); } } + + private static INCApplicationEntryPoint getAppEntryPoint(String[] args) + throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException { + // determine app class so that we can use the correct implementation of the configuration... + String appClassName = ConfigUtils.getOptionValue(args, NCConfig.Option.APP_CLASS); + return appClassName != null ? (INCApplicationEntryPoint) (Class.forName(appClassName)).newInstance() + : NCApplicationEntryPoint.INSTANCE; + } }
