Fix for OODT-305: Make the Resource Manager manage different queues of jobs independently (resneck, mattmann)
Project: http://git-wip-us.apache.org/repos/asf/oodt/repo Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/4f3588ce Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/4f3588ce Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/4f3588ce Branch: refs/heads/feature/zookeeper-config Commit: 4f3588ce8c9a455fb4efbd5db04009c8c91dd44b Parents: d329e1a Author: Chris Mattmann <[email protected]> Authored: Sun Jul 16 21:37:26 2017 -0700 Committer: Chris Mattmann <[email protected]> Committed: Sun Jul 16 21:37:26 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../resource/jobqueue/FifoMappedJobQueue.java | 510 +++++++++++++++++++ .../jobqueue/FifoMappedJobQueueFactory.java | 75 +++ .../cas/resource/jobqueue/MappedJobQueue.java | 132 +++++ .../apache/oodt/cas/resource/structs/Job.java | 12 + .../oodt/cas/resource/structs/JobInput.java | 8 + .../cas/resource/structs/NameValueJobInput.java | 17 + .../jobqueue/TestFifoMappedJobQueue.java | 101 ++++ 8 files changed, 857 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c4be36b..7e4fc34 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,8 @@ Apache OODT Change Log Release 1.1 - Current Development +* OODT-305 Make the Resource Manager manage different queues of jobs independently (resneck, mattmann) + * OODT-753 Move FM and WM Python APIs into "agility" component (kelly, mattmann) * OODT-563 Task editing and Workflow execution (varun, mattmann) http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueue.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueue.java b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueue.java new file mode 100644 index 0000000..ac32fa6 --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueue.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.jobqueue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Vector; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.oodt.cas.resource.jobrepo.JobRepository; +import org.apache.oodt.cas.resource.structs.JobSpec; +import org.apache.oodt.cas.resource.structs.JobStatus; +import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; +import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; + +/** + * This implementation provides a "queue-aware" {@link JobQueue} that ensures + * the FIFO execution of jobs. + * + * @author resneck + * + */ +public class FifoMappedJobQueue implements MappedJobQueue { + + private Map<String, Vector<String>> queues; + private int maxQueueSize; + private JobRepository repo; + private static final Logger LOG = Logger + .getLogger(FifoMappedJobQueue.class.getName()); + + public FifoMappedJobQueue(int maxSize, JobRepository repo) { + this.maxQueueSize = maxSize; + this.repo = repo; + this.queues = new HashMap<String, Vector<String>>(); + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueue.addJob(JobSpec) + */ + public synchronized String addJob(JobSpec spec) throws JobQueueException { + + // Check if the job is null and if its queue exists + if (spec == null) { + throw new JobQueueException("A null job was given."); + } + String queueName = spec.getJob().getQueueName(); + validateQueueName(queueName); + + // Check if the jobs queue is full + List<String> queue = queues.get(queueName); + if (queue.size() == maxQueueSize) { + throw new JobQueueException( + "The queue " + spec.getJob().getQueueName() + " is full. The job " + + spec.getJob().getId() + " could not be requeued."); + } + + // Add the job to the repository + try { + this.repo.addJob(spec); + } catch (JobRepositoryException e) { + throw new JobQueueException( + "An error occurred while adding job " + spec.getJob().getId() + + " to the job repository: " + e.getMessage()); + } + + // Add the job to the queue + queue.add(spec.getJob().getId()); + + // Update the jobs status + spec.getJob().setStatus(JobStatus.QUEUED); + try { + this.repo.updateJob(spec); + } catch (JobRepositoryException e) { + throw new JobQueueException("An error occurred while updating " + + "the status of job " + spec.getJob().getId() + + " in the job repository: " + e.getMessage()); + } + + LOG.log(Level.INFO, + "Job [id=" + spec.getJob().getId() + ",name=" + spec.getJob().getName() + + "] was added to the job queue in queue " + queueName); + return spec.getJob().getId(); + + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueue.requeueJob(JobSpec) + */ + public synchronized String requeueJob(JobSpec spec) throws JobQueueException { + + // Check if the job is null and if its queue exists + if (spec == null) { + throw new JobQueueException("A null job was given."); + } + String queueName = spec.getJob().getQueueName(); + validateQueueName(queueName); + + List<String> queue = queues.get(queueName); + + // Place the job at the front of the queue + queue.add(0, spec.getJob().getId()); + + // Set the jobs status + spec.getJob().setStatus(JobStatus.QUEUED); + try { + this.repo.updateJob(spec); + } catch (JobRepositoryException e) { + throw new JobQueueException("An error occurred while updating " + + "the status of job " + spec.getJob().getId() + + " in the job repository: " + e.getMessage()); + } + + return spec.getJob().getId(); + + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueue.getQueuedJobs() + */ + public synchronized List getQueuedJobs() { + + List<JobSpec> allJobs = new Vector<JobSpec>(); + for (Iterator<Vector<String>> i = queues.values().iterator(); i + .hasNext();) { + List<String> queue = i.next(); + for (String jobId : queue) { + try { + allJobs.add(this.repo.getJobById(jobId)); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, "Failed to fetch JobSpec from repo: " + jobId); + } + } + } + + return allJobs; + + } + + public synchronized List<JobSpec> getQueuedJobs(String queueName) + throws JobQueueException, JobRepositoryException { + + // Check if the queue name is null or if it does not exist + validateQueueName(queueName); + + List<JobSpec> queueJobs = new Vector<JobSpec>(); + for (String jobId : this.queues.get(queueName)) { + queueJobs.add(this.repo.getJobById(jobId)); + } + + return queueJobs; + + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueue.purge() + */ + public synchronized void purge() { + + for (Iterator<Vector<String>> i = this.queues.values().iterator(); i + .hasNext();) { + i.next().removeAllElements(); + } + + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueue.isEmpty() + */ + public synchronized boolean isEmpty() { + + return this.getSize() == 0; + + } + + public synchronized boolean isEmpty(String queueName) + throws JobQueueException { + + // Check if the queue name is null or if it does not exist + validateQueueName(queueName); + + return this.queues.get(queueName).size() == 0; + + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueue.getNextJob() + */ + public synchronized JobSpec getNextJob() { + + // Check if any queues exist + if (this.queues.keySet().size() == 0) { + throw new RuntimeException("No queues are defined."); + } + + // Check if all queues are empty + if (this.isEmpty()) { + throw new RuntimeException("The queue contains no jobs."); + } + + // Look in each queue for a job + for (Iterator<String> i = this.queues.keySet().iterator(); i.hasNext();) { + + // Check if the queue is empty + List<String> queue = this.queues.get(i.next()); + if (!queue.isEmpty()) { + + // Check jobs from the front of the queue until we find one that + // is flagged as ready to schedule + for (int index = 0; index < queue.size(); index++) { + + // Check how the job is flagged + String jobId = queue.get(index); + JobSpec spec = null; + try { + spec = this.repo.getJobById(jobId); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, + "Failed to fetch JobSpec from repo: " + jobId); + } + if (spec.getJob().getReady()) { + + // Remove the job from the queue + queue.remove(index); + + // Set the status of the fetched job + spec.getJob().setStatus(JobStatus.SCHEDULED); + try { + this.repo.updateJob(spec); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, + "The status of job " + spec.getJob().getId() + + "was not properly set " + + "after being dequeued. Message: " + e.getMessage()); + } + + return spec; + + } + + } + + } + + } + + return null; + + } + + + public synchronized JobSpec getNextJob(String queueName) + throws JobQueueException, JobRepositoryException { + + // Check if the given queue name is null and if it exists + validateQueueName(queueName); + + // If the queue contains no jobs, return null + List<String> queue = queues.get(queueName); + if (queue.isEmpty()) { + return null; + } + + // Check jobs from the front of the queue until we find one that + // is flagged as ready to schedule + for (int index = 0; index < queue.size(); index++) { + + // Check how the job is flagged + String jobId = queue.get(index); + JobSpec spec = this.repo.getJobById(jobId); + if (spec.getJob().getReady()) { + + // Remove the job from the queue + queue.remove(index); + + // Set the status of the fetched job + spec.getJob().setStatus(JobStatus.SCHEDULED); + try { + this.repo.updateJob(spec); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, + "The status of job " + spec.getJob().getId() + + "was not properly set after being" + " dequeued. Message: " + + e.getMessage()); + } + + return spec; + + } + + } + + return null; + + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueue.getJobRepository() + */ + public synchronized JobRepository getJobRepository() { + + return this.repo; + + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueue.getSize() + */ + public synchronized int getSize() { + + int totalJobs = 0; + for (Iterator<Vector<String>> i = queues.values().iterator(); i + .hasNext();) { + totalJobs += i.next().size(); + } + return totalJobs; + + } + + public synchronized int getSize(String queueName) throws JobQueueException { + + // Check if the given queue name is null and if it exists + validateQueueName(queueName); + + return this.queues.get(queueName).size(); + + } + + /** + * This method returns the number of jobs in any given queue that can be + * retained in the queue. This number does not change with the number of + * queues in the ResourceManager. + * + * @return The number of jobs of each queue that can be queued. + */ + public int getCapacity() { + + return this.maxQueueSize; + + } + + // TODO: Write a javadoc for this method when it can actually be used by + // the operator. + public synchronized void removeJob(JobSpec spec) throws JobQueueException { + + // Check if the job is null and if its queue exists + if (spec == null) { + throw new JobQueueException("A null job was given."); + } + String queueName = spec.getJob().getQueueName(); + validateQueueName(queueName); + + // Get the ID of the job + String id = spec.getJob().getId(); + + // Find the job in the queue and remove it + List<String> queue = this.queues.get(queueName); + int index = getIndexInQueue(id, queue); + if (index == -1) { + LOG.log(Level.WARNING, "No job with ID " + id + "could be removed " + + "since it was not found in the queue."); + } else { + queue.remove(index); + } + + } + + public synchronized void addQueue(String queueName) throws JobQueueException { + + // Check if queue name is null or already exists + if (queueName == null) { + throw new JobQueueException("A null queue name was given."); + } + if (queues.containsKey(queueName)) { + throw new JobQueueException("A queue with name " + queueName + + " could not be created as one " + "with that name already exists."); + } + + // Add the new queue to our map + this.queues.put(queueName, new Vector()); + + } + + + public synchronized void removeQueue(String queueName) + throws JobQueueException { + + // Check if the given queue name is null and if it exists + validateQueueName(queueName); + + // Warn the user if they are losing jobs + int queueSize = this.queues.get(queueName).size(); + if (queueSize > 0) { + LOG.log(Level.WARNING, "The queue being removed (" + queueName + + ") contains " + queueSize + " jobs."); + } + + // Delete the queue + this.queues.remove(queueName); + + } + + public synchronized void promoteJob(JobSpec spec) throws JobQueueException { + + // Check if the job is null and if its queue exists + if (spec == null) { + throw new JobQueueException("A null job was given."); + } + String queueName = spec.getJob().getQueueName(); + validateQueueName(queueName); + + // Get the ID of the job + String id = spec.getJob().getId(); + + // Find the job in the queue and move it to the front + List<String> queue = this.queues.get(queueName); + int index = getIndexInQueue(id, queue); + if (index == -1) { + LOG.log(Level.WARNING, "No job with ID " + id + "could be promoted " + + "since it was not found in the queue."); + } else { + queue.add(0, queue.remove(index)); + } + + } + + public synchronized List<String> getQueueNames(){ + if (this.queues != null && this.queues.keySet() != null && + this.queues.keySet().size() > 0){ + return Arrays.asList(queues.keySet().toArray(new String[]{""})); + } + return Collections.EMPTY_LIST; + } + + public synchronized void promoteKeyValPair(String key, String val) + throws JobQueueException, JobRepositoryException { + + List<JobSpec> specsToPromote = new Vector<JobSpec>(); + + for (Iterator<Vector<String>> i = queues.values().iterator(); i + .hasNext();) { + List<String> queue = i.next(); + for (String jobId : queue) { + JobSpec spec = null; + try { + spec = this.repo.getJobById(jobId); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, "Failed to fetch JobSpec from repo: " + jobId); + } + if (spec.getIn().getMetadata().get(key).equals(val)) { + specsToPromote.add(spec); + } + } + } + + for (JobSpec spec : specsToPromote) { + promoteJob(spec); + } + + } + + /** + * This method checks if a given queue name is valid + * + * @param queueName + * The name of the queue to validate + * @throws JobQueueException + * If the name is null or no queue with the given name exists + */ + private void validateQueueName(String queueName) throws JobQueueException { + + if (queueName == null) { + throw new JobQueueException("A null queue name was given."); + } + if (!queues.containsKey(queueName)) { + throw new JobQueueException( + "An invalid queue name was given: " + queueName); + } + + } + + private int getIndexInQueue(String id, List<String> queue) { + for (int i = 0; i < queue.size(); i++) { + if (queue.get(i).equals(id)) { + return i; + } + } + return -1; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueueFactory.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueueFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueueFactory.java new file mode 100644 index 0000000..96a5e13 --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueueFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.jobqueue; + +import java.io.File; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.oodt.cas.metadata.util.PathUtils; +import org.apache.oodt.cas.resource.jobqueue.JobQueue; +import org.apache.oodt.cas.resource.jobqueue.JobQueueFactory; +import org.apache.oodt.cas.resource.jobrepo.JobRepository; +import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; + + +/** + * This factory class reads in properties set in the resource.properties file + * and read in via the command line and uses those properties to create a + * {@link FifoMappedJobQueue}. + * + * @author resneck + * + */ +public class FifoMappedJobQueueFactory implements JobQueueFactory { + + private int stackSize = -1; + private JobRepository repo; + + private static final Logger LOG = + Logger.getLogger(FifoMappedJobQueueFactory.class.getName()); + + public FifoMappedJobQueueFactory() { + try{ + String stackSizeStr = System.getProperty( + "gov.nasa.smap.spdm.resource.jobqueue.fifomappedjobqueue.maxstacksize"); + + if (stackSizeStr != null) { + stackSize = Integer.parseInt(stackSizeStr); + } + + String jobRepoFactoryClassStr = System.getProperty( + "resource.jobrepo.factory", + "gov.nasa.smap.spdm.resource.jobrepo.SmapMemoryJobRepositoryFactory"); + this.repo = GenericResourceManagerObjectFactory. + getJobRepositoryFromServiceFactory(jobRepoFactoryClassStr); + }catch(Exception e){ + LOG.log(Level.SEVERE, "An error occurred while creating a " + + "FifoMappedJobQueue: " + e.getMessage()); + } + + } + + /** + * @see org.apache.oodt.cas.resource.jobqueue.JobQueueFactory#createQueue() + */ + public JobQueue createQueue() { + return new FifoMappedJobQueue(stackSize, repo); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/MappedJobQueue.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/MappedJobQueue.java b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/MappedJobQueue.java new file mode 100644 index 0000000..fc2c636 --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/MappedJobQueue.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.jobqueue; + +import java.util.List; + +import org.apache.oodt.cas.resource.jobqueue.JobQueue; +import org.apache.oodt.cas.resource.structs.JobSpec; +import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; +import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; + +/** + * The interface for a {@link JobQueue} that is aware of the different queues in + * the ResourceManager and allows for the manipulation of jobs on a + * queue-by-queue basis. + * + * @author resneck + * + */ +public interface MappedJobQueue extends JobQueue { + + /** + * Returns a boolean value representing whether or not the job queue contains + * jobs in the given queue. + * + * @param queueName + * The name of the queue which you are checking. + * @return true, if the queue is empty, false otherwise. + * @throws JobQueueException + * If the given queue name is null or if no queue with that name + * exists. + */ + public boolean isEmpty(String queueName) throws JobQueueException; + + /** + * Gets the next {@link JobSpec} in the queue with the given name. + * + * @param queueName + * The name of the queue from which the next {@link JobSpec} will be + * returned. + * @return The next {@link JobSpec} from the jobqueue belonging to the queue + * with the given name. + * @throws JobQueueException + * If the given queue name is null or if no queue with that name + * exists. + */ + public JobSpec getNextJob(String queueName) + throws JobQueueException, JobRepositoryException; + + /** + * Gets the number of jobs in the queue with the given name. + * + * @param queueName + * The name of the queue whos size will be given. + * @return The number of {@link JobSpec}s in the queue with the given name. + * @throws JobQueueException + * If the given queue name is null or if no queue with that name + * exists. + */ + public int getSize(String queueName) throws JobQueueException; + + /** + * Removes the {@link JobSpec} with the given ID from the queue. + * + * @param id + * The ID of the {@link JobSpec} that will be removed. + * @throws JobQueueException + * If no {@link JobSpec} has the given ID. + */ + public void removeJob(JobSpec spec) throws JobQueueException; + + /** + * Add a queue with the given name. + * + * @param queueName + * The name of the queue to be created. + * @throws JobQueueException + * If the given queue name is null or if a queue with the given name + * already exists. + */ + public void addQueue(String queueName) throws JobQueueException; + + /** + * Remove the queue with the given name. + * + * @param queueName + * The name of the queue to be removed. + * @throws JobQueueException + * If the given queue name is null or if no queue with the given + * name exists. + */ + public void removeQueue(String queueName) throws JobQueueException; + + /** + * Gets a list of all queued jobs that belong to the queue with the given + * name. + * + * @param queueName + * The name of the queue whose members will be given. + * @return A {@link List} of queued {@JobSpec}s from the given queue. + * @throws JobQueueException + * If the given queue name is null or if no queue with the given + * name exists. + */ + public List<JobSpec> getQueuedJobs(String queueName) + throws JobQueueException, JobRepositoryException; + + public String addJob(JobSpec spec) throws JobQueueException; + + public String requeueJob(JobSpec spec) throws JobQueueException; + + public void promoteJob(JobSpec spec) throws JobQueueException; + + public void promoteKeyValPair(String key, String val) + throws JobQueueException, JobRepositoryException; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java index 88d857a..a947c0c 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java @@ -49,6 +49,9 @@ public class Job { /* the status of this job */ private String status = null; + + /* ready or not ? */ + private boolean ready = true; /** * Default Constructor. @@ -77,6 +80,7 @@ public class Job { this.jobInputClassName = jobInputClassName; this.loadValue = loadValue; this.queueName = queueName; + this.ready = true; } /** @@ -169,5 +173,13 @@ public class Job { public void setStatus(String status) { this.status = status; } + + public boolean getReady(){ + return this.ready; + } + + public void setReady(boolean readyValue){ + this.ready = readyValue; + } } http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java index fc6011d..f307bc6 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java @@ -18,6 +18,11 @@ package org.apache.oodt.cas.resource.structs; + +import java.util.Map; +import java.util.Vector; + +import org.apache.oodt.cas.metadata.Metadata; //OODT imports import org.apache.oodt.cas.resource.util.Configurable; import org.apache.oodt.cas.resource.util.XmlRpcWriteable; @@ -39,4 +44,7 @@ public interface JobInput extends XmlRpcWriteable, Configurable { */ String getId(); + Map<String, Vector<String>> getMetadata(); + + } http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java index 73c26ab..0195c9c 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java @@ -20,8 +20,11 @@ package org.apache.oodt.cas.resource.structs; //JDK imports import java.util.concurrent.ConcurrentHashMap; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Vector; /** * @author mattmann @@ -114,4 +117,18 @@ public class NameValueJobInput implements JobInput { } } + @Override + public Map<String, Vector<String>> getMetadata() { + Map<String, Vector<String>> met = new HashMap<String, Vector<String>>(); + if (props != null && props.keySet() != null && props.keySet().size() > 0){ + for (Object key: props.values()){ + String keyName = (String)key; + Vector<String> vals = new Vector<String>(); + vals.add(props.getProperty(keyName)); + met.put(keyName, vals); + } + } + return met; + } + } http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/test/java/org/apache/oodt/cas/resource/jobqueue/TestFifoMappedJobQueue.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/jobqueue/TestFifoMappedJobQueue.java b/resource/src/test/java/org/apache/oodt/cas/resource/jobqueue/TestFifoMappedJobQueue.java new file mode 100644 index 0000000..cbcfb12 --- /dev/null +++ b/resource/src/test/java/org/apache/oodt/cas/resource/jobqueue/TestFifoMappedJobQueue.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.jobqueue; + +import org.apache.oodt.cas.resource.jobrepo.JobRepository; +import org.apache.oodt.cas.resource.jobrepo.MemoryJobRepository; +import org.apache.oodt.cas.resource.structs.Job; +import org.apache.oodt.cas.resource.structs.JobSpec; + +import junit.framework.TestCase; + +public class TestFifoMappedJobQueue extends TestCase { + + FifoMappedJobQueue q; + + private JobSpec[] jobs = new JobSpec[4]; + + private final static JobRepository repo = new MemoryJobRepository(); + + public TestFifoMappedJobQueue() { + } + + protected void setUp() { + q = new FifoMappedJobQueue(2, repo); + jobs[0] = new JobSpec(null, + new Job("job0q0", "j0", null, null, "queue0", new Integer(1))); + jobs[1] = new JobSpec(null, + new Job("job0q1", "j1", null, null, "queue1", new Integer(1))); + jobs[2] = new JobSpec(null, + new Job("job1q0", "j2", null, null, "queue0", new Integer(1))); + jobs[3] = new JobSpec(null, + new Job("job2q0", "j3", null, null, "queue0", new Integer(1))); + + try { + q.addQueue("queue0"); + q.addQueue("queue1"); + q.addQueue("queue2"); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + public void testGetNextJob() { + // Ensure that requesting a job from a valid, empty queue returns a null job + // spec + try { + JobSpec s = q.getNextJob("queue2"); + assertNull(s); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + public void testPurge() { + try { + q.addJob(this.jobs[0]); + q.addJob(this.jobs[1]); + q.purge(); + assertEquals(q.getSize(), 0); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + public void testIsEmpty() { + + try { + for (String queueName : q.getQueueNames()) { + assertTrue(q.isEmpty(queueName)); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + public void testGetJobRepository() { + JobRepository r = q.getJobRepository(); + assertSame(r, this.repo); + } + + public void testGetCapacity() { + assertEquals(2, q.getCapacity()); + } +} \ No newline at end of file
