http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java deleted file mode 100644 index e22e4df..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.scheduler; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; - -/** - * A VariableMarshaller instance that performs the read and write of a list of - * JobLocation objects using the JobLocation's built in read and write methods. - */ -class JobLocationsMarshaller extends VariableMarshaller<List<JobLocation>> { - static JobLocationsMarshaller INSTANCE = new JobLocationsMarshaller(); - - @Override - public List<JobLocation> readPayload(DataInput dataIn) throws IOException { - List<JobLocation> result = new ArrayList<JobLocation>(); - int size = dataIn.readInt(); - for (int i = 0; i < size; i++) { - JobLocation jobLocation = new JobLocation(); - jobLocation.readExternal(dataIn); - result.add(jobLocation); - } - return result; - } - - @Override - public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { - dataOut.writeInt(value.size()); - for (JobLocation jobLocation : value) { - jobLocation.writeExternal(dataOut); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index bcb819c..455801a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -32,15 +32,11 @@ import org.apache.activemq.broker.scheduler.CronParser; import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.broker.scheduler.JobListener; import org.apache.activemq.broker.scheduler.JobScheduler; -import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand; -import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand; import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; +import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; @@ -48,13 +44,12 @@ import org.apache.activemq.util.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler { - +class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler { private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class); - private final JobSchedulerStoreImpl store; + final JobSchedulerStoreImpl store; private final AtomicBoolean running = new AtomicBoolean(); private String name; - private BTreeIndex<Long, List<JobLocation>> index; + BTreeIndex<Long, List<JobLocation>> index; private Thread thread; private final AtomicBoolean started = new AtomicBoolean(false); private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>(); @@ -69,163 +64,233 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch this.name = name; } + /* + * (non-Javadoc) + * + * @see org.apache.activemq.beanstalk.JobScheduler#getName() + */ @Override public String getName() { return this.name; } + /* + * (non-Javadoc) + * + * @see org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq .beanstalk.JobListener) + */ @Override public void addListener(JobListener l) { this.jobListeners.add(l); } + /* + * (non-Javadoc) + * + * @see org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache. activemq.beanstalk.JobListener) + */ @Override public void removeListener(JobListener l) { this.jobListeners.remove(l); } @Override - public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException { - doSchedule(jobId, payload, "", 0, delay, 0); + public synchronized void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + schedule(tx, jobId, payload, "", 0, delay, 0); + } + }); } @Override - public void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception { - doSchedule(jobId, payload, cronEntry, 0, 0, 0); + public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + schedule(tx, jobId, payload, cronEntry, 0, 0, 0); + } + }); } @Override - public void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period, final int repeat) throws IOException { - doSchedule(jobId, payload, cronEntry, delay, period, repeat); + public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period, + final int repeat) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + schedule(tx, jobId, payload, cronEntry, delay, period, repeat); + } + }); } + /* + * (non-Javadoc) + * + * @see org.apache.activemq.beanstalk.JobScheduler#remove(long) + */ @Override - public void remove(final long time) throws IOException { - doRemoveRange(time, time); + public synchronized void remove(final long time) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + remove(tx, time); + } + }); } - @Override - public void remove(final String jobId) throws IOException { - doRemove(-1, jobId); + synchronized void removeFromIndex(final long time, final String jobId) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + removeFromIndex(tx, time, jobId); + } + }); } - @Override - public void removeAllJobs() throws IOException { - doRemoveRange(0, Long.MAX_VALUE); + /* + * (non-Javadoc) + * + * @see org.apache.activemq.beanstalk.JobScheduler#remove(long, java.lang.String) + */ + public synchronized void remove(final long time, final String jobId) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + remove(tx, time, jobId); + } + }); + } + + synchronized void remove(final long time, final List<JobLocation> jobIds) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + remove(tx, time, jobIds); + } + }); } + /* + * (non-Javadoc) + * + * @see org.apache.activemq.beanstalk.JobScheduler#remove(java.lang.String) + */ @Override - public void removeAllJobs(final long start, final long finish) throws IOException { - doRemoveRange(start, finish); + public synchronized void remove(final String jobId) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + remove(tx, jobId); + } + }); } @Override - public long getNextScheduleTime() throws IOException { - this.store.readLockIndex(); - try { - Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); - return first != null ? first.getKey() : -1l; - } finally { - this.store.readUnlockIndex(); - } + public synchronized long getNextScheduleTime() throws IOException { + Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); + return first != null ? first.getKey() : -1l; } + /* + * (non-Javadoc) + * + * @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs() + */ @Override - public List<Job> getNextScheduleJobs() throws IOException { + public synchronized List<Job> getNextScheduleJobs() throws IOException { final List<Job> result = new ArrayList<Job>(); - this.store.readLockIndex(); - try { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Map.Entry<Long, List<JobLocation>> first = index.getFirst(tx); - if (first != null) { - for (JobLocation jl : first.getValue()) { - ByteSequence bs = getPayload(jl.getLocation()); - Job job = new JobImpl(jl, bs); - result.add(job); - } + + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + Map.Entry<Long, List<JobLocation>> first = index.getFirst(store.getPageFile().tx()); + if (first != null) { + for (JobLocation jl : first.getValue()) { + ByteSequence bs = getPayload(jl.getLocation()); + Job job = new JobImpl(jl, bs); + result.add(job); } } - }); - } finally { - this.store.readUnlockIndex(); - } - return result; - } - - private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException { - this.store.readLockIndex(); - try { - if (!this.store.isStopped() && !this.store.isStopping()) { - Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); - return first; } - } finally { - this.store.readUnlockIndex(); - } - return null; + }); + return result; } @Override - public List<Job> getAllJobs() throws IOException { + public synchronized List<Job> getAllJobs() throws IOException { final List<Job> result = new ArrayList<Job>(); - this.store.readLockIndex(); - try { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx()); - while (iter.hasNext()) { - Map.Entry<Long, List<JobLocation>> next = iter.next(); - if (next != null) { - for (JobLocation jl : next.getValue()) { - ByteSequence bs = getPayload(jl.getLocation()); - Job job = new JobImpl(jl, bs); - result.add(job); - } - } else { - break; + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx()); + while (iter.hasNext()) { + Map.Entry<Long, List<JobLocation>> next = iter.next(); + if (next != null) { + for (JobLocation jl : next.getValue()) { + ByteSequence bs = getPayload(jl.getLocation()); + Job job = new JobImpl(jl, bs); + result.add(job); } + } else { + break; } } - }); - } finally { - this.store.readUnlockIndex(); - } + } + }); return result; } @Override - public List<Job> getAllJobs(final long start, final long finish) throws IOException { + public synchronized List<Job> getAllJobs(final long start, final long finish) throws IOException { final List<Job> result = new ArrayList<Job>(); - this.store.readLockIndex(); - try { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(tx, start); - while (iter.hasNext()) { - Map.Entry<Long, List<JobLocation>> next = iter.next(); - if (next != null && next.getKey().longValue() <= finish) { - for (JobLocation jl : next.getValue()) { - ByteSequence bs = getPayload(jl.getLocation()); - Job job = new JobImpl(jl, bs); - result.add(job); - } - } else { - break; + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx(), start); + while (iter.hasNext()) { + Map.Entry<Long, List<JobLocation>> next = iter.next(); + if (next != null && next.getKey().longValue() <= finish) { + for (JobLocation jl : next.getValue()) { + ByteSequence bs = getPayload(jl.getLocation()); + Job job = new JobImpl(jl, bs); + result.add(job); } + } else { + break; } } - }); - } finally { - this.store.readUnlockIndex(); - } + } + }); return result; } - private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException { + @Override + public synchronized void removeAllJobs() throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + destroy(tx); + } + }); + } + + @Override + public synchronized void removeAllJobs(final long start, final long finish) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + destroy(tx, start, finish); + } + }); + } + + ByteSequence getPayload(Location location) throws IllegalStateException, IOException { + return this.store.getPayload(location); + } + + void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws IOException { long startTime = System.currentTimeMillis(); // round startTime - so we can schedule more jobs // at the same time @@ -243,86 +308,38 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch // start time not set by CRON - so it it to the current time time = startTime; } - if (delay > 0) { time += delay; } else { time += period; } - KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand(); - newJob.setScheduler(name); - newJob.setJobId(jobId); - newJob.setStartTime(startTime); - newJob.setCronEntry(cronEntry); - newJob.setDelay(delay); - newJob.setPeriod(period); - newJob.setRepeat(repeat); - newJob.setNextExecutionTime(time); - newJob.setPayload(new Buffer(payload.getData(), payload.getOffset(), payload.getLength())); - - this.store.store(newJob); - } - - private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException { - KahaRescheduleJobCommand update = new KahaRescheduleJobCommand(); - update.setScheduler(name); - update.setJobId(jobId); - update.setExecutionTime(executionTime); - update.setNextExecutionTime(nextExecutionTime); - update.setRescheduledCount(rescheduledCount); - this.store.store(update); - } - - private void doRemove(final long executionTime, final List<JobLocation> jobs) throws IOException { - for (JobLocation job : jobs) { - doRemove(executionTime, job.getJobId()); - } - } - - private void doRemove(long executionTime, final String jobId) throws IOException { - KahaRemoveScheduledJobCommand remove = new KahaRemoveScheduledJobCommand(); - remove.setScheduler(name); - remove.setJobId(jobId); - remove.setNextExecutionTime(executionTime); - this.store.store(remove); - } - - private void doRemoveRange(long start, long end) throws IOException { - KahaRemoveScheduledJobsCommand destroy = new KahaRemoveScheduledJobsCommand(); - destroy.setScheduler(name); - destroy.setStartTime(start); - destroy.setEndTime(end); - this.store.store(destroy); - } - - /** - * Adds a new Scheduled job to the index. Must be called under index lock. - * - * This method must ensure that a duplicate add is not processed into the scheduler. On index - * recover some adds may be replayed and we don't allow more than one instance of a JobId to - * exist at any given scheduled time, so filter these out to ensure idempotence. - * - * @param tx - * Transaction in which the update is performed. - * @param command - * The new scheduled job command to process. - * @param location - * The location where the add command is stored in the journal. - * - * @throws IOException if an error occurs updating the index. - */ - protected void process(final Transaction tx, final KahaAddScheduledJobCommand command, Location location) throws IOException { + Location location = this.store.write(payload, false); JobLocation jobLocation = new JobLocation(location); - jobLocation.setJobId(command.getJobId()); - jobLocation.setStartTime(command.getStartTime()); - jobLocation.setCronEntry(command.getCronEntry()); - jobLocation.setDelay(command.getDelay()); - jobLocation.setPeriod(command.getPeriod()); - jobLocation.setRepeat(command.getRepeat()); - - long nextExecutionTime = command.getNextExecutionTime(); + this.store.incrementJournalCount(tx, location); + jobLocation.setJobId(jobId); + jobLocation.setStartTime(startTime); + jobLocation.setCronEntry(cronEntry); + jobLocation.setDelay(delay); + jobLocation.setPeriod(period); + jobLocation.setRepeat(repeat); + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling " + jobLocation); + } + storeJob(tx, jobLocation, time); + this.scheduleTime.newJob(); + } + + synchronized void storeJob(final JobLocation jobLocation, final long nextExecutionTime) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + storeJob(tx, jobLocation, nextExecutionTime); + } + }); + } + void storeJob(final Transaction tx, final JobLocation jobLocation, final long nextExecutionTime) throws IOException { List<JobLocation> values = null; jobLocation.setNextTime(nextExecutionTime); if (this.index.containsKey(tx, nextExecutionTime)) { @@ -331,239 +348,106 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch if (values == null) { values = new ArrayList<JobLocation>(); } + values.add(jobLocation); + this.index.put(tx, nextExecutionTime, values); + } - // There can never be more than one instance of the same JobId scheduled at any - // given time, when it happens its probably the result of index recovery and this - // method must be idempotent so check for it first. - if (!values.contains(jobLocation)) { - values.add(jobLocation); - - // Reference the log file where the add command is stored to prevent GC. - this.store.incrementJournalCount(tx, location); - this.index.put(tx, nextExecutionTime, values); - this.scheduleTime.newJob(); - } else { - this.index.put(tx, nextExecutionTime, values); - LOG.trace("Job {} already in scheduler at this time {}", - jobLocation.getJobId(), jobLocation.getNextTime()); + void remove(Transaction tx, long time, String jobId) throws IOException { + JobLocation result = removeFromIndex(tx, time, jobId); + if (result != null) { + this.store.decrementJournalCount(tx, result.getLocation()); } } - /** - * Reschedules a Job after it has be fired. - * - * For jobs that are repeating this method updates the job in the index by adding it to the - * jobs list for the new execution time. If the job is not a cron type job then this method - * will reduce the repeat counter if the job has a fixed number of repeats set. The Job will - * be removed from the jobs list it just executed on. - * - * This method must also update the value of the last update location in the JobLocation - * instance so that the checkpoint worker doesn't drop the log file in which that command lives. - * - * This method must ensure that an reschedule command that references a job that doesn't exist - * does not cause an error since it's possible that on recover the original add might be gone - * and so the job should not reappear in the scheduler. - * - * @param tx - * The TX under which the index is updated. - * @param command - * The reschedule command to process. - * @param location - * The location in the index where the reschedule command was stored. - * - * @throws IOException if an error occurs during the reschedule. - */ - protected void process(final Transaction tx, final KahaRescheduleJobCommand command, Location location) throws IOException { + JobLocation removeFromIndex(Transaction tx, long time, String jobId) throws IOException { JobLocation result = null; - final List<JobLocation> current = this.index.remove(tx, command.getExecutionTime()); - if (current != null) { - for (int i = 0; i < current.size(); i++) { - JobLocation jl = current.get(i); - if (jl.getJobId().equals(command.getJobId())) { - current.remove(i); - if (!current.isEmpty()) { - this.index.put(tx, command.getExecutionTime(), current); + List<JobLocation> values = this.index.remove(tx, time); + if (values != null) { + for (int i = 0; i < values.size(); i++) { + JobLocation jl = values.get(i); + if (jl.getJobId().equals(jobId)) { + values.remove(i); + if (!values.isEmpty()) { + this.index.put(tx, time, values); } result = jl; break; } } - } else { - LOG.debug("Process reschedule command for job {} non-existent executime time {}.", - command.getJobId(), command.getExecutionTime()); } + return result; + } + private void remove(Transaction tx, long time, List<JobLocation> jobIds) throws IOException { + List<JobLocation> result = removeFromIndex(tx, time, jobIds); if (result != null) { - Location previousUpdate = result.getLastUpdate(); - - List<JobLocation> target = null; - result.setNextTime(command.getNextExecutionTime()); - result.setLastUpdate(location); - result.setRescheduledCount(command.getRescheduledCount()); - if (!result.isCron() && result.getRepeat() > 0) { - result.setRepeat(result.getRepeat() - 1); + for (JobLocation jl : result) { + this.store.decrementJournalCount(tx, jl.getLocation()); } - if (this.index.containsKey(tx, command.getNextExecutionTime())) { - target = this.index.remove(tx, command.getNextExecutionTime()); - } - if (target == null) { - target = new ArrayList<JobLocation>(); - } - target.add(result); - - // Track the location of the last reschedule command and release the log file - // reference for the previous one if there was one. - this.store.incrementJournalCount(tx, location); - if (previousUpdate != null) { - this.store.decrementJournalCount(tx, previousUpdate); - } - - this.index.put(tx, command.getNextExecutionTime(), target); - this.scheduleTime.newJob(); - } else { - LOG.debug("Process reschedule command for non-scheduled job {} at executime time {}.", - command.getJobId(), command.getExecutionTime()); } } - /** - * Removes a scheduled job from the scheduler. - * - * The remove operation can be of two forms. The first is that there is a job Id but no set time - * (-1) in which case the jobs index is searched until the target job Id is located. The alternate - * form is that a job Id and execution time are both set in which case the given time is checked - * for a job matching that Id. In either case once an execution time is identified the job is - * removed and the index updated. - * - * This method should ensure that if the matching job is not found that no error results as it - * is possible that on a recover the initial add command could be lost so the job may not be - * rescheduled. - * - * @param tx - * The transaction under which the index is updated. - * @param command - * The remove command to process. - * @param location - * The location of the remove command in the Journal. - * - * @throws IOException if an error occurs while updating the scheduler index. - */ - void process(final Transaction tx, final KahaRemoveScheduledJobCommand command, Location location) throws IOException { - - // Case 1: JobId and no time value means find the job and remove it. - // Case 2: JobId and a time value means find exactly this scheduled job. - - Long executionTime = command.getNextExecutionTime(); - - List<JobLocation> values = null; - - if (executionTime == -1) { - for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { - Map.Entry<Long, List<JobLocation>> entry = i.next(); - List<JobLocation> candidates = entry.getValue(); - if (candidates != null) { - for (JobLocation jl : candidates) { - if (jl.getJobId().equals(command.getJobId())) { - LOG.trace("Entry {} contains the remove target: {}", entry.getKey(), command.getJobId()); - executionTime = entry.getKey(); - values = this.index.remove(tx, executionTime); - break; - } - } - } - } - } else { - values = this.index.remove(tx, executionTime); - } - - JobLocation removed = null; - - // Remove the job and update the index if there are any other jobs scheduled at this time. + private List<JobLocation> removeFromIndex(Transaction tx, long time, List<JobLocation> Jobs) throws IOException { + List<JobLocation> result = null; + List<JobLocation> values = this.index.remove(tx, time); if (values != null) { - for (JobLocation job : values) { - if (job.getJobId().equals(command.getJobId())) { - removed = job; - values.remove(removed); - break; + result = new ArrayList<JobLocation>(values.size()); + + for (JobLocation job : Jobs) { + if (values.remove(job)) { + result.add(job); } } if (!values.isEmpty()) { - this.index.put(tx, executionTime, values); + this.index.put(tx, time, values); } } + return result; + } - if (removed != null) { - LOG.trace("{} removed from scheduler {}", removed, this); - - // Remove the references for add and reschedule commands for this job - // so that those logs can be GC'd when free. - this.store.decrementJournalCount(tx, removed.getLocation()); - if (removed.getLastUpdate() != null) { - this.store.decrementJournalCount(tx, removed.getLastUpdate()); + void remove(Transaction tx, long time) throws IOException { + List<JobLocation> values = this.index.remove(tx, time); + if (values != null) { + for (JobLocation jl : values) { + this.store.decrementJournalCount(tx, jl.getLocation()); } - - // now that the job is removed from the index we can store the remove info and - // then dereference the log files that hold the initial add command and the most - // recent update command. - this.store.referenceRemovedLocation(tx, location, removed); } } - /** - * Removes all scheduled jobs within a given time range. - * - * The method can be used to clear the entire scheduler index by specifying a range that - * encompasses all time [0...Long.MAX_VALUE] or a single execution time can be removed by - * setting start and end time to the same value. - * - * @param tx - * The transaction under which the index is updated. - * @param command - * The remove command to process. - * @param location - * The location of the remove command in the Journal. - * - * @throws IOException if an error occurs while updating the scheduler index. - */ - protected void process(final Transaction tx, final KahaRemoveScheduledJobsCommand command, Location location) throws IOException { - removeInRange(tx, command.getStartTime(), command.getEndTime(), location); + void remove(Transaction tx, String id) throws IOException { + for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { + Map.Entry<Long, List<JobLocation>> entry = i.next(); + List<JobLocation> values = entry.getValue(); + if (values != null) { + for (JobLocation jl : values) { + if (jl.getJobId().equals(id)) { + remove(tx, entry.getKey(), id); + return; + } + } + } + } } - /** - * Removes all jobs from the schedulers index. Must be called with the index locked. - * - * @param tx - * The transaction under which the index entries for this scheduler are removed. - * - * @throws IOException if an error occurs removing the jobs from the scheduler index. - */ - protected void removeAll(Transaction tx) throws IOException { - this.removeInRange(tx, 0, Long.MAX_VALUE, null); + synchronized void destroy(Transaction tx) throws IOException { + List<Long> keys = new ArrayList<Long>(); + for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { + Map.Entry<Long, List<JobLocation>> entry = i.next(); + keys.add(entry.getKey()); + } + + for (Long l : keys) { + List<JobLocation> values = this.index.remove(tx, l); + if (values != null) { + for (JobLocation jl : values) { + this.store.decrementJournalCount(tx, jl.getLocation()); + } + } + } } - /** - * Removes all scheduled jobs within the target range. - * - * This method can be used to remove all the stored jobs by passing a range of [0...Long.MAX_VALUE] - * or it can be used to remove all jobs at a given scheduled time by passing the same time value - * for both start and end. If the optional location parameter is set then this method will update - * the store's remove location tracker with the location value and the Jobs that are being removed. - * - * This method must be called with the store index locked for writes. - * - * @param tx - * The transaction under which the index is to be updated. - * @param start - * The start time for the remove operation. - * @param finish - * The end time for the remove operation. - * @param location (optional) - * The location of the remove command that triggered this remove. - * - * @throws IOException if an error occurs during the remove operation. - */ - protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException { + synchronized void destroy(Transaction tx, long start, long finish) throws IOException { List<Long> keys = new ArrayList<Long>(); for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) { Map.Entry<Long, List<JobLocation>> entry = i.next(); @@ -574,97 +458,32 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch } } - for (Long executionTime : keys) { - List<JobLocation> values = this.index.remove(tx, executionTime); - if (location != null) { - for (JobLocation job : values) { - LOG.trace("Removing {} scheduled at: {}", job, executionTime); - - // Remove the references for add and reschedule commands for this job - // so that those logs can be GC'd when free. - this.store.decrementJournalCount(tx, job.getLocation()); - if (job.getLastUpdate() != null) { - this.store.decrementJournalCount(tx, job.getLastUpdate()); - } - - // now that the job is removed from the index we can store the remove info and - // then dereference the log files that hold the initial add command and the most - // recent update command. - this.store.referenceRemovedLocation(tx, location, job); + for (Long l : keys) { + List<JobLocation> values = this.index.remove(tx, l); + if (values != null) { + for (JobLocation jl : values) { + this.store.decrementJournalCount(tx, jl.getLocation()); } } } } - /** - * Removes a Job from the index using it's Id value and the time it is currently set to - * be executed. This method will only remove the Job if it is found at the given execution - * time. - * - * This method must be called under index lock. - * - * @param tx - * the transaction under which this method is being executed. - * @param jobId - * the target Job Id to remove. - * @param executionTime - * the scheduled time that for the Job Id that is being removed. - * - * @returns true if the Job was removed or false if not found at the given time. - * - * @throws IOException if an error occurs while removing the Job. - */ - protected boolean removeJobAtTime(Transaction tx, String jobId, long executionTime) throws IOException { - boolean result = false; - - List<JobLocation> jobs = this.index.remove(tx, executionTime); - Iterator<JobLocation> jobsIter = jobs.iterator(); - while (jobsIter.hasNext()) { - JobLocation job = jobsIter.next(); - if (job.getJobId().equals(jobId)) { - jobsIter.remove(); - // Remove the references for add and reschedule commands for this job - // so that those logs can be GC'd when free. - this.store.decrementJournalCount(tx, job.getLocation()); - if (job.getLastUpdate() != null) { - this.store.decrementJournalCount(tx, job.getLastUpdate()); - } - result = true; - break; - } + private synchronized Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException { + if (!this.store.isStopped() && !this.store.isStopping()) { + Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); + return first; } - - // Return the list to the index modified or unmodified. - this.index.put(tx, executionTime, jobs); - - return result; + return null; } - /** - * Walks the Scheduled Job Tree and collects the add location and last update location - * for all scheduled jobs. - * - * This method must be called with the index locked. - * - * @param tx - * the transaction under which this operation was invoked. - * - * @return a list of all referenced Location values for this JobSchedulerImpl - * - * @throws IOException if an error occurs walking the scheduler tree. - */ - protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException { - List<JobLocation> references = new ArrayList<JobLocation>(); - - for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { - Map.Entry<Long, List<JobLocation>> entry = i.next(); - List<JobLocation> scheduled = entry.getValue(); - for (JobLocation job : scheduled) { - references.add(job); - } + void fireJob(JobLocation job) throws IllegalStateException, IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Firing " + job); + } + ByteSequence bs = this.store.getPayload(job.getLocation()); + for (JobListener l : jobListeners) { + l.scheduledJob(job.getJobId(), bs); } - - return references; } @Override @@ -673,14 +492,14 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch mainLoop(); } catch (Throwable e) { if (this.running.get() && isStarted()) { - LOG.error("{} Caught exception in mainloop", this, e); + LOG.error(this + " Caught exception in mainloop", e); } } finally { if (running.get()) { try { stop(); } catch (Exception e) { - LOG.error("Failed to stop {}", this); + LOG.error("Failed to stop " + this); } } } @@ -688,55 +507,56 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch @Override public String toString() { - return "JobScheduler: " + this.name; + return "JobScheduler:" + this.name; } protected void mainLoop() { while (this.running.get()) { this.scheduleTime.clearNewJob(); try { + // peek the next job long currentTime = System.currentTimeMillis(); - // Read the list of scheduled events and fire the jobs, reschedule repeating jobs as - // needed before firing the job event. + // Read the list of scheduled events and fire the jobs. Once done with the batch + // remove all that were fired, and reschedule as needed. Map.Entry<Long, List<JobLocation>> first = getNextToSchedule(); if (first != null) { List<JobLocation> list = new ArrayList<JobLocation>(first.getValue()); - List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size()); + List<JobLocation> fired = new ArrayList<JobLocation>(list.size()); final long executionTime = first.getKey(); long nextExecutionTime = 0; if (executionTime <= currentTime) { for (final JobLocation job : list) { - - if (!running.get()) { - break; - } - int repeat = job.getRepeat(); nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); long waitTime = nextExecutionTime - currentTime; this.scheduleTime.setWaitTime(waitTime); - if (!job.isCron()) { + if (job.isCron() == false) { fireJob(job); if (repeat != 0) { - // Reschedule for the next time, the scheduler will take care of - // updating the repeat counter on the update. - doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); + repeat--; + job.setRepeat(repeat); + // remove this job from the index so it doesn't get destroyed + removeFromIndex(executionTime, job.getJobId()); + // and re-store it + storeJob(job, nextExecutionTime); } else { - toRemove.add(job); + fired.add(job); } } else { + // cron job will have a repeat time. if (repeat == 0) { - // This is a non-repeating Cron entry so we can fire and forget it. + // we haven't got a separate scheduler to execute at + // this time - just a cron job - so fire it fireJob(job); } if (nextExecutionTime > currentTime) { - // Reschedule the cron job as a new event, if the cron entry signals - // a repeat then it will be stored separately and fired as a normal - // event with decrementing repeat. - doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); - + // we will run again ... + // remove this job from the index - so it doesn't get destroyed + removeFromIndex(executionTime, job.getJobId()); + // and re-store it + storeJob(job, nextExecutionTime); if (repeat != 0) { // we have a separate schedule to run at this time // so the cron job is used to set of a separate schedule @@ -749,14 +569,14 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch this.scheduleTime.setWaitTime(waitTime); } } else { - toRemove.add(job); + fired.add(job); } } } // now remove all jobs that have not been rescheduled from this execution // time, if there are no more entries in that time it will be removed. - doRemove(executionTime, toRemove); + remove(executionTime, fired); // If there is a job that should fire before the currently set wait time // we need to reset wait time otherwise we'll miss it. @@ -768,30 +588,25 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch } } } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms"); + } this.scheduleTime.setWaitTime(executionTime - currentTime); } } this.scheduleTime.pause(); } catch (Exception ioe) { - LOG.error("{} Failed to schedule job", this.name, ioe); + LOG.error(this.name + " Failed to schedule job", ioe); try { this.store.stop(); } catch (Exception e) { - LOG.error("{} Failed to shutdown JobSchedulerStore", this.name, e); + LOG.error(this.name + " Failed to shutdown JobSchedulerStore", e); } } } } - void fireJob(JobLocation job) throws IllegalStateException, IOException { - LOG.debug("Firing: {}", job); - ByteSequence bs = this.store.getPayload(job.getLocation()); - for (JobListener l : jobListeners) { - l.scheduledJob(job.getJobId(), bs); - } - } - @Override public void startDispatching() throws Exception { if (!this.running.get()) { @@ -812,7 +627,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch Thread t = this.thread; this.thread = null; if (t != null) { - t.join(3000); + t.join(1000); } } } @@ -828,10 +643,6 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch stopDispatching(); } - private ByteSequence getPayload(Location location) throws IllegalStateException, IOException { - return this.store.getPayload(location); - } - long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException { long result = currentTime; String cron = job.getCronEntry(); @@ -849,7 +660,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch void load(Transaction tx) throws IOException { this.index.setKeyMarshaller(LongMarshaller.INSTANCE); - this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE); + this.index.setValueMarshaller(ValueMarshaller.INSTANCE); this.index.load(tx); } @@ -857,7 +668,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch this.name = in.readUTF(); this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong()); this.index.setKeyMarshaller(LongMarshaller.INSTANCE); - this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE); + this.index.setValueMarshaller(ValueMarshaller.INSTANCE); } public void write(DataOutput out) throws IOException { @@ -865,6 +676,30 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch out.writeLong(this.index.getPageId()); } + static class ValueMarshaller extends VariableMarshaller<List<JobLocation>> { + static ValueMarshaller INSTANCE = new ValueMarshaller(); + + @Override + public List<JobLocation> readPayload(DataInput dataIn) throws IOException { + List<JobLocation> result = new ArrayList<JobLocation>(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + JobLocation jobLocation = new JobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + @Override + public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (JobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } + } + static class ScheduleTime { private final int DEFAULT_WAIT = 500; private final int DEFAULT_NEW_JOB_WAIT = 100; http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java deleted file mode 100644 index c92dc7b..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java +++ /dev/null @@ -1,246 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.kahadb.scheduler; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; - -import org.apache.activemq.store.kahadb.AbstractKahaDBMetaData; -import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; -import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; -import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; -import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; -import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The KahaDB MetaData used to house the Index data for the KahaDB implementation - * of a JobSchedulerStore. - */ -public class JobSchedulerKahaDBMetaData extends AbstractKahaDBMetaData<JobSchedulerKahaDBMetaData> { - - static final Logger LOG = LoggerFactory.getLogger(JobSchedulerKahaDBMetaData.class); - - private final JobSchedulerStoreImpl store; - - private UUID token = JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN; - private int version = JobSchedulerStoreImpl.CURRENT_VERSION; - - private BTreeIndex<Integer, List<Integer>> removeLocationTracker; - private BTreeIndex<Integer, Integer> journalRC; - private BTreeIndex<String, JobSchedulerImpl> storedSchedulers; - - /** - * Creates a new instance of this meta data object with the assigned - * parent JobSchedulerStore instance. - * - * @param store - * the store instance that owns this meta data. - */ - public JobSchedulerKahaDBMetaData(JobSchedulerStoreImpl store) { - this.store = store; - } - - /** - * @return the current value of the Scheduler store identification token. - */ - public UUID getToken() { - return this.token; - } - - /** - * @return the current value of the version tag for this meta data instance. - */ - public int getVersion() { - return this.version; - } - - /** - * Gets the index that contains the location tracking information for Jobs - * that have been removed from the index but whose add operation has yet - * to be removed from the Journal. - * - * The Journal log file where a remove command is written cannot be released - * until the log file with the original add command has also been released, - * otherwise on a log replay the scheduled job could reappear in the scheduler - * since its corresponding remove might no longer be present. - * - * @return the remove command location tracker index. - */ - public BTreeIndex<Integer, List<Integer>> getRemoveLocationTracker() { - return this.removeLocationTracker; - } - - /** - * Gets the index used to track the number of reference to a Journal log file. - * - * A log file in the Journal can only be considered for removal after all the - * references to it have been released. - * - * @return the journal log file reference counter index. - */ - public BTreeIndex<Integer, Integer> getJournalRC() { - return this.journalRC; - } - - /** - * Gets the index of JobScheduler instances that have been created and stored - * in the JobSchedulerStore instance. - * - * @return the index of stored JobScheduler instances. - */ - public BTreeIndex<String, JobSchedulerImpl> getJobSchedulers() { - return this.storedSchedulers; - } - - @Override - public void initialize(Transaction tx) throws IOException { - this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), tx.allocate().getPageId()); - this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), tx.allocate().getPageId()); - this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), tx.allocate().getPageId()); - } - - @Override - public void load(Transaction tx) throws IOException { - this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); - this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); - this.storedSchedulers.load(tx); - this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.load(tx); - this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller()); - this.removeLocationTracker.load(tx); - } - - /** - * Loads all the stored JobScheduler instances into the provided map. - * - * @param tx - * the Transaction under which the load operation should be executed. - * @param schedulers - * a Map<String, JobSchedulerImpl> into which the loaded schedulers are stored. - * - * @throws IOException if an error occurs while performing the load operation. - */ - public void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException { - for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { - Entry<String, JobSchedulerImpl> entry = i.next(); - entry.getValue().load(tx); - schedulers.put(entry.getKey(), entry.getValue()); - } - } - - @Override - public void read(DataInput in) throws IOException { - try { - long msb = in.readLong(); - long lsb = in.readLong(); - this.token = new UUID(msb, lsb); - } catch (Exception e) { - throw new UnknownStoreVersionException(e); - } - - if (!token.equals(JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN)) { - throw new UnknownStoreVersionException(token.toString()); - } - this.version = in.readInt(); - if (in.readBoolean()) { - setLastUpdateLocation(LocationMarshaller.INSTANCE.readPayload(in)); - } else { - setLastUpdateLocation(null); - } - this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), in.readLong()); - this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); - this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); - this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), in.readLong()); - this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); - this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), in.readLong()); - this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller()); - - LOG.info("Scheduler Store version {} loaded", this.version); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeLong(this.token.getMostSignificantBits()); - out.writeLong(this.token.getLeastSignificantBits()); - out.writeInt(this.version); - if (getLastUpdateLocation() != null) { - out.writeBoolean(true); - LocationMarshaller.INSTANCE.writePayload(getLastUpdateLocation(), out); - } else { - out.writeBoolean(false); - } - out.writeLong(this.storedSchedulers.getPageId()); - out.writeLong(this.journalRC.getPageId()); - out.writeLong(this.removeLocationTracker.getPageId()); - } - - private class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> { - private final JobSchedulerStoreImpl store; - - JobSchedulerMarshaller(JobSchedulerStoreImpl store) { - this.store = store; - } - - @Override - public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { - JobSchedulerImpl result = new JobSchedulerImpl(this.store); - result.read(dataIn); - return result; - } - - @Override - public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { - js.write(dataOut); - } - } - - private class IntegerListMarshaller extends VariableMarshaller<List<Integer>> { - - @Override - public List<Integer> readPayload(DataInput dataIn) throws IOException { - List<Integer> result = new ArrayList<Integer>(); - int size = dataIn.readInt(); - for (int i = 0; i < size; i++) { - result.add(IntegerMarshaller.INSTANCE.readPayload(dataIn)); - } - return result; - } - - @Override - public void writePayload(List<Integer> value, DataOutput dataOut) throws IOException { - dataOut.writeInt(value.size()); - for (Integer integer : value) { - IntegerMarshaller.INSTANCE.writePayload(integer, dataOut); - } - } - } -}
