Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.jobs; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Created by ieb on 13/04/2016. + * This has to be part of the API to prevent all sorts of other issues. + */ +public final class Types { + + + + private Types() { + + } + + public interface JobQueue { + org.apache.sling.mom.Types.QueueName asQueueName(); + + org.apache.sling.mom.Types.TopicName asTopicName(); + + } + + public interface JobType { + } + + public static JobQueue jobQueue(String jobQueue) { + return new JobQueueImpl(jobQueue); + } + + public static JobType jobType(String jobType) { + return new JobTypeImpl(jobType); + } + + public static JobQueue ANY_JOB_QUEUE = new AnyJobQueue(); + + + public static Set<JobType> jobType(String[] types) { + Set<JobType> hs = new HashSet<JobType>(); + for ( String s : types) { + hs.add(jobType(s)); + } + return Collections.unmodifiableSet(hs); + } + + + + + /** + * Wraps a JobType. + */ + private static class JobTypeImpl extends StringWrapper implements JobType { + + private JobTypeImpl(String jobType) { + super(jobType); + } + } + + + + /** + * Wraps a JobQueue. + */ + private static class JobQueueImpl extends StringWrapper implements JobQueue { + + private JobQueueImpl(String jobQueue) { + super(jobQueue); + } + + + @Override + public boolean equals(Object obj) { + return obj == ANY_JOB_QUEUE || super.equals(obj); + } + + + @Override + public org.apache.sling.mom.Types.QueueName asQueueName() { + return org.apache.sling.mom.Types.queueName(toString()); + } + + @Override + public org.apache.sling.mom.Types.TopicName asTopicName() { + return org.apache.sling.mom.Types.topicName(toString()); + } + } + + /** + * Special JobQueue to match any. + */ + private static class AnyJobQueue extends JobQueueImpl { + + + private AnyJobQueue() { + super("*"); + } + + + @Override + public boolean equals(Object obj) { + return true; + } + + @Override + public int compareTo(String o) { + return 0; + } + } + + private static class StringWrapper implements Comparable<String> { + + + private String value; + + private StringWrapper(String value) { + this.value = value; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return value.equals(obj.toString()); + } + + @Override + public int compareTo(String o) { + return value.compareTo(o); + } + + public String toString() { + return value; + } + } + +}
Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl; + +import org.apache.sling.jobs.Job; +import org.apache.sling.jobs.JobBuilder; +import org.apache.sling.jobs.Types; +import org.apache.sling.jobs.impl.spi.JobStarter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.Map; + +/** + * Created by ieb on 29/03/2016. + * Provides an implementation of a JobBuilder. + */ +public class JobBuilderImpl implements JobBuilder { + private static final Logger LOGGER = LoggerFactory.getLogger(JobBuilderImpl.class); + private final String id; + private final Map<String, Object> properties; + private final JobStarter jobStarter; + private final Types.JobQueue topic; + private final Types.JobType jobType; + + + public JobBuilderImpl(JobStarter jobStarter, Types.JobQueue topic, Types.JobType jobType) { + this.jobStarter = jobStarter; + this.topic = topic; + this.jobType = jobType; + this.id = Utils.generateId(); + properties = new HashMap<String, Object>(); + } + + + @Nonnull + @Override + public JobBuilder addProperties(@Nonnull Map<String, Object> props) { + this.properties.putAll(props); + return this; + } + + @Nonnull + @Override + public Job add() { + return jobStarter.start(new JobImpl(topic, id, jobType, properties)); + } + + + +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl; + +import com.google.common.base.Preconditions; +import org.apache.sling.jobs.*; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +/** + * JobImpl is a data object to hold the current state of the job in the current JVM as loaded into memory. + * The JobImpl also listens for JobUpdates. + * Created by ieb on 23/03/2016. + */ +public class JobImpl implements Job, JobUpdateListener { + private final Types.JobQueue jobQueue; + private final String id; + private final Map<String, Object> properties = new HashMap<String, Object>(); + private int retryCount; + private int numberOfRetries; + private long startedAt; + private long createdAt; + private long lastUpdate = 0; + private JobState jobState; + private long finishedAt; + private String resultMessage; + private JobController jobController; + private Types.JobType jobType; + + public JobImpl(@Nonnull Types.JobQueue jobQueue, @Nonnull String id, @Nonnull Types.JobType jobType, @Nonnull Map<String, Object> properties) { + this.jobQueue = jobQueue; + this.jobType = jobType; + this.id = id; + this.resultMessage = ""; + this.createdAt = System.currentTimeMillis(); + this.jobState = JobState.CREATED; + this.properties.putAll(properties); + } + + public JobImpl(JobUpdate update) { + this.jobQueue = update.getQueue(); + this.id = update.getId(); + update(update); + updateProperties(update.getProperties()); + } + + @Nonnull + @Override + public Types.JobQueue getQueue() { + return jobQueue; + } + + @Nonnull + @Override + public String getId() { + return id; + } + + @Nonnull + @Override + public Types.JobType getJobType() { + return jobType; + } + + @Nonnull + @Override + public Map<String, Object> getProperties() { + return properties; + } + + @Override + public int getRetryCount() { + return retryCount; + } + + @Override + public int getNumberOfRetries() { + return numberOfRetries; + } + + @Override + public long getStarted() { + return startedAt; + } + + @Override + public long getCreated() { + return createdAt; + } + + @Nonnull + @Override + public JobState getJobState() { + return jobState; + } + + @Override + public void setState(@Nonnull JobState newState) { + jobState = newState; + } + + @Override + public long getFinished() { + return finishedAt; + } + + @Nullable + @Override + public String getResultMessage() { + return resultMessage; + } + + + @Nullable + @Override + public JobController getController() { + return jobController; + } + + @Override + public void setJobController(@Nonnull JobController jobController) { + this.jobController = jobController; + } + + @Override + public void removeJobController() { + jobController = null; + } + + /** + * Apply an job update to this job, checking that the update is valid for the job. + * @param jobUpdate + */ + @Override + public void update(@Nonnull JobUpdate jobUpdate) { + if ( id.equals(jobUpdate.getId()) && ( jobQueue == Types.ANY_JOB_QUEUE || jobQueue.equals(jobUpdate.getQueue()))) { + // Start Job commands always go onto a queue and dont expire. + if ( jobUpdate.getCommand() != JobUpdate.JobUpdateCommand.START_JOB && jobUpdate.expires() < System.currentTimeMillis()) { + throw new IllegalStateException( + "JobUpdate has expired, can't be applied. Expired at "+jobUpdate.expires()+ + ", time now "+System.currentTimeMillis()+ + " expired "+(System.currentTimeMillis()-jobUpdate.expires())+" ms ago."); + } + if (jobUpdate.updateTimestamp() < lastUpdate ) { + throw new IllegalStateException("JobUpdate received out of sequence, cant be applied. Last Update was at "+lastUpdate+" this update is at "+jobUpdate.updateTimestamp()); + } + lastUpdate = jobUpdate.updateTimestamp(); + switch(jobUpdate.getCommand()) { + case START_JOB: + updateState(jobUpdate); + updateProperties(jobUpdate.getProperties()); + break; + case UPDATE_JOB: + // note, when job first comes into existence it is updated, then started. + // the start message is a queued message, the update is a jobQueue or pub sub message. + updateState(jobUpdate); + updateProperties(jobUpdate.getProperties()); + break; + case RETRY_JOB: + updateState(jobUpdate); + // Allow more retries. + numberOfRetries = retryCount + numberOfRetries; + // TODO: trigger retry if required. + updateProperties(jobUpdate.getProperties()); + break; + case STOP_JOB: + if (jobController != null) { + jobController.stop(); + } + break; + case ABORT_JOB: + if (jobController != null) { + jobController.abort(); + } + break; + } + } else { + throw new IllegalArgumentException("Cant update job with jobUpdate that doesn't match id and jobQueue "); + } + } + + /** + * Update the properties taking into account any PropertyActions required. + * @param properties the update properties. + */ + private void updateProperties(@Nonnull Map<String, Object> properties) { + Preconditions.checkNotNull(properties, "Properties cant be null."); + for (Map.Entry<String, Object> e : properties.entrySet()) { + if (e.getValue() instanceof JobUpdate.JobPropertyAction ) { + switch(((JobUpdate.JobPropertyAction)e.getValue())) { + case REMOVE: + this.properties.remove(e.getKey()); + break; + } + } else { + this.properties.put(e.getKey(), e.getValue()); + } + } + } + + /** + * Update the jobstate data for the job. + * @param jobUpdate + */ + private void updateState(@Nonnull JobUpdate jobUpdate) { + retryCount = jobUpdate.getRetryCount(); + jobType = jobUpdate.getJobType(); + numberOfRetries = jobUpdate.getNumberOfRetries(); + startedAt = jobUpdate.getStarted(); + createdAt = jobUpdate.getCreated(); + finishedAt = jobUpdate.getFinished(); + resultMessage = jobUpdate.getResultMessage(); + jobState = jobUpdate.getState(); + } + + /** + * Get a JobUpdateBuilder for this Job. + * @return the job update builder. + */ + @Nonnull + @Override + public JobUpdateBuilder newJobUpdateBuilder() { + return new JobUpdateBuilderImpl(this); + } + +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl; + +import org.apache.sling.jobs.*; +import org.apache.sling.jobs.impl.spi.JobStarter; +import org.apache.sling.jobs.impl.spi.JobStorage; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Map; + +/** + * Created by ieb on 29/03/2016. + * Implements a JobManager, storing Jobs in a JobStorage implementation. + * Implements a JobStarter so it can start Jobs and queue them by sending a message. + * Implements a JobUpdateListener so it can store inbound update messages. + * Requires a JobStorage implementation to store jobs and a JobListener to process update messages. + * Requires a JobUpdateLister which it uses to send JobUpdates to. + * Does not run jobs, that is performed by implementations of JobConsumer listening to queues. + */ +public class JobManagerImpl implements JobManager, JobStarter, JobUpdateListener { + + + /** + * Storage for jobs. + */ + private final JobStorage jobStorage; + /** + * A listener for job updates. + */ + private final JobUpdateListener messageSender; + + /** + * + * @param jobStorage + * @param messageSender + */ + public JobManagerImpl(JobStorage jobStorage, JobUpdateListener messageSender) { + this.jobStorage = jobStorage; + this.messageSender = messageSender; + } + + @Nonnull + @Override + public JobBuilder newJobBuilder(@Nonnull Types.JobQueue queue, @Nonnull Types.JobType jobType) { + return new JobBuilderImpl(this, queue, jobType); + } + + @Nullable + @Override + public Job getJobById(@Nonnull String jobId) { + return jobStorage.get(jobId); + } + + + @Nullable + @Override + public Job getJob(@Nonnull Types.JobQueue queue, @Nonnull Map<String, Object> template) { + throw new UnsupportedOperationException("TODO, implementation required"); + } + + @Nonnull + @Override + public Collection<Job> findJobs(@Nonnull QueryType type, @Nonnull Types.JobQueue queue, long limit, @Nullable Map<String, Object>... templates) { + throw new UnsupportedOperationException("TODO, implementation required"); + } + + @Override + public void stopJobById(@Nonnull String jobId) { + Job job = getJobById(jobId); + if ( job != null) { + messageSender.update(job.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.STOP_JOB).build()); + } + } + + @Override + public boolean abortJob(@Nonnull String jobId) { + Job job = getJobById(jobId); + if ( job != null) { + messageSender.update(job.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.ABORT_JOB).build()); + return true; + } else { + messageSender.update(new JobUpdateBuilderImpl(jobId).command(JobUpdate.JobUpdateCommand.ABORT_JOB).build()); + return false; + } + } + + @Nullable + @Override + public Job retryJobById(@Nonnull String jobId) { + Job job = getJobById(jobId); + if (job != null) { + messageSender.update(job.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.RETRY_JOB).build()); + } + return job; + } + + + @Override + public Job start(Job j) { + j.setState(Job.JobState.QUEUED); + messageSender.update(j.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.START_JOB).putAll(j.getProperties()).build()); + return jobStorage.put(j); + } + + + @Override + public void update(@Nonnull JobUpdate update) { + Job j = jobStorage.get(update.getId()); + if ( j instanceof JobUpdateListener ) { + ((JobUpdateListener) j).update(update); + } else { + jobStorage.put(new JobImpl(update)); + } + } + + public void dispose() { + } +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.jobs.impl; + +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.*; +import org.apache.sling.jobs.*; +import org.apache.sling.mom.*; +import org.apache.sling.mom.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +/** + * Created by ieb on 12/04/2016. + * This is a configuration factory that creates QueueReader instances on configuration. These connect to the JobManager + * service and are registered using the OSGi Whiteboard pattern with the QueueManager. The JobManager service must implement JobConsumer. + * + */ +@Component(configurationFactory = true, + policy = ConfigurationPolicy.REQUIRE, + metatype = true, + immediate = true) +@Properties({ + @Property(name= QueueReader.QUEUE_NAME_PROP) +}) +@Service(value = QueueReader.class) +public class JobQueueConsumerFactory implements QueueReader, MessageFilter { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobQueueConsumerFactory.class); + private static final Set<JobUpdate.JobUpdateCommand> ALLOWED_COMMANDS = ImmutableSet.of(JobUpdate.JobUpdateCommand.UPDATE_JOB) ; + + @Reference + private JobManager jobManager; + + @Reference + private TopicManager topicManager; + @Reference + private QueueManager queueManager; + + @Activate + public void activate(Map<String, Object> properties) { + if ( !(jobManager instanceof JobConsumer) ) { + LOGGER.error("JobManager must implement JobConsumer interface. {} does not. ", jobManager.getClass()); + throw new IllegalStateException("JobManager does not implement JobConsumer"); + } + } + + @Deactivate + public void deactivate(@SuppressWarnings("UnusedParameters") Map<String, Object> properties) { + } + + + + + @Override + public void onMessage(Types.QueueName queueName, Map<String, Object> message) throws RequeueMessageException { + + final Job job = new JobImpl(new JobUpdateImpl(message)); + + + ((JobConsumer)jobManager).execute(job, new JobUpdateListener() { + @Override + public void update(@Nonnull JobUpdate update) { + if (update.getId() != job.getId() || !ALLOWED_COMMANDS.contains(update.getCommand())) { + + throw new IllegalArgumentException("Not allowed to update other jobs or issue reserved commands when updating the state of a running job."); + } + topicManager.publish(update.getQueue().asTopicName(), update.getCommand().asCommandName(), Utils.toMapValue(update)); + } + }, new JobCallback() { + @Override + public void callback(Job finalJobState) { + if (finalJobState.getId() != job.getId()) { + throw new IllegalArgumentException("Final Job state ID must match initial JobState ID"); + } + JobUpdate finalJobUpdate = finalJobState.newJobUpdateBuilder() + .command(JobUpdate.JobUpdateCommand.UPDATE_JOB) + .putAll(finalJobState.getProperties()) + .build(); + topicManager.publish(finalJobUpdate.getQueue().asTopicName(), finalJobUpdate.getCommand().asCommandName(), Utils.toMapValue(finalJobUpdate)); + } + }); + + + } + + @Override + public boolean accept(Types.Name name, Map<String, Object> mapMessage) { + return !(jobManager instanceof MessageFilter) || ((MessageFilter) jobManager).accept(name, mapMessage); + } + + +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.jobs.impl; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.jobs.*; +import org.apache.sling.jobs.impl.spi.JobStorage; +import org.apache.sling.jobs.impl.storage.InMemoryJobStorage; +import org.apache.sling.mom.QueueManager; +import org.apache.sling.mom.TopicManager; +import org.osgi.framework.ServiceReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.Closeable; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Created by ieb on 11/04/2016. + * NB, this does *not* register as a JobConsumer service. it implements a JobConsumer so that it can consume Jobs from JobQueueConsumers. + */ +@Component(immediate = true) +@Service(value = JobManager.class) +public class JobSubsystem implements JobManager, JobConsumer { + + + private static final Logger LOGGER = LoggerFactory.getLogger(JobSubsystem.class); + private JobManagerImpl manager; + private JobStorage jobStorage; + private OutboundJobUpdateListener messageSender; + + /** + * Contains a map of JobConsumers wrapped by JobConsumerHolders keyed by ServiceReference. + */ + @Reference(referenceInterface = JobConsumer.class, + cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, + policy = ReferencePolicy.DYNAMIC, + bind="addConsumer", + unbind="removeConsumer") + private final Map<ServiceReference<JobConsumer>, JobConsumerHolder> registrations = + new ConcurrentHashMap<ServiceReference<JobConsumer>, JobConsumerHolder>(); + + @Reference + private TopicManager topicManager; + @Reference + private QueueManager queueManager; + + @Activate + public synchronized void activate(@SuppressWarnings("UnusedParameters") Map<String, Object> properties) { + jobStorage = new InMemoryJobStorage(); + messageSender = new OutboundJobUpdateListener(topicManager, queueManager); + manager = new JobManagerImpl(jobStorage, messageSender); + } + + @Deactivate + public synchronized void deactivate(@SuppressWarnings("UnusedParameters") Map<String, Object> properties) { + for (Map.Entry<ServiceReference<JobConsumer>, JobConsumerHolder> e : registrations.entrySet()) { + e.getValue().close(); + } + registrations.clear(); + manager.dispose(); + messageSender.dispose(); + jobStorage.dispose(); + } + + // --- Job Manager. + @Nonnull + @Override + public JobBuilder newJobBuilder(@Nonnull Types.JobQueue queue, @Nonnull Types.JobType jobType) { + return manager.newJobBuilder(queue, jobType); + } + + @Nullable + @Override + public Job getJobById(@Nonnull String jobId) { + return manager.getJobById(jobId); + } + + @Nullable + @Override + public Job getJob(@Nonnull Types.JobQueue queue, @Nonnull Map<String, Object> template) { + return manager.getJob(queue, template); + } + + @Nonnull + @Override + public Collection<Job> findJobs(@Nonnull QueryType type, @Nonnull Types.JobQueue queue, long limit, @Nullable Map<String, Object>... templates) { + return manager.findJobs(type, queue, limit, templates); + } + + @Override + public void stopJobById(@Nonnull String jobId) { + manager.stopJobById(jobId); + } + + @Override + public boolean abortJob(@Nonnull String jobId) { + return manager.abortJob(jobId); + } + + @Nullable + @Override + public Job retryJobById(@Nonnull String jobId) { + return manager.retryJobById(jobId); + } + + + // ---- JobConsumer Registration + // Register Consumers using + public synchronized void addConsumer(ServiceReference<JobConsumer> serviceRef) { + if (registrations.containsKey(serviceRef)) { + LOGGER.error("Registration for service reference is already present {}",serviceRef); + return; + } + JobConsumerHolder jobConsumerHolder = new JobConsumerHolder(serviceRef.getBundle().getBundleContext().getService(serviceRef), getServiceProperties(serviceRef)); + registrations.put(serviceRef, jobConsumerHolder); + } + + private Map<Object, Object> getServiceProperties(ServiceReference<JobConsumer> serviceRef) { + ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder(); + for ( String k : serviceRef.getPropertyKeys()) { + builder.put(k, serviceRef.getProperty(k)); + } + return builder.build(); + } + + public synchronized void removeConsumer(ServiceReference<JobConsumer> serviceRef) { + JobConsumerHolder jobConsumerHolder = registrations.remove(serviceRef); + if ( jobConsumerHolder != null) { + jobConsumerHolder.close(); + } + } + + + // ------- job execution, invoked by JobQueueConsumerFactory. + @Nonnull + @Override + public void execute(@Nonnull Job initialState, @Nonnull JobUpdateListener listener, @Nonnull JobCallback callback) { + // iterate over the entries. This should cause the entries to come out in natural key order + // which should respect any priority applied to the Services via ServiceReference. (TODO: check that is the case) + // TODO: add a Job controller to the job before executing. + for (Map.Entry<ServiceReference<JobConsumer>,JobConsumerHolder> e : registrations.entrySet()) { + JobConsumerHolder jobConsumerHolder = e.getValue(); + if (jobConsumerHolder.accept(initialState.getJobType())) { + jobConsumerHolder.consumer.execute(initialState, listener, callback); + return; + } + } + throw new IllegalArgumentException("No JobConsumer able to process a job of type "+initialState.getJobType()+" can be found in this instance."); + } + + + /** + * Holds job consumers and configures a JobTypeValve delegating to the JobConsumer implementation if it implements that interface. + */ + private class JobConsumerHolder implements JobTypeValve, Closeable { + private final JobConsumer consumer; + private final Set<Types.JobType> jobTypes; + + public JobConsumerHolder(JobConsumer consumer, Map<Object, Object> properties) { + this.consumer = consumer; + if ( consumer instanceof JobTypeValve) { + jobTypes = ImmutableSet.of(); + } else { + jobTypes = Types.jobType((String[]) properties.get(JobConsumer.JOB_TYPES)); + } + } + + @Override + public boolean accept(@Nonnull Types.JobType jobType) { + if ( consumer instanceof JobTypeValve) { + return ((JobTypeValve) consumer).accept(jobType); + } + return jobTypes.contains(jobType); + } + + public void close() { + // nothing to do at the moment. + } + } +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl; + +import com.google.common.collect.ImmutableMap; +import org.apache.sling.jobs.Job; +import org.apache.sling.jobs.JobUpdate; +import org.apache.sling.jobs.JobUpdateBuilder; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; + +/** + * Created by ieb on 23/03/2016. + */ +public class JobUpdateBuilderImpl implements JobUpdateBuilder { + + private final String jobId; + private final Job job; + private JobUpdate.JobUpdateCommand command; + private final ImmutableMap.Builder<String, Object> updateProperties = ImmutableMap.builder(); + + /** + * Create a JobUpdateBuilder from a job. + * @param job + */ + public JobUpdateBuilderImpl(@Nonnull Job job) { + this.job = job; + this.jobId = null; + } + + public JobUpdateBuilderImpl(@Nonnull String jobId) { + this.jobId = jobId; + this.job = null; + + } + + /** + * Set the JobUpdateCommand + * @param command the command. + * @return this JobBuilder instance. + */ + @Nonnull + @Override + public JobUpdateBuilder command(@Nonnull JobUpdate.JobUpdateCommand command) { + this.command = command; + return this; + } + + /** + * Set a property to update. + * @param name the name of the property + * @param value the value of the property which may be null. To remove the property set the value to JobUpdate.JobPropertyAction.REMOVE. + * @return this JobBuilder instance. + */ + @Nonnull + @Override + public JobUpdateBuilder put(@Nonnull String name, @Nullable Object value) { + if ( value == null) { + this.updateProperties.put(name, JobUpdate.JobPropertyAction.REMOVE); + } else { + this.updateProperties.put(name, value); + } + return this; + } + + @Nonnull + @Override + public JobUpdateBuilder putAll(@Nonnull Map<String, Object> properties) { + this.updateProperties.putAll(properties); + return this; + } + + + /** + * Build the JobUpdate. + * @return the JobUpdate. + */ + @Nonnull + @Override + public JobUpdate build() { + if ( job != null) { + return new JobUpdateImpl(job, command, updateProperties.build()); + } else if ( command == JobUpdate.JobUpdateCommand.ABORT_JOB || command == JobUpdate.JobUpdateCommand.STOP_JOB) { + return new JobUpdateImpl(jobId, command); + } else { + throw new IllegalStateException("Only possible to abort or stop a job by ID alone "); + } + } + +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import org.apache.sling.jobs.Job; +import org.apache.sling.jobs.JobUpdate; +import org.apache.sling.jobs.Types; +import org.apache.sling.jobs.impl.spi.MapValueAdapter; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +/** + * Represents messages sent to the Job via a message queue. + * Abort, stop and update messages should be sent via a priority queue. + * Start messages should be sent by a processing queue. + * Created by ieb on 23/03/2016. + */ +public class JobUpdateImpl implements MapValueAdapter, JobUpdate { + private static final long TTL = 1000 * 60; + private long updateTimestamp; + private long expires; + private JobUpdateCommand command; + private Types.JobQueue jobQueue; + private String id; + private Job.JobState jobState; + private Map<String, Object> properties; + private int retryCount; + private int numberOfRetries; + private long startedAt; + private long createdAt; + private long finishedAt; + private String resultMessage; + private Types.JobType jobType; + + + /** + * Create an update message using a job, command and update properties. Only the update properties will in the update. + * The job will be used to specify the job jobQueue, job ID and job state of the update message. + * @param job the job + * @param command the command + * @param properties properties in the update message. + */ + public JobUpdateImpl(@Nonnull Job job, @Nonnull JobUpdateCommand command, @Nonnull Map<String, Object> properties) { + Preconditions.checkNotNull(job, "Job argument cant be null"); + Preconditions.checkNotNull(command, "JobCommand argument cant be null"); + Preconditions.checkNotNull(properties, "Map of properties cant be null"); + + jobQueue = job.getQueue(); + jobType = job.getJobType(); + id = job.getId(); + startedAt = job.getStarted(); + createdAt = job.getCreated(); + finishedAt = job.getFinished(); + retryCount = job.getRetryCount(); + jobState = job.getJobState(); + resultMessage = job.getResultMessage(); + numberOfRetries = job.getNumberOfRetries(); + updateTimestamp = System.currentTimeMillis(); + expires = updateTimestamp + TTL; + this.command = command; + this.properties = properties; + } + + /** + * Create a JobUpdateImpl based on a inbound message in the form of a Map. + * @param message a inbound message in map form. + */ + public JobUpdateImpl(@Nonnull Map<String, Object> message) { + Preconditions.checkNotNull(message, "Message cant be null"); + fromMapValue(message); + } + + public JobUpdateImpl(@Nonnull String jobId, @Nonnull JobUpdateCommand command) { + Preconditions.checkNotNull(jobId, "JobId argument cant be null"); + Preconditions.checkNotNull(command, "JobUpdateCommand argument cant be null"); + jobQueue = Types.ANY_JOB_QUEUE; + id = jobId; + updateTimestamp = System.currentTimeMillis(); + expires = updateTimestamp + TTL; + jobState = Job.JobState.ANY_STATE; + this.command = command; + this.properties = ImmutableMap.of(); + + } + + + @Override + public long updateTimestamp() { + return updateTimestamp; + } + + @Override + public long expires() { + return expires; + } + + @Nonnull + @Override + public Types.JobType getJobType() { + return jobType; + } + + @Nonnull + @Override + public JobUpdateCommand getCommand() { + return command; + } + + @Nonnull + @Override + public Types.JobQueue getQueue() { + return jobQueue; + } + + @Nonnull + @Override + public String getId() { + return id; + } + + @Nonnull + @Override + public Job.JobState getState() { + return jobState; + } + + @Nonnull + @Override + public Map<String, Object> getProperties() { + return properties; + } + + @Override + public int getRetryCount() { + return retryCount; + } + + @Override + public int getNumberOfRetries() { + return numberOfRetries; + } + + @Override + public long getStarted() { + return startedAt; + } + + @Override + public long getCreated() { + return createdAt; + } + + @Override + public long getFinished() { + return finishedAt; + } + + @Override + public String getResultMessage() { + return resultMessage; + } + + + @Override + public void fromMapValue(@Nullable Object mapValue) { + if (mapValue != null && mapValue instanceof Map) { + @SuppressWarnings("unchecked") Map<String, Object> m = (Map<String, Object>) mapValue; + jobQueue = Types.jobQueue((String) Utils.getRequired(m, "tp")); + jobType = Types.jobType((String)Utils.getRequired(m, "jt")); + id = Utils.getRequired(m, "id"); + command = JobUpdateCommand.valueOf((String) Utils.getRequired(m, "cm")); + updateTimestamp = Utils.getRequired(m, "ts"); + expires = Utils.getRequired(m, "ex"); + if (command == JobUpdateCommand.UPDATE_JOB || command == JobUpdateCommand.START_JOB || command == JobUpdateCommand.RETRY_JOB ) { + startedAt = Utils.getOptional(m, "startedAt", 0L); + createdAt = Utils.getOptional(m, "createdAt", 0L); + finishedAt = Utils.getOptional(m, "finishedAt", 0L); + retryCount = Utils.getOptional(m, "retryCount", 0); + numberOfRetries = Utils.getOptional(m, "nRetries", 10); + jobState = Job.JobState.valueOf(Utils.getOptional(m, "jobState", Job.JobState.QUEUED.toString())); + resultMessage = Utils.getOptional(m, "resultMessage", null); + properties = Utils.getOptional(m, "properties", new HashMap<String, Object>()); + } else { + properties = new HashMap<String, Object>(); + } + } else { + throw new IllegalArgumentException("Cant populate JobImpl from "+mapValue); + } + } + + @Override + @Nonnull + public Object toMapValue() { + ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder(); + builder.put("tp", jobQueue.toString()); + builder.put("jt",jobType.toString()); + builder.put("id",id); + builder.put("cm", command.toString()); + builder.put("ts", this.updateTimestamp); + builder.put("ex", expires); + if ( command == JobUpdateCommand.UPDATE_JOB || command == JobUpdateCommand.START_JOB || command == JobUpdateCommand.RETRY_JOB ) { + builder.put("retryCount", retryCount); + builder.put("nRetries", numberOfRetries); + builder.put("startedAt", startedAt); + builder.put("createdAt", createdAt); + builder.put("finishedAt", finishedAt); + builder.put("jobState", jobState.toString()); + builder.put("resultMessage", resultMessage); + builder.put("properties", ImmutableMap.builder().putAll(properties).build()); + + } + return builder.build(); + } +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl; + +import org.apache.felix.scr.annotations.*; +import org.apache.sling.jobs.JobManager; +import org.apache.sling.jobs.JobUpdateListener; +import org.apache.sling.mom.Subscriber; +import org.apache.sling.mom.Types; + +import java.util.Map; + +/** + * Created by ieb on 30/03/2016. + * Listens to a topic to retrieve control messages. + */ +@Component(immediate = true, metatype = true) +@Service(value = Subscriber.class) +@Properties({ + @Property(name= Subscriber.TOPIC_NAMES_PROP, cardinality = Integer.MAX_VALUE, value = {"sling/jobupdates"} ) +}) +public class ManagerSubscriber implements Subscriber { + + + @Reference + private JobManager jobManager; + + + @Override + public void onMessage(Types.TopicName topic, Map<String, Object> message) { + if (jobManager instanceof JobUpdateListener) { + ((JobUpdateListener) jobManager).update(new JobUpdateImpl(message)); + } + } + +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.jobs.impl; + +import org.apache.sling.jobs.JobUpdate; +import org.apache.sling.jobs.JobUpdateListener; +import org.apache.sling.jobs.Types; +import org.apache.sling.mom.QueueManager; +import org.apache.sling.mom.TopicManager; + +import javax.annotation.Nonnull; + +/** + * Created by ieb on 30/03/2016. + * Sends messages out to JMS Queues or topics. Normally called by the local JobManager Implementation. + * Uses a TopicManager or QueueManager to perform the send operation. + */ +public class OutboundJobUpdateListener implements JobUpdateListener { + + + private boolean active; + private final TopicManager topicManager; + private final QueueManager queueManager; + + public OutboundJobUpdateListener(TopicManager topicManager, QueueManager queueManager ) { + this.topicManager = topicManager; + this.queueManager = queueManager; + active = true; + } + + public void dispose() { + active = false; + } + + + @Override + public void update(@Nonnull JobUpdate update) { + if ( active ) { + switch(update.getCommand()) { + case START_JOB: + queueManager.add(update.getQueue().asQueueName(), Utils.toMapValue(update)); + break; + default: + topicManager.publish(update.getQueue().asTopicName(), update.getCommand().asCommandName(), Utils.toMapValue(update)); + break; + } + } + } +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl; + +import org.apache.sling.jobs.impl.spi.MapValueAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.File; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.nio.charset.Charset; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Created by ieb on 29/03/2016. + */ +public class Utils { + + private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); + private static final Charset UTF8 = Charset.forName("UTF-8") ; + private static final String PROCESS_NAME = generateUniqueNamespace(); + // Set the counter to the class load time. + private static final AtomicLong idCounter = new AtomicLong(System.currentTimeMillis()); + + /** + * Gets a string + * @return + */ + @Nonnull + private static String generateUniqueNamespace() { + String macAddress = null; + // get the MAC address of the primary interface, failing that use a fake. + try { + for ( Enumeration<NetworkInterface> netInterfaceE = NetworkInterface.getNetworkInterfaces(); netInterfaceE.hasMoreElements();) { + NetworkInterface netInterface = netInterfaceE.nextElement(); + byte[] hw = netInterface.getHardwareAddress(); + if ( !netInterface.isLoopback() && !netInterface.isVirtual() && hw != null) { + macAddress = tohex(hw); + LOGGER.info("Job IDs seeded with MAC Address from interface {} ", netInterface); + break; + } + } + if ( macAddress == null) { + LOGGER.info("No MAC address available, seeding JobID from startup time."); + macAddress = "fake-" + System.currentTimeMillis(); + } + } catch (SocketException e) { + LOGGER.warn("Unable to get MAC address, defaulting to fake ", e); + } + long processID; + try { + // most JVMs. + processID = Long.parseLong(java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split("@")[0]); + } catch (Exception e) { + try { + // most Linux kernels. + processID = Long.parseLong(new File("/proc/self").getCanonicalFile().getName()); + } catch (Exception e1) { + LOGGER.warn("Unable to get ProcessID by address, defaulting to fake ", e); + processID = System.currentTimeMillis(); // this will be way beyond any process ID. + } + } + String baseId = macAddress + "/" + processID+ "/"; + LOGGER.info("Job IDS base is {} ", baseId); + return baseId; + } + + @Nonnull + private static String tohex(@Nonnull byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for( byte b : bytes) { + sb.append(String.format("%02x",b)); + } + return sb.toString(); + } + + /** + * Generate an ID based on the unique name of the jvm process and a counter. + * @return + */ + @Nonnull + public static String generateId() { + try { + return Utils.tohex(MessageDigest.getInstance("SHA1").digest((Utils.PROCESS_NAME+idCounter.incrementAndGet()).getBytes(UTF8))); + } catch (NoSuchAlgorithmException nsae) { + throw new RuntimeException("SHA1 not supported", nsae); + } + } + + @Nonnull + public static Map<String, Object> toMapValue(@Nonnull Object msg) { + if (msg instanceof Map) { + //noinspection unchecked + return (Map<String, Object>) msg; + } else if (msg instanceof MapValueAdapter) { + return toMapValue(((MapValueAdapter) msg).toMapValue()); + } + throw new IllegalArgumentException("Unable to convert "+msg.getClass()+" to a Map."); + } + + + @Nonnull + public static <T> T getRequired(@Nonnull Map<String, Object> m, @Nonnull String name) { + if (m.containsKey(name)) { + //noinspection unchecked + if ( m.get(name) != null) { + return (T) m.get(name); + } + } + throw new IllegalArgumentException("Required key "+name+" is missing from "+m); + } + + @Nullable + public static <T> T getOptional(@Nonnull Map<String, Object> m, @Nonnull String name, @Nullable T defaultValue) { + if (m.containsKey(name)) { + //noinspection unchecked + Object o = m.get(name); + if ( defaultValue instanceof Integer && o instanceof Long) { + return (T)(Integer) ((Long) o).intValue(); + } else if ( defaultValue instanceof Float && o instanceof Double) { + return (T)(Float) ((Double) o).floatValue(); + } + return (T) o; + } + return defaultValue; + } + + +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl.spi; + +import org.apache.sling.jobs.Job; + +/** + * Created by ieb on 29/03/2016. + * Starts a job, used by the JobBuilderImpl to perform the add operation. + */ +public interface JobStarter { + Job start(Job j); +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.jobs.impl.spi; + +import org.apache.sling.jobs.Job; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Created by ieb on 29/03/2016. + * Provides JobStorage local to the JVM. Implementation may or may not decide to persist over restarts, page, etc + */ +public interface JobStorage { + + + /** + * Get a Job by ID. + * @param jobId the job ID to get. + * @return the job or null of the job doesn't exist. + */ + @Nullable + Job get(@Nonnull String jobId); + + /** + * Put a Job into the Job Storage. + * @param job the job. + * @return the job just added. + */ + @Nonnull + Job put(@Nonnull Job job); + + /** + * Remove the Job + * @param jobId + * @return the job removed or null if not present. + */ + @Nullable + Job remove(@Nonnull String jobId); + + /** + * Remove the Job, returning the job removed. + * @param job the job to remove. + * @return the job removed, if the the job was present, otherwise null. + */ + @Nullable + Job remove(@Nonnull Job job); + + /** + * Dispose of the JobStorage. + */ + void dispose(); + +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl.spi; + +/** + * Objects that can be converted to and from a Map are expected to extend this base class. + * The values contained in the Map are expected to be Maps or values that can be serialised into most + * common formats. It would be safe to use json or yaml as an example of a common format. + * Created by ieb on 28/03/2016. + * + */ +public interface MapValueAdapter { + + + /** + * Populate the object from a map value. + * @param mapValue + */ + void fromMapValue(Object mapValue); + + /** + * Adapt the object into a value suitable for use in a map to be serialised by standard map -> json,yaml writers. + * @return a value, which may be a primitive, an array or a map of primitives. + */ + Object toMapValue(); + + + + +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl.storage; + +import org.apache.sling.jobs.Job; +import org.apache.sling.jobs.impl.spi.JobStorage; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Created by ieb on 29/03/2016. + * An unbounded local JVM job store. + */ +public class InMemoryJobStorage implements JobStorage { + + + private Map<String, Job> store = new ConcurrentHashMap<String, Job>(); + + @Nullable + @Override + public Job get(@Nonnull String jobId) { + check(); + return store.get(jobId); + } + + @Nonnull + @Override + public Job put(@Nonnull Job job) { + check(); + store.put(job.getId(), job); + return job; + } + + @Nullable + @Override + public Job remove(@Nonnull String jobId) { + check(); + Job j = store.get(jobId); + store.remove(jobId); + return j; + } + + @Nullable + @Override + public Job remove(@Nonnull Job job) { + check(); + return remove(job.getId()); + } + + private void check() { + if ( store == null) { + throw new IllegalStateException("Job store already closed."); + } + } + + @Override + public void dispose() { + store.clear(); + store = null; + } +} Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +@Version("1.0.0") +package org.apache.sling.jobs; + +import aQute.bnd.annotation.Version; + Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java (added) +++ sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jobs.impl; + +import org.apache.sling.jobs.Job; +import org.apache.sling.jobs.JobController; +import org.apache.sling.jobs.Types; +import org.apache.sling.jobs.impl.spi.JobStarter; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; + +import static org.junit.Assert.*; + +/** + * Created by ieb on 05/04/2016. + * Tests job builder. + */ +public class JobBuilderImplTest { + + @Mock + private JobStarter jobStarter; + + private Queue<Job> queue; + @Mock + private JobController jobController; + + public JobBuilderImplTest() { + MockitoAnnotations.initMocks(this); + } + + @Before + public void before() { + queue = new ArrayBlockingQueue<Job>(1); + Mockito.when(jobStarter.start(Mockito.any(Job.class))).then(new Answer<Job>() { + @Override + public Job answer(InvocationOnMock invocationOnMock) throws Throwable { + queue.add((Job) invocationOnMock.getArguments()[0]); + return (Job) invocationOnMock.getArguments()[0]; + } + }); + } + + @Test + public void testAddJob() { + long start = System.currentTimeMillis(); + Map<String, Object> testMap = new HashMap<String, Object>(); + testMap.put("job.name", "Jobname"); + Job queuedJob = new JobBuilderImpl(jobStarter, Types.jobQueue("testtopic"), Types.jobType("testtype")).addProperties(testMap).add(); + assertEquals(1, queue.size()); + Job fromQueue = queue.remove(); + assertEquals(queuedJob, fromQueue); + assertEquals(Types.jobQueue("testtopic"), fromQueue.getQueue()); + assertEquals("Jobname", fromQueue.getProperties().get("job.name")); + assertNotNull(fromQueue.getId()); + long now = System.currentTimeMillis(); + assertTrue(fromQueue.getCreated() >= start); + assertTrue(fromQueue.getCreated() <= now); + assertEquals(Job.JobState.CREATED, fromQueue.getJobState()); + assertNull(fromQueue.getController()); + fromQueue.setJobController(jobController); + assertEquals(jobController, fromQueue.getController()); + fromQueue.removeJobController(); + assertNull(fromQueue.getController()); + assertEquals("", fromQueue.getResultMessage()); + assertEquals(0, fromQueue.getFinished()); + assertEquals(0, fromQueue.getStarted()); + assertEquals(0, fromQueue.getNumberOfRetries()); + assertEquals(0, fromQueue.getRetryCount()); + + } + +} \ No newline at end of file Propchange: sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java ------------------------------------------------------------------------------ svn:eol-style = native