http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java index e0c5279..84a754a 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java @@ -23,19 +23,12 @@ import java.io.Serializable; public final class ActivityClusterId implements Serializable { private static final long serialVersionUID = 1L; - private final JobId jobId; - private final int id; - public ActivityClusterId(JobId jobId, int id) { - this.jobId = jobId; + public ActivityClusterId(int id) { this.id = id; } - public JobId getJobId() { - return jobId; - } - public int getId() { return id; } @@ -45,7 +38,6 @@ public final class ActivityClusterId implements Serializable { final int prime = 31; int result = 1; result = prime * result + id; - result = prime * result + ((jobId == null) ? 0 : jobId.hashCode()); return result; } @@ -64,18 +56,11 @@ public final class ActivityClusterId implements Serializable { if (id != other.id) { return false; } - if (jobId == null) { - if (other.jobId != null) { - return false; - } - } else if (!jobId.equals(other.jobId)) { - return false; - } return true; } @Override public String toString() { - return "ACID:" + jobId + ":" + id; + return "ACID:" + id; } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java new file mode 100644 index 0000000..8cbfb1a --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IWritable; + +public final class DeployedJobSpecId implements IWritable, Serializable { + + public static final DeployedJobSpecId INVALID = new DeployedJobSpecId(-1l); + + private static final long serialVersionUID = 1L; + private long id; + + public static DeployedJobSpecId create(DataInput dis) throws IOException { + DeployedJobSpecId deployedJobSpecId = new DeployedJobSpecId(); + deployedJobSpecId.readFields(dis); + return deployedJobSpecId; + } + + private DeployedJobSpecId() { + } + + public DeployedJobSpecId(long id) { + this.id = id; + } + + public long getId() { + return id; + } + + @Override + public int hashCode() { + return (int) id; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof DeployedJobSpecId)) { + return false; + } + return ((DeployedJobSpecId) o).id == id; + } + + @Override + public String toString() { + return "PDJID:" + id; + } + + public static DeployedJobSpecId parse(String str) throws HyracksDataException { + if (str.startsWith("PDJID:")) { + return new DeployedJobSpecId(Long.parseLong(str.substring(4))); + } + throw HyracksDataException.create(ErrorCode.NOT_A_JOBID, str); + } + + @Override + public void writeFields(DataOutput output) throws IOException { + output.writeLong(id); + } + + @Override + public void readFields(DataInput input) throws IOException { + id = input.readLong(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java new file mode 100644 index 0000000..24caa9b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.util.concurrent.atomic.AtomicLong; + +public class DeployedJobSpecIdFactory { + private final AtomicLong id = new AtomicLong(0); + + public DeployedJobSpecId create() { + return new DeployedJobSpecId(id.getAndIncrement()); + } + + public long maxDeployedJobSpecId() { + return id.get(); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java index 133e342..d23b944 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java @@ -25,7 +25,7 @@ import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.exceptions.HyracksException; public interface IActivityClusterGraphGeneratorFactory extends Serializable { - public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId, + public IActivityClusterGraphGenerator createActivityClusterGraphGenerator( ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException; public JobSpecification getJobSpecification(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java index d523ccc..bd2f189 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java @@ -23,5 +23,10 @@ import java.io.Serializable; import org.apache.hyracks.api.context.IHyracksJobletContext; public interface IJobletEventListenerFactory extends Serializable { - public IJobletEventListener createListener(IHyracksJobletContext ctx); + IJobletEventListener createListener(IHyracksJobletContext ctx); + + IJobletEventListenerFactory copyFactory(); + + //Allows job parameters to change listener settings + void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java new file mode 100644 index 0000000..551b3d7 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +public class JobParameterByteStore implements Serializable { + + private static final long serialVersionUID = 1L; + + private Map<byte[], byte[]> runtimeValues; + private final byte[] empty = new byte[0]; + + public JobParameterByteStore() { + runtimeValues = new HashMap<>(); + } + + public Map<byte[], byte[]> getParameterMap() { + return runtimeValues; + } + + public void setParameters(Map<byte[], byte[]> map) { + runtimeValues = map; + } + + public byte[] getParameterValue(byte[] name, int start, int length) { + for (Entry<byte[], byte[]> entry : runtimeValues.entrySet()) { + byte[] key = entry.getKey(); + if (key.length == length) { + boolean matched = true; + for (int j = 0; j < length; j++) { + if (key[j] != name[j + start]) { + matched = false; + break; + } + } + if (matched) { + return entry.getValue(); + } + } + } + return empty; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index 327c422..4e3c0f5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -24,15 +24,15 @@ import java.util.logging.Logger; import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; -import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.DeployedJobSpecIdFactory; import org.apache.hyracks.api.job.JobIdFactory; import org.apache.hyracks.api.job.JobInfo; import org.apache.hyracks.control.cc.work.CancelJobWork; import org.apache.hyracks.control.cc.work.CliDeployBinaryWork; import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork; import org.apache.hyracks.control.cc.work.ClusterShutdownWork; -import org.apache.hyracks.control.cc.work.DestroyJobWork; -import org.apache.hyracks.control.cc.work.DistributeJobWork; +import org.apache.hyracks.control.cc.work.DeployJobSpecWork; import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork; import org.apache.hyracks.control.cc.work.GetJobInfoWork; import org.apache.hyracks.control.cc.work.GetJobStatusWork; @@ -42,6 +42,7 @@ import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork; import org.apache.hyracks.control.cc.work.GetResultStatusWork; import org.apache.hyracks.control.cc.work.GetThreadDumpWork; import org.apache.hyracks.control.cc.work.JobStartWork; +import org.apache.hyracks.control.cc.work.UndeployJobSpecWork; import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork; import org.apache.hyracks.control.common.work.IPCResponder; import org.apache.hyracks.ipc.api.IIPCHandle; @@ -53,10 +54,12 @@ class ClientInterfaceIPCI implements IIPCI { private static final Logger LOGGER = Logger.getLogger(ClientInterfaceIPCI.class.getName()); private final ClusterControllerService ccs; private final JobIdFactory jobIdFactory; + private final DeployedJobSpecIdFactory deployedJobSpecIdFactory; - ClientInterfaceIPCI(ClusterControllerService ccs, JobIdFactory jobIdFactory) { + ClientInterfaceIPCI(final ClusterControllerService ccs, final JobIdFactory jobIdFactory) { this.ccs = ccs; this.jobIdFactory = jobIdFactory; + this.deployedJobSpecIdFactory = ccs.getDeployedJobSpecIdFactory(); } @Override @@ -83,16 +86,17 @@ class ClientInterfaceIPCI implements IIPCI { new IPCResponder<JobInfo>(handle, mid))); break; case DISTRIBUTE_JOB: - HyracksClientInterfaceFunctions.DistributeJobFunction djf = - (HyracksClientInterfaceFunctions.DistributeJobFunction) fn; - ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory, - new IPCResponder<JobId>(handle, mid))); + HyracksClientInterfaceFunctions.DeployJobSpecFunction djf = + (HyracksClientInterfaceFunctions.DeployJobSpecFunction) fn; + ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, djf.getACGGFBytes(), + deployedJobSpecIdFactory.create(), new IPCResponder<>(handle, mid))); break; case DESTROY_JOB: - HyracksClientInterfaceFunctions.DestroyJobFunction dsjf = - (HyracksClientInterfaceFunctions.DestroyJobFunction) fn; + HyracksClientInterfaceFunctions.UndeployJobSpecFunction dsjf = + (HyracksClientInterfaceFunctions.UndeployJobSpecFunction) fn; ccs.getWorkQueue() - .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid))); + .schedule(new UndeployJobSpecWork(ccs, dsjf.getDeployedJobSpecId(), + new IPCResponder<>(handle, mid))); break; case CANCEL_JOB: HyracksClientInterfaceFunctions.CancelJobFunction cjf = @@ -103,8 +107,14 @@ class ClientInterfaceIPCI implements IIPCI { case START_JOB: HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn; - ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), sjf.getACGGFBytes(), - sjf.getJobFlags(), sjf.getJobId(), new IPCResponder<JobId>(handle, mid), jobIdFactory)); + DeployedJobSpecId id = sjf.getDeployedJobSpecId(); + byte[] acggfBytes = null; + if (id == null) { + //The job is new + acggfBytes = sjf.getACGGFBytes(); + } + ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(), + jobIdFactory, sjf.getJobParameters(), new IPCResponder<>(handle, mid), id)); break; case GET_DATASET_DIRECTORY_SERIVICE_INFO: ccs.getWorkQueue().schedule( http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java index af5c102..5a53fce 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java @@ -23,7 +23,7 @@ import java.util.logging.Logger; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.control.cc.work.ApplicationMessageWork; -import org.apache.hyracks.control.cc.work.DistributedJobFailureWork; +import org.apache.hyracks.control.cc.work.DeployedJobFailureWork; import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork; import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork; import org.apache.hyracks.control.cc.work.NodeHeartbeatWork; @@ -76,13 +76,12 @@ class ClusterControllerIPCI implements IIPCI { break; case NOTIFY_JOBLET_CLEANUP: CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn; - ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(), - njcf.getNodeId())); + ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(), njcf.getNodeId())); break; case NOTIFY_DEPLOY_BINARY: CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn; - ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(), - ndbf.getNodeId(), ndbf.getDeploymentStatus())); + ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(), ndbf.getNodeId(), + ndbf.getDeploymentStatus())); break; case REPORT_PROFILE: CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn; @@ -90,49 +89,48 @@ class ClusterControllerIPCI implements IIPCI { break; case NOTIFY_TASK_COMPLETE: CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn; - ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(), - ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics())); + ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(), ntcf.getTaskId(), + ntcf.getNodeId(), ntcf.getStatistics())); break; case NOTIFY_TASK_FAILURE: CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn; - ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(), - ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions())); + ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(), ntff.getTaskId(), + ntff.getNodeId(), ntff.getExceptions())); break; - case DISTRIBUTED_JOB_FAILURE: - CCNCFunctions.ReportDistributedJobFailureFunction rdjf = - (CCNCFunctions.ReportDistributedJobFailureFunction) fn; - ccs.getWorkQueue().schedule(new DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId())); + case DEPLOYED_JOB_FAILURE: + CCNCFunctions.ReportDeployedJobSpecFailureFunction rdjf = + (CCNCFunctions.ReportDeployedJobSpecFailureFunction) fn; + ccs.getWorkQueue() + .schedule(new DeployedJobFailureWork(rdjf.getDeployedJobSpecId(), rdjf.getNodeId())); break; case REGISTER_PARTITION_PROVIDER: CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn; - ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs, - rppf.getPartitionDescriptor())); + ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs, rppf.getPartitionDescriptor())); break; case REGISTER_PARTITION_REQUEST: CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn; - ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs, - rprf.getPartitionRequest())); + ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs, rprf.getPartitionRequest())); break; case REGISTER_RESULT_PARTITION_LOCATION: CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn; - ccs.getWorkQueue().schedule(new RegisterResultPartitionLocationWork(ccs, - rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(), - rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress())); + ccs.getWorkQueue() + .schedule(new RegisterResultPartitionLocationWork(ccs, rrplf.getJobId(), rrplf.getResultSetId(), + rrplf.getOrderedResult(), rrplf.getEmptyResult(), rrplf.getPartition(), + rrplf.getNPartitions(), rrplf.getNetworkAddress())); break; case REPORT_RESULT_PARTITION_WRITE_COMPLETION: CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwc = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn; - ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs, - rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition())); + ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs, rrpwc.getJobId(), + rrpwc.getResultSetId(), rrpwc.getPartition())); break; case SEND_APPLICATION_MESSAGE: - CCNCFunctions.SendApplicationMessageFunction rsf = - (CCNCFunctions.SendApplicationMessageFunction) fn; - ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs, rsf.getMessage(), - rsf.getDeploymentId(), rsf.getNodeId())); + CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn; + ccs.getWorkQueue().schedule( + new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId())); break; case GET_NODE_CONTROLLERS_INFO: ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(), @@ -150,18 +148,17 @@ class ClusterControllerIPCI implements IIPCI { break; case STATE_DUMP_RESPONSE: CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn; - ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs, dsrf.getNodeId(), - dsrf.getStateDumpId(), dsrf.getState())); + ccs.getWorkQueue().schedule( + new NotifyStateDumpResponse(ccs, dsrf.getNodeId(), dsrf.getStateDumpId(), dsrf.getState())); break; case SHUTDOWN_RESPONSE: CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn; ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs, sdrf.getNodeId())); break; case THREAD_DUMP_RESPONSE: - CCNCFunctions.ThreadDumpResponseFunction tdrf = - (CCNCFunctions.ThreadDumpResponseFunction)fn; - ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs, - tdrf.getRequestId(), tdrf.getThreadDumpJSON())); + CCNCFunctions.ThreadDumpResponseFunction tdrf = (CCNCFunctions.ThreadDumpResponseFunction) fn; + ccs.getWorkQueue() + .schedule(new NotifyThreadDumpResponse(ccs, tdrf.getRequestId(), tdrf.getThreadDumpJSON())); break; default: LOGGER.warning("Unknown function: " + fn.getFunctionId()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index 7b99df2..713bddd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -50,7 +50,10 @@ import org.apache.hyracks.api.context.ICCContext; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.DeployedJobSpecIdFactory; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobIdFactory; +import org.apache.hyracks.api.job.JobParameterByteStore; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.api.topology.ClusterTopology; @@ -107,7 +110,9 @@ public class ClusterControllerService implements IControllerService { private CCServiceContext serviceCtx; - private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore(); + private final DeployedJobSpecStore deployedJobSpecStore = new DeployedJobSpecStore(); + + private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>(); private final WorkQueue workQueue; @@ -135,6 +140,8 @@ public class ClusterControllerService implements IControllerService { private final JobIdFactory jobIdFactory; + private final DeployedJobSpecIdFactory deployedJobSpecIdFactory; + private IJobManager jobManager; private ShutdownRun shutdownCallback; @@ -164,8 +171,8 @@ public class ClusterControllerService implements IControllerService { final ClusterTopology topology = computeClusterTopology(ccConfig); ccContext = new ClusterControllerContext(topology); sweeper = new DeadNodeSweeper(); - datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(), - ccConfig.getResultSweepThreshold(), preDistributedJobStore); + datasetDirectoryService = + new DatasetDirectoryService(ccConfig.getResultTTL(), ccConfig.getResultSweepThreshold()); deploymentRunMap = new HashMap<>(); stateDumpRunMap = new HashMap<>(); @@ -175,6 +182,8 @@ public class ClusterControllerService implements IControllerService { nodeManager = new NodeManager(this, ccConfig, resourceManager); jobIdFactory = new JobIdFactory(); + + deployedJobSpecIdFactory = new DeployedJobSpecIdFactory(); } private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception { @@ -347,8 +356,21 @@ public class ClusterControllerService implements IControllerService { return nodeManager; } - public PreDistributedJobStore getPreDistributedJobStore() throws HyracksException { - return preDistributedJobStore; + public DeployedJobSpecStore getDeployedJobSpecStore() throws HyracksException { + return deployedJobSpecStore; + } + + public void removeJobParameterByteStore(JobId jobId) throws HyracksException { + jobParameterByteStoreMap.remove(jobId); + } + + public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException { + JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId); + if (jpbs == null) { + jpbs = new JobParameterByteStore(); + jobParameterByteStoreMap.put(jobId, jpbs); + } + return jpbs; } public IResourceManager getResourceManager() { @@ -397,6 +419,10 @@ public class ClusterControllerService implements IControllerService { return jobIdFactory; } + public DeployedJobSpecIdFactory getDeployedJobSpecIdFactory() { + return deployedJobSpecIdFactory; + } + private final class ClusterControllerContext implements ICCContext { private final ClusterTopology topology; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java new file mode 100644 index 0000000..1a3051e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc; + +import java.util.Hashtable; +import java.util.Map; +import java.util.Set; + +import org.apache.hyracks.api.constraints.Constraint; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.JobSpecification; + +public class DeployedJobSpecStore { + + private final Map<DeployedJobSpecId, DeployedJobSpecDescriptor> deployedJobSpecDescriptorMap; + + public DeployedJobSpecStore() { + deployedJobSpecDescriptorMap = new Hashtable<>(); + } + + public void addDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId, + ActivityClusterGraph activityClusterGraph, + JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) + throws HyracksException { + if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId); + } + DeployedJobSpecDescriptor descriptor = + new DeployedJobSpecDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints); + deployedJobSpecDescriptorMap.put(deployedJobSpecId, descriptor); + } + + public void checkForExistingDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) throws HyracksException { + if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId); + } + } + + public DeployedJobSpecDescriptor getDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) + throws HyracksException { + DeployedJobSpecDescriptor descriptor = deployedJobSpecDescriptorMap.get(deployedJobSpecId); + if (descriptor == null) { + throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId); + } + return descriptor; + } + + public void removeDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) throws HyracksException { + DeployedJobSpecDescriptor descriptor = deployedJobSpecDescriptorMap.get(deployedJobSpecId); + if (descriptor == null) { + throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId); + } + deployedJobSpecDescriptorMap.remove(deployedJobSpecId); + } + + public class DeployedJobSpecDescriptor { + + private final ActivityClusterGraph activityClusterGraph; + + private final JobSpecification jobSpecification; + + private final Set<Constraint> activityClusterGraphConstraints; + + private DeployedJobSpecDescriptor(ActivityClusterGraph activityClusterGraph, + JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) { + this.activityClusterGraph = activityClusterGraph; + this.jobSpecification = jobSpecification; + this.activityClusterGraphConstraints = activityClusterGraphConstraints; + } + + public ActivityClusterGraph getActivityClusterGraph() { + return activityClusterGraph; + } + + public JobSpecification getJobSpecification() { + return jobSpecification; + } + + public Set<Constraint> getActivityClusterGraphConstraints() { + return activityClusterGraphConstraints; + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java deleted file mode 100644 index 117621f..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.cc; - -import java.util.Hashtable; -import java.util.Map; -import java.util.Set; - -import org.apache.hyracks.api.constraints.Constraint; -import org.apache.hyracks.api.exceptions.ErrorCode; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.ActivityClusterGraph; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; - -public class PreDistributedJobStore { - - private final Map<JobId, PreDistributedJobDescriptor> preDistributedJobDescriptorMap; - - public PreDistributedJobStore() { - preDistributedJobDescriptorMap = new Hashtable<>(); - } - - public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph activityClusterGraph, - JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) - throws HyracksException { - if (preDistributedJobDescriptorMap.get(jobId) != null) { - throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); - } - PreDistributedJobDescriptor descriptor = - new PreDistributedJobDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints); - preDistributedJobDescriptorMap.put(jobId, descriptor); - } - - public void checkForExistingDistributedJobDescriptor(JobId jobId) throws HyracksException { - if (preDistributedJobDescriptorMap.get(jobId) != null) { - throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); - } - } - - public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId jobId) throws HyracksException { - PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId); - if (descriptor == null) { - throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); - } - return descriptor; - } - - public boolean jobIsPredistributed(JobId jobId) { - return preDistributedJobDescriptorMap.get(jobId) != null; - } - - public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException { - PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId); - if (descriptor == null) { - throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); - } - preDistributedJobDescriptorMap.remove(jobId); - } - - public class PreDistributedJobDescriptor { - - private final ActivityClusterGraph activityClusterGraph; - - private final JobSpecification jobSpecification; - - private final Set<Constraint> activityClusterGraphConstraints; - - private PreDistributedJobDescriptor(ActivityClusterGraph activityClusterGraph, - JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) { - this.activityClusterGraph = activityClusterGraph; - this.jobSpecification = jobSpecification; - this.activityClusterGraphConstraints = activityClusterGraphConstraints; - } - - public ActivityClusterGraph getActivityClusterGraph() { - return activityClusterGraph; - } - - public JobSpecification getJobSpecification() { - return jobSpecification; - } - - public Set<Constraint> getActivityClusterGraphConstraints() { - return activityClusterGraphConstraints; - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index ca1c91b..1cb07d0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -42,7 +42,6 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; -import org.apache.hyracks.control.cc.PreDistributedJobStore; import org.apache.hyracks.control.common.dataset.ResultStateSweeper; import org.apache.hyracks.control.common.work.IResultCallback; @@ -63,14 +62,10 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { private final Map<JobId, JobResultInfo> jobResultLocations; - private final PreDistributedJobStore preDistributedJobStore; - - public DatasetDirectoryService(long resultTTL, long resultSweepThreshold, - PreDistributedJobStore preDistributedJobStore) { + public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) { this.resultTTL = resultTTL; this.resultSweepThreshold = resultSweepThreshold; - this.preDistributedJobStore = preDistributedJobStore; - jobResultLocations = new LinkedHashMap<>(); + jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>(); } @Override @@ -186,9 +181,6 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { @Override public synchronized long getResultTimestamp(JobId jobId) { - if (preDistributedJobStore.jobIsPredistributed(jobId)) { - return -1; - } return getState(jobId).getTimestamp(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index 8a69a6f..0b69024 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -47,6 +47,7 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.partitions.PartitionId; @@ -77,7 +78,7 @@ public class JobExecutor { private final PartitionConstraintSolver solver; - private final boolean predistributed; + private final DeployedJobSpecId deployedJobSpecId; private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap; @@ -88,10 +89,10 @@ public class JobExecutor { private boolean cancelled = false; public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints, - boolean predistributed) { + DeployedJobSpecId deployedJobSpecId) { this.ccs = ccs; this.jobRun = jobRun; - this.predistributed = predistributed; + this.deployedJobSpecId = deployedJobSpecId; solver = new PartitionConstraintSolver(); partitionProducingTaskClusterMap = new HashMap<>(); inProgressTaskClusters = new HashSet<>(); @@ -99,8 +100,8 @@ public class JobExecutor { random = new Random(); } - public boolean isPredistributed() { - return predistributed; + public boolean isDeployed() { + return deployedJobSpecId != null; } public JobRun getJobRun() { @@ -502,7 +503,7 @@ public class JobExecutor { new HashMap<>(jobRun.getConnectorPolicyMap()); INodeManager nodeManager = ccs.getNodeManager(); try { - byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg); + byte[] acgBytes = isDeployed() ? null : JavaSerializationUtils.serialize(acg); for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) { String nodeId = entry.getKey(); final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue(); @@ -515,7 +516,8 @@ public class JobExecutor { } byte[] jagBytes = changed ? acgBytes : null; node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors, - connectorPolicies, jobRun.getFlags()); + connectorPolicies, jobRun.getFlags(), + ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index fa22dd3..26f8022 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -142,6 +142,7 @@ public class JobManager implements IJobManager { @Override public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException { + ccs.removeJobParameterByteStore(run.getJobId()); checkJob(run); if (status == JobStatus.FAILURE_BEFORE_EXECUTION) { run.setPendingStatus(JobStatus.FAILURE, exceptions); @@ -306,9 +307,7 @@ public class JobManager implements IJobManager { CCServiceContext serviceCtx = ccs.getContext(); JobSpecification spec = run.getJobSpecification(); - if (!run.getExecutor().isPredistributed()) { - serviceCtx.notifyJobCreation(jobId, spec); - } + serviceCtx.notifyJobCreation(jobId, spec); run.setStatus(JobStatus.RUNNING, null); executeJobInternal(run); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index ef0bca2..58f44ef 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.ActivityClusterId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.IActivityClusterGraphGenerator; import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobFlag; @@ -45,7 +46,7 @@ import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.cc.PreDistributedJobStore.PreDistributedJobDescriptor; +import org.apache.hyracks.control.cc.DeployedJobSpecStore.DeployedJobSpecDescriptor; import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails; import org.apache.hyracks.control.cc.executor.JobExecutor; import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker; @@ -114,21 +115,23 @@ public class JobRun implements IJobStatusConditionVariable { createTime = System.currentTimeMillis(); } - //Run a Pre-distributed job by passing the JobId + //Run a deployed job spec public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, - PreDistributedJobDescriptor distributedJobDescriptor) + DeployedJobSpecDescriptor deployedJobSpecDescriptor, Map<byte[], byte[]> jobParameters, + DeployedJobSpecId deployedJobSpecId) throws HyracksException { this(deploymentId, jobId, jobFlags, - distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph()); - Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints(); - this.scheduler = new JobExecutor(ccs, this, constaints, true); + deployedJobSpecDescriptor.getJobSpecification(), deployedJobSpecDescriptor.getActivityClusterGraph()); + ccs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters); + Set<Constraint> constaints = deployedJobSpecDescriptor.getActivityClusterGraphConstraints(); + this.scheduler = new JobExecutor(ccs, this, constaints, deployedJobSpecId); } //Run a new job by creating an ActivityClusterGraph public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) { this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), acgg.initialize()); - this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false); + this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), null); } public DeploymentId getDeploymentId() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java new file mode 100644 index 0000000..f7335a8 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import java.util.EnumSet; + +import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.IActivityClusterGraphGenerator; +import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.util.JavaSerializationUtils; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.application.CCServiceContext; +import org.apache.hyracks.control.cc.cluster.INodeManager; +import org.apache.hyracks.control.common.deployment.DeploymentUtils; +import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +public class DeployJobSpecWork extends SynchronizableWork { + private final ClusterControllerService ccs; + private final byte[] acggfBytes; + private final DeployedJobSpecId deployedJobSpecId; + private final IResultCallback<DeployedJobSpecId> callback; + + public DeployJobSpecWork(ClusterControllerService ccs, byte[] acggfBytes, DeployedJobSpecId deployedJobSpecId, + IResultCallback<DeployedJobSpecId> callback) { + this.deployedJobSpecId = deployedJobSpecId; + this.ccs = ccs; + this.acggfBytes = acggfBytes; + this.callback = callback; + } + + @Override + protected void doRun() throws Exception { + try { + final CCServiceContext ccServiceCtx = ccs.getContext(); + ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId); + IActivityClusterGraphGeneratorFactory acggf = + (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx); + IActivityClusterGraphGenerator acgg = + acggf.createActivityClusterGraphGenerator(ccServiceCtx, EnumSet.noneOf(JobFlag.class)); + ActivityClusterGraph acg = acgg.initialize(); + ccs.getDeployedJobSpecStore().addDeployedJobSpecDescriptor(deployedJobSpecId, acg, + acggf.getJobSpecification(), + acgg.getConstraints()); + + byte[] acgBytes = JavaSerializationUtils.serialize(acg); + + INodeManager nodeManager = ccs.getNodeManager(); + for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) { + node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes); + } + callback.setValue(deployedJobSpecId); + } catch (Exception e) { + callback.setException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java new file mode 100644 index 0000000..8afdf42 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +public class DeployedJobFailureWork extends SynchronizableWork { + protected final DeployedJobSpecId deployedJobSpecId; + protected final String nodeId; + + public DeployedJobFailureWork(DeployedJobSpecId deployedJobSpecId, String nodeId) { + this.deployedJobSpecId = deployedJobSpecId; + this.nodeId = nodeId; + } + + @Override + public void doRun() throws HyracksException { + throw HyracksException.create(ErrorCode.DEPLOYED_JOB_FAILURE, deployedJobSpecId, nodeId); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java deleted file mode 100644 index df98252..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.cc.work; - -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.cc.NodeControllerState; -import org.apache.hyracks.control.cc.cluster.INodeManager; -import org.apache.hyracks.control.common.work.IResultCallback; -import org.apache.hyracks.control.common.work.SynchronizableWork; - -public class DestroyJobWork extends SynchronizableWork { - private final ClusterControllerService ccs; - private final JobId jobId; - private final IResultCallback<JobId> callback; - - public DestroyJobWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobId> callback) { - this.jobId = jobId; - this.ccs = ccs; - this.callback = callback; - } - - @Override - protected void doRun() throws Exception { - try { - ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId); - INodeManager nodeManager = ccs.getNodeManager(); - for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) { - node.getNodeController().destroyJob(jobId); - } - callback.setValue(jobId); - } catch (Exception e) { - callback.setException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java deleted file mode 100644 index 5a57b1b..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.cc.work; - -import java.util.EnumSet; - -import org.apache.hyracks.api.job.ActivityClusterGraph; -import org.apache.hyracks.api.job.IActivityClusterGraphGenerator; -import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; -import org.apache.hyracks.api.job.JobFlag; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobIdFactory; -import org.apache.hyracks.api.util.JavaSerializationUtils; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.cc.NodeControllerState; -import org.apache.hyracks.control.cc.application.CCServiceContext; -import org.apache.hyracks.control.cc.cluster.INodeManager; -import org.apache.hyracks.control.common.deployment.DeploymentUtils; -import org.apache.hyracks.control.common.work.IResultCallback; -import org.apache.hyracks.control.common.work.SynchronizableWork; - -public class DistributeJobWork extends SynchronizableWork { - private final ClusterControllerService ccs; - private final byte[] acggfBytes; - private final JobIdFactory jobIdFactory; - private final IResultCallback<JobId> callback; - - public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobIdFactory jobIdFactory, - IResultCallback<JobId> callback) { - this.jobIdFactory = jobIdFactory; - this.ccs = ccs; - this.acggfBytes = acggfBytes; - this.callback = callback; - } - - @Override - protected void doRun() throws Exception { - try { - JobId jobId = jobIdFactory.create(); - final CCServiceContext ccServiceCtx = ccs.getContext(); - ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId); - IActivityClusterGraphGeneratorFactory acggf = - (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx); - IActivityClusterGraphGenerator acgg = - acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, EnumSet.noneOf(JobFlag.class)); - ActivityClusterGraph acg = acgg.initialize(); - ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, acg, acggf.getJobSpecification(), - acgg.getConstraints()); - - ccServiceCtx.notifyJobCreation(jobId, acggf.getJobSpecification()); - - byte[] acgBytes = JavaSerializationUtils.serialize(acg); - - INodeManager nodeManager = ccs.getNodeManager(); - for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) { - node.getNodeController().distributeJob(jobId, acgBytes); - } - - callback.setValue(jobId); - } catch (Exception e) { - callback.setException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java deleted file mode 100644 index f7fa2a4..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.cc.work; - -import org.apache.hyracks.api.exceptions.ErrorCode; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.control.common.work.SynchronizableWork; - -public class DistributedJobFailureWork extends SynchronizableWork { - protected final JobId jobId; - protected final String nodeId; - - public DistributedJobFailureWork(JobId jobId, String nodeId) { - this.jobId = jobId; - this.nodeId = nodeId; - } - - @Override - public void doRun() throws HyracksException { - throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, jobId, nodeId); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java index ed82705..cfedfc9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java @@ -18,9 +18,11 @@ */ package org.apache.hyracks.control.cc.work; -import java.util.EnumSet; +import java.util.Map; +import java.util.Set; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.IActivityClusterGraphGenerator; import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobFlag; @@ -37,20 +39,23 @@ import org.apache.hyracks.control.common.work.SynchronizableWork; public class JobStartWork extends SynchronizableWork { private final ClusterControllerService ccs; private final byte[] acggfBytes; - private final EnumSet<JobFlag> jobFlags; + private final Set<JobFlag> jobFlags; private final DeploymentId deploymentId; - private final JobId preDistributedJobId; private final IResultCallback<JobId> callback; private final JobIdFactory jobIdFactory; + private final DeployedJobSpecId deployedJobSpecId; + private final Map<byte[], byte[]> jobParameters; public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes, - EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, JobIdFactory jobIdFactory) { + Set<JobFlag> jobFlags, JobIdFactory jobIdFactory, Map<byte[], byte[]> jobParameters, + IResultCallback<JobId> callback, DeployedJobSpecId deployedJobSpecId) { this.deploymentId = deploymentId; - this.preDistributedJobId = jobId; this.ccs = ccs; this.acggfBytes = acggfBytes; this.jobFlags = jobFlags; this.callback = callback; + this.deployedJobSpecId = deployedJobSpecId; + this.jobParameters = jobParameters; this.jobIdFactory = jobIdFactory; } @@ -61,19 +66,18 @@ public class JobStartWork extends SynchronizableWork { final CCServiceContext ccServiceCtx = ccs.getContext(); JobId jobId; JobRun run; - if (preDistributedJobId == null) { - jobId = jobIdFactory.create(); + jobId = jobIdFactory.create(); + if (deployedJobSpecId == null) { //Need to create the ActivityClusterGraph IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils .deserialize(acggfBytes, deploymentId, ccServiceCtx); - IActivityClusterGraphGenerator acgg = - acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, jobFlags); + IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(ccServiceCtx, jobFlags); run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags); } else { - jobId = preDistributedJobId; //ActivityClusterGraph has already been distributed run = new JobRun(ccs, deploymentId, jobId, jobFlags, - ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId)); + ccs.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(deployedJobSpecId), jobParameters, + deployedJobSpecId); } jobManager.add(run); callback.setValue(jobId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java new file mode 100644 index 0000000..143c8c1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; +import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +public class UndeployJobSpecWork extends SynchronizableWork { + private final ClusterControllerService ccs; + private final DeployedJobSpecId deployedJobSpecId; + private final IResultCallback<DeployedJobSpecId> callback; + + public UndeployJobSpecWork(ClusterControllerService ccs, DeployedJobSpecId deployedJobSpecId, + IResultCallback<DeployedJobSpecId> callback) { + this.deployedJobSpecId = deployedJobSpecId; + this.ccs = ccs; + this.callback = callback; + } + + @Override + protected void doRun() throws Exception { + try { + ccs.getDeployedJobSpecStore().removeDeployedJobSpecDescriptor(deployedJobSpecId); + INodeManager nodeManager = ccs.getNodeManager(); + for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) { + node.getNodeController().undeployJobSpec(deployedJobSpecId); + } + callback.setValue(deployedJobSpecId); + } catch (Exception e) { + callback.setException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index ec8e045..6fd321e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.common.controllers.NodeRegistration; import org.apache.hyracks.control.common.deployment.DeploymentStatus; @@ -44,7 +45,7 @@ public interface IClusterController { public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) throws Exception; - public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception; + public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception; public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index a10f8f0..5d781cf 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -19,7 +19,6 @@ package org.apache.hyracks.control.common.base; import java.net.URL; -import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,6 +28,7 @@ import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; @@ -38,7 +38,8 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; public interface INodeController { public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, - Set<JobFlag> flags) throws Exception; + Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) + throws Exception; public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception; @@ -50,9 +51,9 @@ public interface INodeController { public void undeployBinary(DeploymentId deploymentId) throws Exception; - public void distributeJob(JobId jobId, byte[] planBytes) throws Exception; + public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception; - public void destroyJob(JobId jobId) throws Exception; + public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; public void dumpState(String stateDumpId) throws Exception;
