http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index 360975d..1ec7485 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -25,7 +25,6 @@ import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -37,14 +36,12 @@ import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.application.ICCApplication; -import org.apache.hyracks.api.application.IClusterLifecycleListener; import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.config.IApplicationConfig; -import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.context.ICCContext; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; @@ -147,6 +144,8 @@ public class ClusterControllerService implements 
IControllerService { private ShutdownRun shutdownCallback; + private final CcId ccId; + static { ExitUtil.init(); } @@ -182,7 +181,8 @@ public class ClusterControllerService implements IControllerService { // Node manager is in charge of cluster membership management. nodeManager = new NodeManager(this, ccConfig, resourceManager); - jobIdFactory = new JobIdFactory(); + ccId = ccConfig.getCcId(); + jobIdFactory = new JobIdFactory(ccId); deployedJobSpecIdFactory = new DeployedJobSpecIdFactory(); } @@ -252,17 +252,18 @@ public class ClusterControllerService implements IControllerService { } } - private Pair<String, Integer> getNCService(String nodeId) { + private InetSocketAddress getNCService(String nodeId) { IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(nodeId); - return Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS), - ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT)); + final int port = ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT); + return port == NCConfig.NCSERVICE_PORT_DISABLED ? 
null + : InetSocketAddress.createUnresolved(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS), port); } - private Map<String, Pair<String, Integer>> getNCServices() { - Map<String, Pair<String, Integer>> ncMap = new TreeMap<>(); + private Map<String, InetSocketAddress> getNCServices() { + Map<String, InetSocketAddress> ncMap = new TreeMap<>(); for (String ncId : configManager.getNodeNames()) { - Pair<String, Integer> ncService = getNCService(ncId); - if (ncService.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) { + InetSocketAddress ncService = getNCService(ncId); + if (ncService != null) { ncMap.put(ncId, ncService); } } @@ -271,31 +272,19 @@ public class ClusterControllerService implements IControllerService { private void connectNCs() { getNCServices().forEach((key, value) -> { - final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, value.getLeft(), - value.getRight(), key); + final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, value.getHostString(), + value.getPort(), key); executor.submit(triggerWork); }); - serviceCtx.addClusterLifecycleListener(new IClusterLifecycleListener() { - @Override - public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException { - // no-op, we don't care - LOGGER.log(Level.WARN, "Getting notified that node: " + nodeId + " has joined. 
and we don't care"); - } - - @Override - public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException { - LOGGER.log(Level.WARN, "Getting notified that nodes: " + deadNodeIds + " has failed"); - } - }); } public boolean startNC(String nodeId) { - Pair<String, Integer> ncServiceAddress = getNCService(nodeId); + InetSocketAddress ncServiceAddress = getNCService(nodeId); if (ncServiceAddress == null) { return false; } - final TriggerNCWork startNc = new TriggerNCWork(ClusterControllerService.this, ncServiceAddress.getLeft(), - ncServiceAddress.getRight(), nodeId); + final TriggerNCWork startNc = new TriggerNCWork(ClusterControllerService.this, ncServiceAddress.getHostString(), + ncServiceAddress.getPort(), nodeId); executor.submit(startNc); return true; @@ -304,11 +293,9 @@ public class ClusterControllerService implements IControllerService { private void terminateNCServices() throws Exception { List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>(); getNCServices().forEach((key, value) -> { - if (value.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) { - ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(value.getLeft(), value.getRight(), key); - workQueue.schedule(shutdownWork); - shutdownNCServiceWorks.add(shutdownWork); - } + ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(value.getHostString(), value.getPort(), key); + workQueue.schedule(shutdownWork); + shutdownNCServiceWorks.add(shutdownWork); }); for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) { shutdownWork.sync(); @@ -428,6 +415,10 @@ public class ClusterControllerService implements IControllerService { return deployedJobSpecIdFactory; } + public CcId getCcId() { + return ccId; + } + private final class ClusterControllerContext implements ICCContext { private final ClusterTopology topology;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 367a1d5..742e2e0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -105,7 +105,7 @@ public class NodeManager implements INodeManager { try { // TODO(mblow): it seems we should close IPC handles when we're done with them (like here) IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress()); - ncIPCHandle.send(-1, new AbortCCJobsFunction(), null); + ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null); } catch (IPCException e) { throw HyracksDataException.create(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index 3a38287..04a34af 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -57,7 +57,7 @@ public class RegisterNodeWork extends SynchronizableWork { try { LOGGER.log(Level.WARN, "Registering INodeController: id = " + id); NodeControllerRemoteProxy nc = - new NodeControllerRemoteProxy( + new NodeControllerRemoteProxy(ccs.getCcId(), ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress())); NodeControllerState state = new NodeControllerState(nc, reg); INodeManager nodeManager = ccs.getNodeManager(); @@ -73,7 +73,6 @@ public class RegisterNodeWork extends SynchronizableWork { params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis()); params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod()); result = new CCNCFunctions.NodeRegistrationResult(params, null); - ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1); } catch (Exception e) { LOGGER.log(Level.WARN, "Node registration failed", e); result = new CCNCFunctions.NodeRegistrationResult(null, e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java index f1d9a4d..53998aa 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java @@ -63,17 +63,15 @@ public class WaitForJobCompletionWork extends SynchronizableWork { List<Exception> exceptions; if (exceptionHistory == null) { // couldn't be found - long maxJobId = ccs.getJobIdFactory().maxJobId(); - exceptions = Collections.singletonList(jobId.getId() <= maxJobId - ? HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId) - : HyracksDataException.create(ErrorCode.JOB_HAS_NOT_BEEN_CREATED_YET, jobId)); + exceptions = Collections + .singletonList(HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId)); } else { exceptions = exceptionHistory; } ccs.getExecutor().execute(() -> { if (!exceptions.isEmpty()) { - /** + /* * only report the first exception because IResultCallback will only throw one exception * anyway */ http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index 6fd321e..2307185 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.base; import java.util.List; import org.apache.hyracks.api.comm.NetworkAddress; +import 
org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.deployment.DeploymentId; @@ -35,42 +36,44 @@ import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; public interface IClusterController { - public void registerNode(NodeRegistration reg) throws Exception; + void registerNode(NodeRegistration reg) throws Exception; - public void unregisterNode(String nodeId) throws Exception; + void unregisterNode(String nodeId) throws Exception; - public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) + void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) throws Exception; - public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) + void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) throws Exception; - public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception; + void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception; - public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception; + void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception; - public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception; + void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception; - public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception; + void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception; - public void notifyShutdown(String nodeId) throws Exception; + void notifyShutdown(String nodeId) throws 
Exception; - public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception; + void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception; - public void reportProfile(String id, List<JobProfile> profiles) throws Exception; + void reportProfile(String id, List<JobProfile> profiles) throws Exception; - public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception; + void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception; - public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception; + void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception; - public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; + void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; - public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, + void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception; - public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception; + void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception; - public void getNodeControllerInfos() throws Exception; + void getNodeControllerInfos() throws Exception; - public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception; + void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception; + + CcId getCcId(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index 5d781cf..ef3b27c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -36,30 +36,30 @@ import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; public interface INodeController { - public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, - List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, - Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) + void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, + List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, + Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) throws Exception; - public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception; + void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception; - public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception; + void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception; - public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception; + void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception; - public void 
deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception; + void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception; - public void undeployBinary(DeploymentId deploymentId) throws Exception; + void undeployBinary(DeploymentId deploymentId) throws Exception; - public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception; + void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception; - public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; + void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; - public void dumpState(String stateDumpId) throws Exception; + void dumpState(String stateDumpId) throws Exception; - public void shutdown(boolean terminateNCService) throws Exception; + void shutdown(boolean terminateNCService) throws Exception; - public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; + void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; - public void takeThreadDump(String requestId) throws Exception; + void takeThreadDump(String requestId) throws Exception; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java index 62e6ee0..42ed1e7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -81,6 +81,34 @@ public class OptionTypes {
         }
     };
 
+    /**
+     * 16-bit integer option type. Values are parsed with {@link Integer#decode(String)} (decimal, 0x/0X/# hex,
+     * leading-0 octal, optional sign) and must fit in 16 bits, either signed (-32768..32767) or unsigned
+     * (0..65535, e.g. 0xFFFF); unsigned values above {@link Short#MAX_VALUE} wrap to negative shorts, keeping
+     * all 16 bits.
+     */
+    public static final IOptionType<Short> SHORT = new IOptionType<Short>() {
+        @Override
+        public Short parse(String s) {
+            int value = Integer.decode(s);
+            // reject values that cannot be represented in 16 bits (signed or unsigned interpretation)
+            if (value < Short.MIN_VALUE || value > 0xFFFF) {
+                throw new IllegalArgumentException("The given value " + s + " does not fit in 16 bits");
+            }
+            return (short) value;
+        }
+
+        @Override
+        public Class<Short> targetType() {
+            return Short.class;
+        }
+
+        @Override
+        public void serializeJSONField(String fieldName, Object value, ObjectNode node) {
+            node.put(fieldName, (short) value);
+        }
+    };
+
     public static final IOptionType<Integer> INTEGER = new IOptionType<Integer>() {
         @Override
         public Integer parse(String s) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 470e87c..85731b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.controllers;
 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static
org.apache.hyracks.control.common.config.OptionTypes.LONG; +import static org.apache.hyracks.control.common.config.OptionTypes.SHORT; import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import java.io.File; @@ -33,6 +34,7 @@ import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.util.file.FileUtil; import org.ini4j.Ini; @@ -67,7 +69,8 @@ public class CCConfig extends ControllerConfig { JOB_QUEUE_CAPACITY(INTEGER, 4096), JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"), ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false), - CORES_MULTIPLIER(INTEGER, 3); + CORES_MULTIPLIER(INTEGER, 3), + CONTROLLER_ID(SHORT, (short)0x0000); private final IOptionType parser; private Object defaultValue; @@ -164,6 +167,8 @@ public class CCConfig extends ControllerConfig { + "bad behaving operators"; case CORES_MULTIPLIER: return "Specifies the multiplier to use on the cluster available cores"; + case CONTROLLER_ID: + return "The 16-bit (0-65535) id of this Cluster Controller"; default: throw new IllegalStateException("NYI: " + this); } @@ -374,4 +379,8 @@ public class CCConfig extends ControllerConfig { public int getCoresMultiplier() { return getAppConfig().getInt(Option.CORES_MULTIPLIER); } + + public CcId getCcId() { + return CcId.valueOf(getAppConfig().getShort(Option.CONTROLLER_ID)); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 0a5ba30..95c063f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.controllers; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.LONG; +import static org.apache.hyracks.control.common.config.OptionTypes.SHORT; import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import static org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY; @@ -33,6 +34,7 @@ import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.util.file.FileUtil; @@ -48,6 +50,7 @@ public class NCConfig extends ControllerConfig { NCSERVICE_PORT(INTEGER, 9090), CLUSTER_ADDRESS(STRING, (String) null), CLUSTER_PORT(INTEGER, 1099), + CLUSTER_CONTROLLER_ID(SHORT, (short)0x0000), CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS), CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT), NODE_ID(STRING, (String) null), @@ -141,6 +144,8 @@ public class NCConfig extends ControllerConfig { return "Cluster Controller port"; case CLUSTER_LISTEN_PORT: return "IP 
port to bind cluster listener"; + case CLUSTER_CONTROLLER_ID: + return "16-bit (0-65535) id of the Cluster Controller"; case CLUSTER_PUBLIC_ADDRESS: return "Public IP Address to announce cluster listener"; case CLUSTER_PUBLIC_PORT: @@ -308,6 +313,10 @@ public class NCConfig extends ControllerConfig { configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort); } + public CcId getClusterControllerId() { + return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID)); + } + public String getClusterListenAddress() { return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index dca8c07..5c6d078 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -38,6 +38,7 @@ import java.util.Set; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; import org.apache.hyracks.api.dataflow.OperatorDescriptorId; @@ -150,12 +151,25 @@ public class CCNCFunctions { } - public static abstract class Function implements Serializable { + public abstract static class Function implements 
Serializable { private static final long serialVersionUID = 1L; public abstract FunctionId getFunctionId(); } + public abstract static class CCIdentifiedFunction extends Function { + private static final long serialVersionUID = 1L; + private final CcId ccId; + + protected CCIdentifiedFunction(CcId ccId) { + this.ccId = ccId; + } + + public CcId getCcId() { + return ccId; + } + } + public static class RegisterNodeFunction extends Function { private static final long serialVersionUID = 1L; @@ -668,24 +682,33 @@ public class CCNCFunctions { } } - //TODO: Add CC id to this job to only abort jobs by this CC: https://issues.apache.org/jira/browse/ASTERIXDB-2110 public static class AbortCCJobsFunction extends Function { private static final long serialVersionUID = 1L; + private final CcId ccId; + + public AbortCCJobsFunction(CcId ccId) { + this.ccId = ccId; + } @Override public FunctionId getFunctionId() { return FunctionId.ABORT_ALL_JOBS; } + + public CcId getCcId() { + return ccId; + } } - public static class DeployJobSpecFunction extends Function { + public static class DeployJobSpecFunction extends CCIdentifiedFunction { private static final long serialVersionUID = 1L; private final DeployedJobSpecId deployedJobSpecId; private final byte[] acgBytes; - public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) { + public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, CcId ccId) { + super(ccId); this.deployedJobSpecId = deployedJobSpecId; this.acgBytes = acgBytes; } @@ -704,12 +727,13 @@ public class CCNCFunctions { } } - public static class UndeployJobSpecFunction extends Function { + public static class UndeployJobSpecFunction extends CCIdentifiedFunction { private static final long serialVersionUID = 1L; private final DeployedJobSpecId deployedJobSpecId; - public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) { + public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, CcId ccId) { + 
super(ccId); this.deployedJobSpecId = deployedJobSpecId; } @@ -724,7 +748,7 @@ public class CCNCFunctions { } public static class StartTasksFunction extends Function { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final DeploymentId deploymentId; private final JobId jobId; @@ -1008,11 +1032,12 @@ public class CCNCFunctions { } } - public static class ThreadDumpRequestFunction extends Function { + public static class ThreadDumpRequestFunction extends CCIdentifiedFunction { private static final long serialVersionUID = 1L; private final String requestId; - public ThreadDumpRequestFunction(String requestId) { + public ThreadDumpRequestFunction(String requestId, CcId ccId) { + super(ccId); this.requestId = requestId; } @@ -1106,13 +1131,14 @@ public class CCNCFunctions { } } - public static class DeployBinaryFunction extends Function { + public static class DeployBinaryFunction extends CCIdentifiedFunction { private static final long serialVersionUID = 1L; private final List<URL> binaryURLs; private final DeploymentId deploymentId; - public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs) { + public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) { + super(ccId); this.binaryURLs = binaryURLs; this.deploymentId = deploymentId; } @@ -1131,12 +1157,13 @@ public class CCNCFunctions { } } - public static class UnDeployBinaryFunction extends Function { + public static class UnDeployBinaryFunction extends CCIdentifiedFunction { private static final long serialVersionUID = 1L; private final DeploymentId deploymentId; - public UnDeployBinaryFunction(DeploymentId deploymentId) { + public UnDeployBinaryFunction(DeploymentId deploymentId, CcId ccId) { + super(ccId); this.deploymentId = deploymentId; } @@ -1211,12 +1238,13 @@ public class CCNCFunctions { } } - public static class StateDumpRequestFunction extends Function { + public static class 
StateDumpRequestFunction extends CCIdentifiedFunction { private static final long serialVersionUID = 1L; private final String stateDumpId; - public StateDumpRequestFunction(String stateDumpId) { + public StateDumpRequestFunction(String stateDumpId, CcId ccId) { + super(ccId); this.stateDumpId = stateDumpId; } @@ -1265,12 +1293,13 @@ public class CCNCFunctions { } } - public static class ShutdownRequestFunction extends Function { + public static class ShutdownRequestFunction extends CCIdentifiedFunction { private static final long serialVersionUID = 1L; private final boolean terminateNCService; - public ShutdownRequestFunction(boolean terminateNCService) { + public ShutdownRequestFunction(boolean terminateNCService, CcId ccId) { + super(ccId); this.terminateNCService = terminateNCService; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index f2e7d87..e4e2dbe 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.ipc; import java.util.List; import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataflow.TaskAttemptId; import 
org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.deployment.DeploymentId; @@ -56,9 +57,11 @@ import org.apache.hyracks.ipc.api.IIPCHandle; public class ClusterControllerRemoteProxy implements IClusterController { + private final CcId ccId; private IIPCHandle ipcHandle; - public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) { + public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) { + this.ccId = ccId; this.ipcHandle = ipcHandle; } @@ -181,4 +184,14 @@ public class ClusterControllerRemoteProxy implements IClusterController { threadDumpJSON); ipcHandle.send(-1, tdrf, null); } + + @Override + public CcId getCcId() { + return ccId; + } + + @Override + public String toString() { + return getClass().getSimpleName() + " " + ccId + " [" + ipcHandle.getRemoteAddress() + "]"; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index b6b9b4b..a09a8bc 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; import 
org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy; @@ -51,9 +52,11 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; import org.apache.hyracks.ipc.api.IIPCHandle; public class NodeControllerRemoteProxy implements INodeController { + private final CcId ccId; private final IIPCHandle ipcHandle; - public NodeControllerRemoteProxy(IIPCHandle ipcHandle) { + public NodeControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) { + this.ccId = ccId; this.ipcHandle = ipcHandle; } @@ -88,37 +91,37 @@ public class NodeControllerRemoteProxy implements INodeController { @Override public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception { - DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs); + DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId); ipcHandle.send(-1, rpaf, null); } @Override public void undeployBinary(DeploymentId deploymentId) throws Exception { - UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId); + UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId, ccId); ipcHandle.send(-1, rpaf, null); } @Override public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception { - DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes); + DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, ccId); ipcHandle.send(-1, fn, null); } @Override public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { - UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId); + UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId, ccId); ipcHandle.send(-1, fn, null); } @Override public void dumpState(String stateDumpId) throws Exception { - StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId); + 
StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId, ccId); ipcHandle.send(-1, dsf, null); } @Override public void shutdown(boolean terminateNCService) throws Exception { - ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService); + ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService, ccId); ipcHandle.send(-1, sdrf, null); } @@ -131,7 +134,7 @@ public class NodeControllerRemoteProxy implements INodeController { @Override public void takeThreadDump(String requestId) throws Exception { - ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId); + ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId, ccId); ipcHandle.send(-1, fn, null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java index 94e86dd..9670e42 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.application.INCApplication; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; import 
org.apache.hyracks.control.common.controllers.CCConfig; @@ -58,7 +59,7 @@ public class BaseNCApplication implements INCApplication { } @Override - public void onRegisterNode() throws Exception { + public void onRegisterNode(CcId ccId) throws Exception { // no-op } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index 8cb33ca..8790434 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -222,15 +222,10 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { public void close() { long stillAllocated = memoryAllocation.get(); if (stillAllocated > 0) { - LOGGER.warn("Freeing leaked " + stillAllocated + " bytes"); + LOGGER.info(() -> "Freeing leaked " + stillAllocated + " bytes"); serviceCtx.getMemoryManager().deallocate(stillAllocated); } - nodeController.getExecutor().execute(new Runnable() { - @Override - public void run() { - deallocatableRegistry.close(); - } - }); + nodeController.getExecutor().execute(() -> deallocatableRegistry.close()); } ByteBuffer allocateFrame() throws HyracksDataException { @@ -298,7 +293,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { for (PartitionId pid : pids) { partitionRequestMap.put(pid, collector); PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState); - 
nodeController.getClusterController().registerPartitionRequest(req); + nodeController.getClusterController(jobId.getCcId()).registerPartitionRequest(req); } } @@ -326,7 +321,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { close(); cleanupPending = false; try { - nodeController.getClusterController().notifyJobletCleanup(jobId, nodeController.getId()); + nodeController.getClusterController(jobId.getCcId()).notifyJobletCleanup(jobId, nodeController.getId()); } catch (Exception e) { e.printStackTrace(); } @@ -341,4 +336,5 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { public ClassLoader getClassLoader() throws HyracksException { return DeploymentUtils.getClassLoader(deploymentId, serviceCtx); } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index b220039..f55e250 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -72,7 +72,8 @@ final class NodeControllerIPCI implements IIPCI { ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks())); return; case ABORT_ALL_JOBS: - ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs)); + CCNCFunctions.AbortCCJobsFunction aajf = (CCNCFunctions.AbortCCJobsFunction) fn; + ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs, aajf.getCcId())); 
return; case CLEANUP_JOBLET: CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn; @@ -97,27 +98,29 @@ final class NodeControllerIPCI implements IIPCI { case DEPLOY_BINARY: CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn; - ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs())); + ncs.getWorkQueue() + .schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), dbf.getCcId())); return; case UNDEPLOY_BINARY: CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn; - ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId())); + ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId(), ndbf.getCcId())); return; case DISTRIBUTE_JOB: CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn; - ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes())); + ncs.getWorkQueue().schedule( + new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(), djf.getCcId())); return; case DESTROY_JOB: CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn; - ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId())); + ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId(), dsjf.getCcId())); return; case STATE_DUMP_REQUEST: final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn; - ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId())); + ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId(), dsrf.getCcId())); return; case SHUTDOWN_REQUEST: @@ -127,7 +130,7 @@ final class NodeControllerIPCI implements IIPCI { case THREAD_DUMP_REQUEST: final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn; - 
ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId())); + ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId(), tdrf.getCcId())); return; default: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 18a6b20..24d72f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -29,12 +29,14 @@ import java.lang.management.RuntimeMXBean; import java.lang.management.ThreadMXBean; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @@ -46,6 +48,7 @@ import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.api.application.INCApplication; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; import 
org.apache.hyracks.api.exceptions.ErrorCode; @@ -90,6 +93,7 @@ import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory; import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters; import org.apache.hyracks.util.ExitUtil; +import org.apache.hyracks.util.InvokeUtil; import org.apache.hyracks.util.PidHelper; import org.apache.hyracks.util.trace.ITracer; import org.apache.hyracks.util.trace.Tracer; @@ -128,7 +132,9 @@ public class NodeControllerService implements IControllerService { private Exception registrationException; - private IClusterController ccs; + private IClusterController primaryCcs; + + private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>()); private final Map<JobId, Joblet> jobletMap; @@ -140,7 +146,9 @@ public class NodeControllerService implements IControllerService { private NodeParameters nodeParameters; - private Thread heartbeatThread; + private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>(); + + private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>(); private final ServerContext serverCtx; @@ -180,6 +188,8 @@ public class NodeControllerService implements IControllerService { ExitUtil.init(); } + private NCShutdownHook ncShutdownHook; + public NodeControllerService(NCConfig config) throws Exception { this(config, getApplication(config)); } @@ -201,13 +211,14 @@ public class NodeControllerService implements IControllerService { LOGGER.info("Setting uncaught exception handler " + getLifeCycleComponentManager()); } // Set shutdown hook before so it doesn't have the same uncaught exception handler - Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this)); + ncShutdownHook = new NCShutdownHook(this); + Runtime.getRuntime().addShutdownHook(ncShutdownHook); Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager()); ioManager = new 
IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver()); workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. - jobletMap = new Hashtable<>(); + jobletMap = new ConcurrentHashMap<>(); deployedJobSpecActivityClusterGraphMap = new Hashtable<>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, @@ -235,13 +246,6 @@ public class NodeControllerService implements IControllerService { return lccm; } - synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { - this.nodeParameters = parameters; - this.registrationException = exception; - this.registrationPending = false; - notifyAll(); - } - public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception { FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<>(); synchronized (getNodeControllerInfosAcceptor) { @@ -250,7 +254,7 @@ public class NodeControllerService implements IControllerService { } getNodeControllerInfosAcceptor.setValue(fv); } - ccs.getNodeControllerInfos(); + primaryCcs.getNodeControllerInfos(); return fv.get(); } @@ -297,79 +301,142 @@ public class NodeControllerService implements IControllerService { if (messagingNetManager != null) { messagingNetManager.start(); } - this.ccs = new ClusterControllerRemoteProxy( - ipc.getHandle( - new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()), - ncConfig.getClusterConnectRetries(), 1, new IIPCEventListener() { - @Override - public void ipcHandleRestored(IIPCHandle handle) throws IPCException { - // we need to re-register in case of NC -> CC connection reset - try { - registerNode(); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Failed Registering with cc", e); - throw new IPCException(e); - } - } - })); - registerNode(); + + final InetSocketAddress ccAddress = new InetSocketAddress(ncConfig.getClusterAddress(), + 
ncConfig.getClusterPort()); + this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress); workQueue.start(); // Schedule tracing a human-readable datetime timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000); - if (nodeParameters.getProfileDumpPeriod() > 0) { - // Schedule profile dump generator. - timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod()); + LOGGER.log(Level.INFO, "Started NodeControllerService"); + application.startupCompleted(); + } + + public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception { + ClusterControllerRemoteProxy ccProxy; + synchronized (ccsMap) { + if (ccsMap.containsKey(ccId)) { + throw new IllegalStateException("cc already registered: " + ccId); + } + final IIPCEventListener ipcEventListener = new IIPCEventListener() { + @Override + public void ipcHandleRestored(IIPCHandle handle) throws IPCException { + // we need to re-register in case of NC -> CC connection reset + try { + registerNode(ccsMap.get(ccId)); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failed Registering with cc", e); + throw new IPCException(e); + } + } + }; + ccProxy = new ClusterControllerRemoteProxy(ccId, + ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener)); + registerNode(ccProxy); + ccsMap.put(ccId, ccProxy); } + return ccProxy; + } - // Start heartbeat generator. 
- heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat"); - heartbeatThread.setPriority(Thread.MAX_PRIORITY); - heartbeatThread.setDaemon(true); - heartbeatThread.start(); + public void makePrimaryCc(CcId ccId) throws Exception { + synchronized (ccsMap) { + if (!ccsMap.containsKey(ccId)) { + throw new IllegalArgumentException("unknown cc: " + ccId); + } + primaryCcs = ccsMap.get(ccId); + } + } - LOGGER.log(Level.INFO, "Started NodeControllerService"); - application.startupCompleted(); + public void removeCc(CcId ccId) throws Exception { + synchronized (ccsMap) { + final IClusterController ccs = ccsMap.get(ccId); + if (ccs == null) { + throw new IllegalArgumentException("unknown cc: " + ccId); + } + if (primaryCcs.equals(ccs)) { + throw new IllegalStateException("cannot remove primary cc: " + ccId); + } + // TODO(mblow): consider how to handle running jobs + ccs.unregisterNode(id); + Thread hbThread = heartbeatThreads.remove(ccs); + hbThread.interrupt(); + Timer ccTimer = ccTimers.remove(ccs); + if (ccTimer != null) { + ccTimer.cancel(); + } + } } - public void registerNode() throws Exception { - LOGGER.info("Registering with Cluster Controller"); + protected void registerNode(IClusterController ccs) throws Exception { + LOGGER.info("Registering with Cluster Controller {}", ccs); registrationPending = true; HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()]; for (int i = 0; i < gcInfos.length; ++i) { gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName()); } HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos); - // Use "public" versions of network addresses and ports + // Use "public" versions of network addresses and ports, if defined + InetSocketAddress ncAddress; + if (ncConfig.getClusterPublicPort() == 0) { + ncAddress = ipc.getSocketAddress(); + } else { + ncAddress = new 
InetSocketAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort()); + } NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress(); NetworkAddress netAddress = netManager.getPublicNetworkAddress(); - NetworkAddress meesagingPort = + NetworkAddress messagingAddress = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null; int allCores = osMXBean.getAvailableProcessors(); - nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress, + nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(), - runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort, application.getCapacity(), + runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(), PidHelper.getPid(), maxJobId.get()); ccs.registerNode(nodeRegistration); - synchronized (this) { - while (registrationPending) { - wait(); - } + completeNodeRegistration(ccs); + + // Start heartbeat generator. + if (!heartbeatThreads.containsKey(ccs)) { + Thread heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), + id + "-Heartbeat"); + heartbeatThread.setPriority(Thread.MAX_PRIORITY); + heartbeatThread.setDaemon(true); + heartbeatThread.start(); + heartbeatThreads.put(ccs, heartbeatThread); + } + if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) { + Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true); + // Schedule profile dump generator. 
+ ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod()); + ccTimers.put(ccs, ccTimer); + } + + LOGGER.info("Registering with Cluster Controller {} complete", ccs); + } + + synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { + this.nodeParameters = parameters; + this.registrationException = exception; + this.registrationPending = false; + notifyAll(); + } + + private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception { + while (registrationPending) { + wait(); } if (registrationException != null) { - LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", - registrationException); + LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", registrationException); throw registrationException; } serviceCtx.setDistributedState(nodeParameters.getDistributedState()); - application.onRegisterNode(); - LOGGER.info("Registering with Cluster Controller complete"); + application.onRegisterNode(ccs.getCcId()); } private void startApplication() throws Exception { @@ -404,17 +471,21 @@ public class NodeControllerService implements IControllerService { workQueue.stop(); application.stop(); /* - * Stop heartbeat after NC has stopped to avoid false node failure detection + * Stop heartbeats only after NC has stopped to avoid false node failure detection * on CC if an NC takes a long time to stop. 
*/ - if (heartbeatThread != null) { - heartbeatThread.interrupt(); - heartbeatThread.join(1000); // give it 1s to stop gracefully - } - try { - ccs.notifyShutdown(id); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e); + heartbeatThreads.values().parallelStream().forEach(t -> { + t.interrupt(); + InvokeUtil.doUninterruptibly(() -> t.join(1000)); + }); + synchronized (ccsMap) { + ccsMap.values().parallelStream().forEach(ccs -> { + try { + ccs.notifyShutdown(id); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e); + } + }); } ipc.stop(); @@ -423,6 +494,14 @@ public class NodeControllerService implements IControllerService { LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack), new Exception("Duplicate shutdown call")); } + if (ncShutdownHook != null) { + try { + Runtime.getRuntime().removeShutdownHook(ncShutdownHook); + LOGGER.info("removed shutdown hook for {}", id); + } catch (IllegalStateException e) { + LOGGER.log(Level.DEBUG, "ignoring exception while attempting to remove shutdown hook", e); + } + } } public String getId() { @@ -488,8 +567,12 @@ public class NodeControllerService implements IControllerService { return partitionManager; } - public IClusterController getClusterController() { - return ccs; + public IClusterController getPrimaryClusterController() { + return primaryCcs; + } + + public IClusterController getClusterController(CcId ccId) { + return ccsMap.get(ccId); } public NodeParameters getNodeParameters() { @@ -619,7 +702,7 @@ public class NodeControllerService implements IControllerService { public void run() { try { FutureValue<List<JobProfile>> fv = new FutureValue<>(); - BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv); + BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv); workQueue.scheduleAndSync(bjpw); List<JobProfile> profiles = 
fv.get(); if (!profiles.isEmpty()) { @@ -651,8 +734,8 @@ public class NodeControllerService implements IControllerService { } } - public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception { - ccs.sendApplicationMessageToCC(data, deploymentId, id); + public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception { + ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id); } public IDatasetPartitionManager getDatasetPartitionManager() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 34ddd6a..07bb504 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -437,12 +437,13 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { @Override public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception { - this.ncs.sendApplicationMessageToCC(message, deploymentId); + this.ncs.sendApplicationMessageToCC(getJobletContext().getJobId().getCcId(), message, deploymentId); } @Override public void sendApplicationMessageToCC(Serializable message, DeploymentId deploymentId) throws Exception { - this.ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), deploymentId); + this.ncs.sendApplicationMessageToCC(getJobletContext().getJobId().getCcId(), + 
JavaSerializationUtils.serialize(message), deploymentId); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java index 476aeae..fb7308e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java @@ -93,10 +93,10 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I boolean orderedResult, boolean emptyResult) throws HyracksException { try { // Be sure to send the *public* network address to the CC - ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, - partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress()); + ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult, + emptyResult, partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress()); } catch (Exception e) { - throw new HyracksException(e); + throw HyracksException.create(e); } } @@ -105,9 +105,9 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I try { LOGGER.debug("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId + ":partition: " + partition); - ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, 
rsId, partition); + ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition); } catch (Exception e) { - throw new HyracksException(e); + throw HyracksException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java index b9d2f4d..4787a50 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java @@ -96,7 +96,7 @@ public class MaterializedPartitionWriter implements IFrameWriter { ctx.getIoManager().close(handle); } if (!failed) { - manager.registerPartition(pid, taId, + manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, new MaterializedPartition(ctx, fRef, executor, ctx.getIoManager()), PartitionState.COMMITTED, taId.getAttempt() == 0 ? 
false : true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java index 57eba53..147606d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java @@ -188,7 +188,8 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition eos = false; failed = false; deallocated = false; - manager.registerPartition(pid, taId, this, PartitionState.STARTED, false); + manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, this, PartitionState.STARTED, + false); } private void checkOrCreateFile() throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java index 667cfa3..bb69eec 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; @@ -58,10 +59,10 @@ public class PartitionManager { this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager()); } - public synchronized void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition, + public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition, PartitionState state, boolean updateToCC) throws HyracksDataException { try { - /** + /* * process pending requests */ NetworkOutputChannel writer = partitionRequests.remove(pid); @@ -73,24 +74,20 @@ public class PartitionManager { } } - /** + /* * put a coming available partition into the available partition map */ - List<IPartition> pList = availablePartitionMap.get(pid); - if (pList == null) { - pList = new ArrayList<>(); - availablePartitionMap.put(pid, pList); - } + List<IPartition> pList = availablePartitionMap.computeIfAbsent(pid, k -> new ArrayList<>()); pList.add(partition); - /** + /* * update to CC only when necessary */ if (updateToCC) { - updatePartitionState(pid, taId, partition, state); + updatePartitionState(ccId, pid, taId, partition, state); } } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } @@ -128,7 +125,7 @@ public class PartitionManager { partitionRequests.put(partitionId, writer); } } catch (Exception e) { - throw new HyracksDataException(e); + throw 
HyracksDataException.create(e); } } @@ -140,14 +137,15 @@ public class PartitionManager { deallocatableRegistry.close(); } - public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state) + public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition, + PartitionState state) throws HyracksDataException { PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable()); desc.setState(state); try { - ncs.getClusterController().registerPartitionProvider(desc); + ncs.getClusterController(ccId).registerPartitionProvider(desc); } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java index 16e5027..fc2f8e7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java @@ -71,7 +71,8 @@ public class PipelinedPartition implements IFrameWriter, IPartition { @Override public void open() throws HyracksDataException { - manager.registerPartition(pid, taId, this, PartitionState.STARTED, false); + manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, this, PartitionState.STARTED, + false); 
pendingConnection = true; ensureConnected(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java index f43dcbc..4969a85 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java @@ -18,8 +18,9 @@ */ package org.apache.hyracks.control.nc.task; -import org.apache.hyracks.util.ThreadDumpUtil; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.util.ThreadDumpUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,10 +29,12 @@ public class ThreadDumpTask implements Runnable { private static final Logger LOGGER = LogManager.getLogger(); private final NodeControllerService ncs; private final String requestId; + private final CcId ccId; - public ThreadDumpTask(NodeControllerService ncs, String requestId) { + public ThreadDumpTask(NodeControllerService ncs, String requestId, CcId ccId) { this.ncs = ncs; this.requestId = requestId; + this.ccId = ccId; } @Override @@ -44,8 +47,7 @@ public class ThreadDumpTask implements Runnable { result = null; } try { - ncs.getClusterController().notifyThreadDump( - ncs.getContext().getNodeId(), requestId, result); + 
ncs.getClusterController(ccId).notifyThreadDump(ncs.getContext().getNodeId(), requestId, result); } catch (Exception e) { LOGGER.log(Level.WARN, "Exception sending thread dump to CC", e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java index 6132639..68d677f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java @@ -20,6 +20,7 @@ package org.apache.hyracks.control.nc.work; import java.util.Collection; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.common.work.SynchronizableWork; @@ -34,24 +35,29 @@ public class AbortAllJobsWork extends SynchronizableWork { private static final Logger LOGGER = LogManager.getLogger(); private final NodeControllerService ncs; + private final CcId ccId; - public AbortAllJobsWork(NodeControllerService ncs) { + public AbortAllJobsWork(NodeControllerService ncs, CcId ccId) { this.ncs = ncs; + this.ccId = ccId; } @Override protected void doRun() throws Exception { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Aborting all tasks"); - } + LOGGER.info("Aborting all tasks for controller {}", ccId); IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); - if (dpm != null) { - 
ncs.getDatasetPartitionManager().abortAllReaders(); - } else { + if (dpm == null) { LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId()); } Collection<Joblet> joblets = ncs.getJobletMap().values(); for (Joblet ji : joblets) { + // TODO(mblow): should we have one jobletmap per cc? + if (!ji.getJobId().getCcId().equals(ccId)) { + continue; + } + if (dpm != null) { + dpm.abortReader(ji.getJobId()); + } Collection<Task> tasks = ji.getTaskMap().values(); for (Task task : tasks) { task.abort(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java index 582f058..0dd5d4e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.job.profiling.om.JobletProfile; @@ -33,20 +34,21 @@ import org.apache.hyracks.control.nc.NodeControllerService; public class BuildJobProfilesWork extends SynchronizableWork { private final NodeControllerService ncs; + private final CcId ccId; private final FutureValue<List<JobProfile>> fv; - public 
BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) { + public BuildJobProfilesWork(NodeControllerService ncs, CcId ccId, FutureValue<List<JobProfile>> fv) { this.ncs = ncs; + this.ccId = ccId; this.fv = fv; } @Override protected void doRun() throws Exception { - List<JobProfile> profiles = new ArrayList<JobProfile>(); + List<JobProfile> profiles = new ArrayList<>(); Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); - for (Joblet ji : jobletMap.values()) { - profiles.add(new JobProfile(ji.getJobId())); - } + jobletMap.values().stream().filter(ji -> ji.getJobId().getCcId().equals(ccId)) + .forEach(ji -> profiles.add(new JobProfile(ji.getJobId()))); for (JobProfile jProfile : profiles) { Joblet ji; JobletProfile jobletProfile = new JobletProfile(ncs.getId());
