Repository: asterixdb Updated Branches: refs/heads/master 592af6545 -> e6f426b80
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 4797ed7..77c352e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -49,6 +49,7 @@ import org.apache.hyracks.api.dataflow.connectors.ConnectorPolicyFactory; import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; @@ -102,7 +103,7 @@ public class CCNCFunctions { DISTRIBUTE_JOB, DESTROY_JOB, - DISTRIBUTED_JOB_FAILURE, + DEPLOYED_JOB_FAILURE, STATE_DUMP_REQUEST, STATE_DUMP_RESPONSE, @@ -286,24 +287,24 @@ public class CCNCFunctions { } } - public static class ReportDistributedJobFailureFunction extends Function { + public static class ReportDeployedJobSpecFailureFunction extends Function { private static final long serialVersionUID = 1L; - private final JobId jobId; + private final DeployedJobSpecId deployedJobSpecId; private final String nodeId; - public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) { - this.jobId = jobId; + public 
ReportDeployedJobSpecFailureFunction(DeployedJobSpecId deployedJobSpecId, String nodeId) { + this.deployedJobSpecId = deployedJobSpecId; this.nodeId = nodeId; } @Override public FunctionId getFunctionId() { - return FunctionId.DISTRIBUTED_JOB_FAILURE; + return FunctionId.DEPLOYED_JOB_FAILURE; } - public JobId getJobId() { - return jobId; + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; } public String getNodeId() { @@ -676,15 +677,15 @@ public class CCNCFunctions { } } - public static class DistributeJobFunction extends Function { + public static class DeployJobSpecFunction extends Function { private static final long serialVersionUID = 1L; - private final JobId jobId; + private final DeployedJobSpecId deployedJobSpecId; private final byte[] acgBytes; - public DistributeJobFunction(JobId jobId, byte[] acgBytes) { - this.jobId = jobId; + public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) { + this.deployedJobSpecId = deployedJobSpecId; this.acgBytes = acgBytes; } @@ -693,8 +694,8 @@ public class CCNCFunctions { return FunctionId.DISTRIBUTE_JOB; } - public JobId getJobId() { - return jobId; + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; } public byte[] getacgBytes() { @@ -702,13 +703,13 @@ public class CCNCFunctions { } } - public static class DestroyJobFunction extends Function { + public static class UndeployJobSpecFunction extends Function { private static final long serialVersionUID = 1L; - private final JobId jobId; + private final DeployedJobSpecId deployedJobSpecId; - public DestroyJobFunction(JobId jobId) { - this.jobId = jobId; + public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) { + this.deployedJobSpecId = deployedJobSpecId; } @Override @@ -716,8 +717,8 @@ public class CCNCFunctions { return FunctionId.DESTROY_JOB; } - public JobId getJobId() { - return jobId; + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; } } @@ 
-730,16 +731,21 @@ public class CCNCFunctions { private final List<TaskAttemptDescriptor> taskDescriptors; private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies; private final Set<JobFlag> flags; + private final Map<byte[], byte[]> jobParameters; + private final DeployedJobSpecId deployedJobSpecId; public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors, - Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags) { + Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags, + Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) { this.deploymentId = deploymentId; this.jobId = jobId; this.planBytes = planBytes; this.taskDescriptors = taskDescriptors; this.connectorPolicies = connectorPolicies; this.flags = flags; + this.jobParameters = jobParameters; + this.deployedJobSpecId = deployedJobSpecId; } @Override @@ -751,10 +757,18 @@ public class CCNCFunctions { return deploymentId; } + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; + } + public JobId getJobId() { return jobId; } + public Map<byte[], byte[]> getJobParameters() { + return jobParameters; + } + public byte[] getPlanBytes() { return planBytes; } @@ -815,7 +829,33 @@ public class CCNCFunctions { flags.add(JobFlag.values()[(dis.readInt())]); } - return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags); + // read job parameters; readFully is used because read() may return fewer bytes than requested + int paramListSize = dis.readInt(); + Map<byte[], byte[]> jobParameters = new HashMap<>(); + for (int i = 0; i < paramListSize; i++) { + int nameLength = dis.readInt(); + byte[] nameBytes = null; + if (nameLength >= 0) { + nameBytes = new byte[nameLength]; + dis.readFully(nameBytes, 0, nameLength); + } + int valueLength = dis.readInt(); + byte[] valueBytes = null; + if (valueLength >= 0) { + valueBytes = new byte[valueLength]; + dis.readFully(valueBytes,
0, valueLength); + } + jobParameters.put(nameBytes, valueBytes); + } + + //read DeployedJobSpecId + DeployedJobSpecId deployedJobSpecId = null; + if (dis.readBoolean()) { + deployedJobSpecId = DeployedJobSpecId.create(dis); + } + + return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags, + jobParameters, deployedJobSpecId); } public static void serialize(OutputStream out, Object object) throws Exception { @@ -853,6 +893,22 @@ public class CCNCFunctions { for (JobFlag flag : fn.flags) { dos.writeInt(flag.ordinal()); } + + //write job parameters + dos.writeInt(fn.jobParameters.size()); + for (Entry<byte[], byte[]> entry : fn.jobParameters.entrySet()) { + dos.writeInt(entry.getKey().length); + dos.write(entry.getKey(), 0, entry.getKey().length); + dos.writeInt(entry.getValue().length); + dos.write(entry.getValue(), 0, entry.getValue().length); + } + + //write deployed job spec id + dos.writeBoolean(fn.getDeployedJobSpecId() == null ? false : true); + if (fn.getDeployedJobSpecId() != null) { + fn.getDeployedJobSpecId().writeFields(dos); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index c09f317..f9af4c6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -18,8 +18,6 @@ */ package org.apache.hyracks.control.common.ipc; -import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*; - import java.net.InetSocketAddress; import java.util.List; import java.util.logging.Logger; @@ -28,11 +26,30 @@ import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.common.base.IClusterController; import org.apache.hyracks.control.common.controllers.NodeRegistration; import org.apache.hyracks.control.common.deployment.DeploymentStatus; import org.apache.hyracks.control.common.heartbeat.HeartbeatData; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.GetNodeControllersInfoFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.NodeHeartbeatFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyDeployBinaryFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyJobletCleanupFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskCompleteFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskFailureFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterNodeFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionProviderFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionRequestFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterResultPartitionLocationFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportDeployedJobSpecFailureFunction; +import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportProfileFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionWriteCompletionFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpResponseFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.UnregisterNodeFunction; import org.apache.hyracks.control.common.job.PartitionDescriptor; import org.apache.hyracks.control.common.job.PartitionRequest; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; @@ -151,9 +168,8 @@ public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen } @Override - public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception { - ReportDistributedJobFailureFunction fn = new ReportDistributedJobFailureFunction( - jobId, nodeId); + public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception { + ReportDeployedJobSpecFailureFunction fn = new ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId); ensureIpcHandle().send(-1, fn, null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 
3819ef8..8431eca 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -18,8 +18,6 @@ */ package org.apache.hyracks.control.common.ipc; -import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*; - import java.net.InetSocketAddress; import java.net.URL; import java.util.List; @@ -32,11 +30,24 @@ import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.control.common.base.INodeController; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortTasksFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.CleanupJobletFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployBinaryFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployJobSpecFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportPartitionAvailabilityFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownRequestFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.StartTasksFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpRequestFunction; +import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; import org.apache.hyracks.ipc.impl.IPCSystem; @@ -61,9 +72,10 @@ public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements @Override public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, - Set<JobFlag> flags) throws Exception { + Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) + throws Exception { StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes, - taskDescriptors, connectorPolicies, flags); + taskDescriptors, connectorPolicies, flags, jobParameters, deployedJobSpecId); ensureIpcHandle().send(-1, stf, null); } @@ -99,14 +111,14 @@ public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements } @Override - public void distributeJob(JobId jobId, byte[] planBytes) throws Exception { - DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes); + public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception { + DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes); ensureIpcHandle().send(-1, fn, null); } @Override - public void destroyJob(JobId jobId) throws Exception { - DestroyJobFunction fn = new DestroyJobFunction(jobId); + public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { + UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId); ensureIpcHandle().send(-1, fn, null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index 66a5e0f..ce666b0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -100,8 +100,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { private boolean cleanupPending; + private final IJobletEventListenerFactory jobletEventListenerFactory; + public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId, - INCServiceContext serviceCtx, ActivityClusterGraph acg) { + INCServiceContext serviceCtx, ActivityClusterGraph acg, + IJobletEventListenerFactory jobletEventListenerFactory) { this.nodeController = nodeController; this.serviceCtx = serviceCtx; this.deploymentId = deploymentId; @@ -117,9 +120,9 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { deallocatableRegistry = new DefaultDeallocatableRegistry(); fileFactory = new WorkspaceFileFactory(this, serviceCtx.getIoManager()); cleanupPending = false; - IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory(); - if (jelf != null) { - IJobletEventListener listener = jelf.createListener(this); + this.jobletEventListenerFactory = jobletEventListenerFactory; + if (jobletEventListenerFactory != null) { + IJobletEventListener listener = jobletEventListenerFactory.createListener(this); this.jobletEventListener = listener; listener.jobletStart(); } else { @@ -134,6 +137,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { return jobId; } + @Override + public IJobletEventListenerFactory 
getJobletEventListenerFactory() { + return jobletEventListenerFactory; + } + public ActivityClusterGraph getActivityClusterGraph() { return acg; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index 1eb1393..c54f153 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -27,12 +27,12 @@ import org.apache.hyracks.control.nc.work.AbortTasksWork; import org.apache.hyracks.control.nc.work.ApplicationMessageWork; import org.apache.hyracks.control.nc.work.CleanupJobletWork; import org.apache.hyracks.control.nc.work.DeployBinaryWork; -import org.apache.hyracks.control.nc.work.DestroyJobWork; -import org.apache.hyracks.control.nc.work.DistributeJobWork; +import org.apache.hyracks.control.nc.work.DeployJobSpecWork; import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork; import org.apache.hyracks.control.nc.work.StartTasksWork; import org.apache.hyracks.control.nc.work.StateDumpWork; import org.apache.hyracks.control.nc.work.UnDeployBinaryWork; +import org.apache.hyracks.control.nc.work.UndeployJobSpecWork; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; @@ -62,8 +62,10 @@ final class NodeControllerIPCI implements IIPCI { return; case START_TASKS: CCNCFunctions.StartTasksFunction stf = 
(CCNCFunctions.StartTasksFunction) fn; - ncs.getWorkQueue().schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), - stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags())); + ncs.getWorkQueue() + .schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(), + stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags(), + stf.getJobParameters(), stf.getDeployedJobSpecId())); return; case ABORT_TASKS: CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn; @@ -104,13 +106,13 @@ final class NodeControllerIPCI implements IIPCI { return; case DISTRIBUTE_JOB: - CCNCFunctions.DistributeJobFunction djf = (CCNCFunctions.DistributeJobFunction) fn; - ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, djf.getJobId(), djf.getacgBytes())); + CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn; + ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes())); return; case DESTROY_JOB: - CCNCFunctions.DestroyJobFunction dsjf = (CCNCFunctions.DestroyJobFunction) fn; - ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, dsjf.getJobId())); + CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn; + ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId())); return; case STATE_DUMP_REQUEST: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 
7a01b87..9dd9536 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -29,6 +29,7 @@ import java.lang.management.RuntimeMXBean; import java.lang.management.ThreadMXBean; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.HashMap; import java.util.Hashtable; import java.util.List; import java.util.Map; @@ -53,7 +54,9 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobParameterByteStore; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.service.IControllerService; @@ -128,7 +131,9 @@ public class NodeControllerService implements IControllerService { private final Map<JobId, Joblet> jobletMap; - private final Map<JobId, ActivityClusterGraph> preDistributedJobs; + private final Map<Long, ActivityClusterGraph> deployedJobSpecActivityClusterGraphMap; + + private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>(); private ExecutorService executor; @@ -202,7 +207,7 @@ public class NodeControllerService implements IControllerService { workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. 
jobletMap = new Hashtable<>(); - preDistributedJobs = new Hashtable<>(); + deployedJobSpecActivityClusterGraphMap = new Hashtable<>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(NodeControllerService.class.getName()), id)); @@ -423,28 +428,43 @@ public class NodeControllerService implements IControllerService { return jobletMap; } - public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException { - if (preDistributedJobs.get(jobId) != null) { - throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); + public void removeJobParameterByteStore(JobId jobId) { + jobParameterByteStoreMap.remove(jobId); + } + + public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException { + JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId); + if (jpbs == null) { + jpbs = new JobParameterByteStore(); + jobParameterByteStoreMap.put(jobId, jpbs); + } + return jpbs; + } + + + public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg) + throws HyracksException { + if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId); } - preDistributedJobs.put(jobId, acg); + deployedJobSpecActivityClusterGraphMap.put(deployedJobSpecId.getId(), acg); } - public void removeActivityClusterGraph(JobId jobId) throws HyracksException { - if (preDistributedJobs.get(jobId) == null) { - throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); + public void removeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException { + if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) == null) { + throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId); } - preDistributedJobs.remove(jobId); + 
deployedJobSpecActivityClusterGraphMap.remove(deployedJobSpecId.getId()); } - public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException { - if (preDistributedJobs.get(jobId) != null) { - throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); + public void checkForDuplicateDeployedJobSpec(DeployedJobSpecId deployedJobSpecId) throws HyracksException { + if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId); } } - public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException { - return preDistributedJobs.get(jobId); + public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException { + return deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()); } public NetworkManager getNetworkManager() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index fcd4bde..94ee92b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -455,6 +455,9 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { } @Override + public byte[] getJobParameter(byte[] name, int start, int length) throws HyracksException { + return 
ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length); + } public Set<JobFlag> getJobFlags() { return jobFlags; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java index 670ce06..03ae90c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java @@ -51,6 +51,7 @@ public class CleanupJobletWork extends AbstractWork { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Cleaning up after job: " + jobId); } + ncs.removeJobParameterByteStore(jobId); final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>(); ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions); ncs.getExecutor().execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java new file mode 100644 index 
0000000..4276b67 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.control.nc.work; + +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.control.common.deployment.DeploymentUtils; +import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.nc.NodeControllerService; + +/** + * Deserializes an activity cluster graph and stores it under the given id so the job can be executed later. + * Any HyracksException (e.g. a duplicate id) is reported to the cluster controller rather than rethrown. + */ +public class DeployJobSpecWork extends AbstractWork { + + private final NodeControllerService ncs; + private final byte[] acgBytes; + private final DeployedJobSpecId deployedJobSpecId; + + public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) { + this.ncs = ncs; + this.deployedJobSpecId = deployedJobSpecId; + this.acgBytes = acgBytes; + } + + @Override + public void run() { + try { + ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId); + ActivityClusterGraph acg = + (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext()); + ncs.storeActivityClusterGraph(deployedJobSpecId, acg); + } catch (HyracksException e) { + try { + ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, ncs.getId()); + } catch (Exception e1) { + e1.printStackTrace(); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java deleted file mode 100644 index 55dd01e..0000000 ---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.control.nc.work; - -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.control.common.work.AbstractWork; -import org.apache.hyracks.control.nc.NodeControllerService; - -/** - * destroy a pre-distributed job - * - */ -public class DestroyJobWork extends AbstractWork { - - private final NodeControllerService ncs; - private final JobId jobId; - - public DestroyJobWork(NodeControllerService ncs, JobId jobId) { - this.ncs = ncs; - this.jobId = jobId; - } - - @Override - public void run() { - try { - ncs.removeActivityClusterGraph(jobId); - } catch (HyracksException e) { - try { - ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId()); - } catch (Exception e1) { - e1.printStackTrace(); - } - } - } - -} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java deleted file mode 100644 index 486a420..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.hyracks.control.nc.work; - -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.ActivityClusterGraph; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.control.common.deployment.DeploymentUtils; -import org.apache.hyracks.control.common.work.AbstractWork; -import org.apache.hyracks.control.nc.NodeControllerService; - -/** - * pre-distribute a job that can be executed later - * - */ -public class DistributeJobWork extends AbstractWork { - - private final NodeControllerService ncs; - private final byte[] acgBytes; - private final JobId jobId; - - public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes) { - this.ncs = ncs; - this.jobId = jobId; - this.acgBytes = acgBytes; - } - - @Override - public void run() { - try { - ncs.checkForDuplicateDistributedJob(jobId); - ncs.updateMaxJobId(jobId); - ActivityClusterGraph acg = - (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext()); - ncs.storeActivityClusterGraph(jobId, acg); - } catch (HyracksException e) { - try { - ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId()); - } catch (Exception e1) { - e1.printStackTrace(); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index c369781..a2fcc25 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -47,11 +47,12 @@ import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.deployment.DeploymentId; -import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.partitions.PartitionId; @@ -79,6 +80,8 @@ public class StartTasksWork extends AbstractWork { private final JobId jobId; + private final DeployedJobSpecId deployedJobSpecId; + private final byte[] acgBytes; private final List<TaskAttemptDescriptor> taskDescriptors; @@ -87,16 +90,21 @@ public class StartTasksWork extends AbstractWork { private final Set<JobFlag> flags; + private final Map<byte[], byte[]> jobParameters; + public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes, List<TaskAttemptDescriptor> taskDescriptors, - Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags) { + Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags, + Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) { this.ncs = ncs; this.deploymentId = deploymentId; this.jobId = jobId; + this.deployedJobSpecId = deployedJobSpecId; this.acgBytes = acgBytes; this.taskDescriptors = taskDescriptors; this.connectorPoliciesMap = connectorPoliciesMap; this.flags = 
flags; + this.jobParameters = jobParameters; } @Override @@ -106,7 +114,7 @@ public class StartTasksWork extends AbstractWork { try { ncs.updateMaxJobId(jobId); NCServiceContext serviceCtx = ncs.getContext(); - Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, serviceCtx, acgBytes); + Joblet joblet = getOrCreateLocalJoblet(deploymentId, serviceCtx, acgBytes); final ActivityClusterGraph acg = joblet.getActivityClusterGraph(); IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() { @Override @@ -190,19 +198,22 @@ public class StartTasksWork extends AbstractWork { } } - private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCServiceContext appCtx, - byte[] acgBytes) throws HyracksException { + private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, INCServiceContext appCtx, byte[] acgBytes) + throws HyracksException { Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); Joblet ji = jobletMap.get(jobId); if (ji == null) { - ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId); - if (acg == null) { - if (acgBytes == null) { - throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); + ActivityClusterGraph acg = (deployedJobSpecId != null) ? 
ncs.getActivityClusterGraph(deployedJobSpecId) + : (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx); + ncs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters); + IJobletEventListenerFactory listenerFactory = acg.getJobletEventListenerFactory(); + if (listenerFactory != null) { + if (deployedJobSpecId != null) { + listenerFactory = acg.getJobletEventListenerFactory().copyFactory(); } - acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx); + listenerFactory.updateListenerJobParameters(ncs.createOrGetJobParameterByteStore(jobId)); } - ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg); + ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, listenerFactory); jobletMap.put(jobId, ji); } return ji; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java new file mode 100644 index 0000000..4383ff6 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.control.nc.work; + +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.nc.NodeControllerService; + +/** + * remove the deployed job + * + */ +public class UndeployJobSpecWork extends AbstractWork { + + private final NodeControllerService ncs; + private final DeployedJobSpecId deployedJobSpecId; + + public UndeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId) { + this.ncs = ncs; + this.deployedJobSpecId = deployedJobSpecId; + } + + @Override + public void run() { + try { + ncs.removeActivityClusterGraph(deployedJobSpecId); + } catch (HyracksException e) { + try { + ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, ncs.getId()); + } catch (Exception e1) { + e1.printStackTrace(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java new file mode 100644 index 
0000000..dd4fdd1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.tests.integration; + +import static org.apache.hyracks.util.file.FileUtil.joinPath; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; + +import java.io.File; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.cluster.INodeManager; +import org.apache.hyracks.control.cc.cluster.NodeManager; +import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +public class DeployedJobSpecsTest { + private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecsTest.class.getName()); + + private static final String NC1_ID = "nc1"; + private static final String NC2_ID = "nc2"; + private static final int TIME_THRESHOLD = 5000; + + private static ClusterControllerService cc; + private static NodeControllerService nc1; + private static NodeControllerService nc2; + private static IHyracksClientConnection hcc; + + @BeforeClass + public static void init() throws Exception { + CCConfig ccConfig = new CCConfig(); + ccConfig.setClientListenAddress("127.0.0.1"); + ccConfig.setClientListenPort(39000); + ccConfig.setClusterListenAddress("127.0.0.1"); + ccConfig.setClusterListenPort(39001); + ccConfig.setProfileDumpPeriod(10000); + FileUtils.deleteQuietly(new 
File(joinPath("target", "data"))); + FileUtils.copyDirectory(new File("data"), new File(joinPath("target", "data"))); + File outDir = new File("target" + File.separator + "ClusterController"); + outDir.mkdirs(); + File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir); + ccRoot.delete(); + ccRoot.mkdir(); + ccConfig.setRootDir(ccRoot.getAbsolutePath()); + ClusterControllerService ccBase = new ClusterControllerService(ccConfig); + // The spying below is dangerous since it replaces the ClusterControllerService already referenced by many + // objects created in the constructor above + cc = Mockito.spy(ccBase); + cc.start(); + + // The following code partially fixes the problem created by the spying + INodeManager nodeManager = cc.getNodeManager(); + Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs"); + ccsInNodeManager.setAccessible(true); + ccsInNodeManager.set(nodeManager, cc); + + NCConfig ncConfig1 = new NCConfig(NC1_ID); + ncConfig1.setClusterAddress("localhost"); + ncConfig1.setClusterPort(39001); + ncConfig1.setClusterListenAddress("127.0.0.1"); + ncConfig1.setDataListenAddress("127.0.0.1"); + ncConfig1.setResultListenAddress("127.0.0.1"); + ncConfig1.setResultSweepThreshold(TIME_THRESHOLD); + ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); + NodeControllerService nc1Base = new NodeControllerService(ncConfig1); + nc1 = Mockito.spy(nc1Base); + nc1.start(); + + NCConfig ncConfig2 = new NCConfig(NC2_ID); + ncConfig2.setClusterAddress("localhost"); + ncConfig2.setClusterPort(39001); + ncConfig2.setClusterListenAddress("127.0.0.1"); + ncConfig2.setDataListenAddress("127.0.0.1"); + ncConfig2.setResultListenAddress("127.0.0.1"); + ncConfig2.setResultSweepThreshold(TIME_THRESHOLD); + ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); + NodeControllerService nc2Base = new 
NodeControllerService(ncConfig2); + nc2 = Mockito.spy(nc2Base); + nc2.start(); + + hcc = new HyracksConnection(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath()); + } + } + + @Test + public void DistributedTest() throws Exception { + JobSpecification spec1 = UnionTest.createUnionJobSpec(); + JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec(); + + //distribute both jobs + DeployedJobSpecId distributedId1 = hcc.deployJobSpec(spec1); + DeployedJobSpecId distributedId2 = hcc.deployJobSpec(spec2); + + //make sure it finished + //cc will get the store once to check for duplicate insertion and once to insert per job + verify(cc, Mockito.timeout(TIME_THRESHOLD).times(4)).getDeployedJobSpecStore(); + verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(2)).storeActivityClusterGraph(any(), any()); + verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(2)).storeActivityClusterGraph(any(), any()); + verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(2)).checkForDuplicateDeployedJobSpec(any()); + verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(2)).checkForDuplicateDeployedJobSpec(any()); + + //confirm that both jobs are distributed + Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) != null && nc2.getActivityClusterGraph(distributedId1) != null); + Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) != null && nc2.getActivityClusterGraph(distributedId2) != null); + Assert.assertTrue(cc.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(distributedId1) != null); + Assert.assertTrue(cc.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(distributedId2) != null); + + //run the first job + JobId jobRunId1 = hcc.startJob(distributedId1, new HashMap<>()); + hcc.waitForCompletion(jobRunId1); + + //Make sure the job parameter map was removed + verify(cc, Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any()); 
+ verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any()); + verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any()); + + //destroy the first job + hcc.undeployJobSpec(distributedId1); + + //make sure it finished + verify(cc, Mockito.timeout(TIME_THRESHOLD).times(8)).getDeployedJobSpecStore(); + verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(1)).removeActivityClusterGraph(any()); + verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(1)).removeActivityClusterGraph(any()); + + //confirm the first job is destroyed + Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) == null && nc2.getActivityClusterGraph(distributedId1) == null); + cc.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(distributedId1); + + //run the second job + JobId jobRunId2 = hcc.startJob(distributedId2, new HashMap<>()); + hcc.waitForCompletion(jobRunId2); + + //Make sure the job parameter map was removed + verify(cc, Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any()); + verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any()); + verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any()); + + //run the second job again + JobId jobRunId3 = hcc.startJob(distributedId2, new HashMap<>()); + hcc.waitForCompletion(jobRunId3); + + //Make sure the job parameter map was removed + verify(cc, Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any()); + verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any()); + verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any()); + + //destroy the second job + hcc.undeployJobSpec(distributedId2); + + //make sure it finished + verify(cc, Mockito.timeout(TIME_THRESHOLD).times(12)).getDeployedJobSpecStore(); + verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(2)).removeActivityClusterGraph(any()); + verify(nc2, 
Mockito.timeout(TIME_THRESHOLD).times(2)).removeActivityClusterGraph(any()); + + //confirm the second job is destroyed + Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) == null && nc2.getActivityClusterGraph(distributedId2) == null); + cc.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(distributedId2); + + //run the second job 100 times in parallel + distributedId2 = hcc.deployJobSpec(spec2); + for (int i = 0; i < 100; i++) { + hcc.startJob(distributedId2, new HashMap<>()); + } + } + + @AfterClass + public static void deinit() throws Exception { + nc2.stop(); + nc1.stop(); + cc.stop(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java deleted file mode 100644 index caba5f6..0000000 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.tests.integration; - -import static org.apache.hyracks.util.file.FileUtil.joinPath; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.verify; - -import java.io.File; -import java.lang.reflect.Field; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.commons.io.FileUtils; -import org.apache.hyracks.api.client.HyracksConnection; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.cc.cluster.INodeManager; -import org.apache.hyracks.control.cc.cluster.NodeManager; -import org.apache.hyracks.control.common.controllers.CCConfig; -import org.apache.hyracks.control.common.controllers.NCConfig; -import org.apache.hyracks.control.nc.NodeControllerService; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; - -public class PredistributedJobsTest { - private static final Logger LOGGER = Logger.getLogger(PredistributedJobsTest.class.getName()); - - private static final String NC1_ID = "nc1"; - private static final String NC2_ID = "nc2"; - - private static ClusterControllerService cc; - private static NodeControllerService nc1; - private static NodeControllerService nc2; - private static IHyracksClientConnection hcc; - - @BeforeClass - public static void init() throws Exception { 
- CCConfig ccConfig = new CCConfig(); - ccConfig.setClientListenAddress("127.0.0.1"); - ccConfig.setClientListenPort(39000); - ccConfig.setClusterListenAddress("127.0.0.1"); - ccConfig.setClusterListenPort(39001); - ccConfig.setProfileDumpPeriod(10000); - FileUtils.deleteQuietly(new File(joinPath("target", "data"))); - FileUtils.copyDirectory(new File("data"), new File(joinPath("target", "data"))); - File outDir = new File("target" + File.separator + "ClusterController"); - outDir.mkdirs(); - File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir); - ccRoot.delete(); - ccRoot.mkdir(); - ccConfig.setRootDir(ccRoot.getAbsolutePath()); - ClusterControllerService ccBase = new ClusterControllerService(ccConfig); - // The spying below is dangerous since it replaces the ClusterControllerService already referenced by many - // objects created in the constructor above - cc = Mockito.spy(ccBase); - cc.start(); - - // The following code partially fixes the problem created by the spying - INodeManager nodeManager = cc.getNodeManager(); - Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs"); - ccsInNodeManager.setAccessible(true); - ccsInNodeManager.set(nodeManager, cc); - - NCConfig ncConfig1 = new NCConfig(NC1_ID); - ncConfig1.setClusterAddress("localhost"); - ncConfig1.setClusterPort(39001); - ncConfig1.setClusterListenAddress("127.0.0.1"); - ncConfig1.setDataListenAddress("127.0.0.1"); - ncConfig1.setResultListenAddress("127.0.0.1"); - ncConfig1.setResultSweepThreshold(5000); - ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); - NodeControllerService nc1Base = new NodeControllerService(ncConfig1); - nc1 = Mockito.spy(nc1Base); - nc1.start(); - - NCConfig ncConfig2 = new NCConfig(NC2_ID); - ncConfig2.setClusterAddress("localhost"); - ncConfig2.setClusterPort(39001); - ncConfig2.setClusterListenAddress("127.0.0.1"); - ncConfig2.setDataListenAddress("127.0.0.1"); - 
ncConfig2.setResultListenAddress("127.0.0.1"); - ncConfig2.setResultSweepThreshold(5000); - ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); - NodeControllerService nc2Base = new NodeControllerService(ncConfig2); - nc2 = Mockito.spy(nc2Base); - nc2.start(); - - hcc = new HyracksConnection(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath()); - } - } - - @Test - public void DistributedTest() throws Exception { - JobSpecification spec1 = UnionTest.createUnionJobSpec(); - JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec(); - - //distribute both jobs - JobId jobId1 = hcc.distributeJob(spec1); - JobId jobId2 = hcc.distributeJob(spec2); - - //make sure it finished - //cc will get the store once to check for duplicate insertion and once to insert per job - verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore(); - verify(nc1, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any()); - verify(nc2, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any()); - verify(nc1, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any()); - verify(nc2, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any()); - - //confirm that both jobs are distributed - Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null && nc2.getActivityClusterGraph(jobId1) != null); - Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null && nc2.getActivityClusterGraph(jobId2) != null); - Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1) != null); - Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2) != null); - - //run the first job - hcc.startJob(jobId1); - hcc.waitForCompletion(jobId1); - - //destroy the first job - hcc.destroyJob(jobId1); - - //make sure it finished 
- verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore(); - verify(nc1, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any()); - verify(nc2, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any()); - - //confirm the first job is destroyed - Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null && nc2.getActivityClusterGraph(jobId1) == null); - cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1); - - //run the second job - hcc.startJob(jobId2); - hcc.waitForCompletion(jobId2); - - //wait ten seconds to ensure the result sweeper does not break the job - //The result sweeper runs every 5 seconds during the tests - Thread.sleep(10000); - - //run the second job again - hcc.startJob(jobId2); - hcc.waitForCompletion(jobId2); - - //destroy the second job - hcc.destroyJob(jobId2); - - //make sure it finished - verify(cc, Mockito.timeout(5000).times(12)).getPreDistributedJobStore(); - verify(nc1, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any()); - verify(nc2, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any()); - - //confirm the second job is destroyed - Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null && nc2.getActivityClusterGraph(jobId2) == null); - cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2); - } - - @AfterClass - public static void deinit() throws Exception { - nc2.stop(); - nc1.stop(); - cc.stop(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java index d3c34dd..e310385 
100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java @@ -26,6 +26,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatable; @@ -56,10 +57,15 @@ public class TestJobletContext implements IHyracksJobletContext { return frameManger.allocateFrame(bytes); } - ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData) throws HyracksDataException { + ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData) + throws HyracksDataException { return frameManger.reallocateFrame(tobeDeallocate, newFrameSizeInBytes, copyOldData); } + public IJobletEventListenerFactory getJobletEventListenerFactory() { + return null; + } + void deallocateFrames(int bytes) { frameManger.deallocateFrames(bytes); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java index 3d13cf9..ac52573 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java @@ -163,6 +163,10 @@ public class TestTaskContext implements IHyracksTaskContext { } @Override + public byte[] getJobParameter(byte[] name, int start, int length) { + return new byte[0]; + } + public Set<JobFlag> getJobFlags() { return EnumSet.noneOf(JobFlag.class); }
