This is an automated email from the ASF dual-hosted git repository. rombert pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jobs-it-services.git
commit 8df384053b49dd8672947bdc03de3a2d47e1fef8 Author: Ian Boston <[email protected]> AuthorDate: Mon Oct 3 16:05:02 2016 +0000 SLING-5645 moved jobs out of examples git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1763181 13f79535-47bb-0310-9956-ffa450edef68 --- pom.xml | 117 +++++++++++++++++ .../sling/jobs/it/services/AsyncJobConsumer.java | 142 +++++++++++++++++++++ .../sling/jobs/it/services/FullySyncJob.java | 68 ++++++++++ .../jobs/it/services/JobManagerTestComponent.java | 72 +++++++++++ testlaunch.jsp | 44 +++++++ 5 files changed, 443 insertions(+) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f8dc191 --- /dev/null +++ b/pom.xml @@ -0,0 +1,117 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.sling</groupId> + <artifactId>sling</artifactId> + <version>26</version> + <relativePath /> + </parent> + + <artifactId>org.apache.sling.jobs-it-services</artifactId> + <packaging>bundle</packaging> + <version>0.0.1-SNAPSHOT</version> + + <name>Apache Sling Jobs Service Integration Tests Bundle</name> + <description> + Integration tests for the Jobs implementation + </description> + + <scm> + <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jobs/it-services</connection> + <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jobs/it-services</developerConnection> + <url>http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jobs/it-services</url> + </scm> + + <properties> + <site.jira.version.id>12315369</site.jira.version.id> + <sling.java.version>7</sling.java.version> + <exam.version>4.4.0</exam.version> + <url.version>2.4.5</url.version> + <bundle.build.dir>${basedir}/target</bundle.build.dir> + <bundle.file.name>${bundle.build.dir}/${project.build.finalName}.jar</bundle.file.name> + <min.port>37000</min.port> + <max.port>37999</max.port> + </properties> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-scr-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Export-Package></Export-Package> + </instructions> + </configuration> + </plugin> + </plugins> + </build> + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + 
<excludePackageNames> + </excludePackageNames> + </configuration> + </plugin> + </plugins> + </reporting> + <dependencies> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.jobs</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.felix</groupId> + <artifactId>org.apache.felix.scr.annotations</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <version>2.0.1</version> + </dependency> + <!-- Testing --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java b/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java new file mode 100644 index 0000000..99d22f1 --- /dev/null +++ b/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.jobs.it.services; + +import org.apache.felix.scr.annotations.*; +import org.apache.sling.jobs.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.util.Map; +import java.util.concurrent.*; + +/** + * This job consumer consumes jobs from the job subsystem. It accepts the jobs into a queue and uses a thread pool to drain the queue. + * If the queue fills up, jobs are returned back to the jobsystem without being accepted. The size of the queue, the number of threads and + * the maximum number of threads should be tuned for maximum throughput at an acceptable resource usage level. Retuning the consumer + * will cause the queue to drain and restart. + * + * The contract this component makes with the JobSystem is that it will make best efforts to ensure that jobs it accepts into its queue are executed. + * + */ +@Component(immediate = true) +@Properties({ + @Property(name = JobConsumer.JOB_TYPES, cardinality = Integer.MAX_VALUE, value = { + AsyncJobConsumer.JOB_TYPE + }) +}) +@Service(value = JobConsumer.class) +public class AsyncJobConsumer implements JobConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncJobConsumer.class); + + + public static final String JOB_TYPE = "treadding/asyncthreadpoolwithbacklog"; + + /** + * The core number of threads that can be used to run this job. This should be just large enough to ensure + * throughput without being so large as to impact other operations. Probably 1/2 the number of cores is a good + * starting point. + */ + @Property(intValue = 4) + private static final String CORE_THREAD_POOL_SIZE = "core-thread-pool-size"; + /** + * The maximum number of threads allocated to running this job. 
This should not be so large that it can + * create availability issues for the JVM, but large enough to clear the backlog before it experiences + * inefficiency due to overflow. + */ + @Property(intValue = 8) + private static final String MAC_THREAD_POOL_SIZE = "max-thread-pool-size"; + + /** + * This defines how many messages the component can queue for execution after dequeuing them from the + * Job queue. This should be just large enough to ensure that the executing threads are kept busy + * but small enough to ensure that the shutdown is not blocked. Once in the queue there is some + * impression that the jobs will be executed, as they have been dequeued from the message system. + * The deactivate will wait for the shutdown wait time, and then shut the queue down. + */ + @Property(intValue = 8) + private static final String MAX_QUEUED_BACKLOG = "max-queue-backlog"; + + /** + * This is the maximum time allowed to shut the queue down. It should be long enough to ensure that all jobs in + * the local queue can complete. The longer the local queue set in max-queue-backlog, the higher this value must be. 
+ */ + @Property(longValue = 30) + private static final String SHUTDOWN_WAIT_SECONDS = "max-shutdown-wait"; + + private ExecutorService executor; + private LinkedBlockingQueue<Runnable> workQueue; + private long shutdownWaitSeconds; + + @Activate + public void activate(Map<String, Object> properites) { + int corePoolSize = (int) properites.get(CORE_THREAD_POOL_SIZE); + int maxPoolSize = (int) properites.get(MAC_THREAD_POOL_SIZE); + int maxBacklog = (int) properites.get(MAX_QUEUED_BACKLOG); + shutdownWaitSeconds = (long) properites.get(SHUTDOWN_WAIT_SECONDS); + workQueue = new LinkedBlockingQueue<Runnable>(maxBacklog); + executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, workQueue); + } + + @Deactivate + public void deactivate(Map<String, Object> properties) { + try { + executor.awaitTermination(shutdownWaitSeconds, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Interrupted while waiting for queue to drain ",e); + } + executor.shutdown(); + } + + @Nonnull + @Override + public void execute(@Nonnull final Job initialState, @Nonnull final JobUpdateListener listener, @Nonnull final JobCallback callback) { + LOGGER.info("Got request to start job {} ", initialState); + initialState.setState(Job.JobState.QUEUED); + listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build()); + // if the Job cant be queued locally, a RejectedExecutionException will be thrown, back to the scheduler and the job message will be put back into the queue to be retried some time later. + executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + initialState.setState(Job.JobState.ACTIVE); + listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build()); + // DO some work here. 
+ + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOGGER.debug(e.getMessage(), e); + } + listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step2").build()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOGGER.debug(e.getMessage(), e); + } + initialState.setState(Job.JobState.SUCCEEDED); + listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step3").build()); + callback.callback(initialState); + return null; + } + }); + } +} diff --git a/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java b/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java new file mode 100644 index 0000000..0221ff6 --- /dev/null +++ b/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sling.jobs.it.services; + +import org.apache.felix.scr.annotations.*; +import org.apache.sling.jobs.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +/** + */ +@Component(immediate = true) +@Properties({ + @Property(name = JobConsumer.JOB_TYPES, cardinality = Integer.MAX_VALUE, value = { + FullySyncJob.JOB_TYPE + }) +}) +@Service(value = JobConsumer.class) +public class FullySyncJob implements JobConsumer { + + + public static final String JOB_TYPE = "treadding/inthreadoperation"; + private static final Logger LOGGER = LoggerFactory.getLogger(FullySyncJob.class); + + @Nonnull + @Override + public void execute(@Nonnull Job initialState, @Nonnull JobUpdateListener listener, @Nonnull JobCallback callback) { + LOGGER.info("Got request to start job {} ", initialState); + initialState.setState(Job.JobState.ACTIVE); + listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build()); + + // DO some work here. 
+ + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOGGER.debug(e.getMessage(), e); + } + listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step2").build()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOGGER.debug(e.getMessage(),e); + } + initialState.setState(Job.JobState.SUCCEEDED); + listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step3").build()); + callback.callback(initialState); + } +} diff --git a/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java b/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java new file mode 100644 index 0000000..ffba8ac --- /dev/null +++ b/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sling.jobs.it.services; + + +import com.google.common.collect.ImmutableMap; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.sling.jobs.Job; +import org.apache.sling.jobs.JobManager; +import org.apache.sling.jobs.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.junit.Assert.*; + +/** + */ +@Component(immediate = true) +public class JobManagerTestComponent { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobManagerTestComponent.class); + public static final String TOPIC = "org/apache/sling/jobs/it/services"; + @Reference + private JobManager jobManager; + + + @Activate + public void activate(Map<String,Object> props) { + for( int i = 0; i < 10; i++) { + Job job = jobManager.newJobBuilder(Types.jobQueue(TOPIC), Types.jobType(AsyncJobConsumer.JOB_TYPE)).addProperties( + ImmutableMap.of("jobtest", (Object) "jobtest")).add(); + assertNotNull(job); + LOGGER.info("Started Job {} ", job.getId()); + } + // then start 10 sync jobs. + for( int i = 0; i < 10; i++) { + Job job = jobManager.newJobBuilder(Types.jobQueue(TOPIC), Types.jobType(FullySyncJob.JOB_TYPE)).addProperties( + ImmutableMap.of("jobtest", (Object) "jobtest")).add(); + assertNotNull(job); + LOGGER.info("Started Job {} ", job.getId()); + } + } + + @Deactivate + public void deactivate(Map<String, Object> props) { + + } + + +} diff --git a/testlaunch.jsp b/testlaunch.jsp new file mode 100644 index 0000000..6f11d2b --- /dev/null +++ b/testlaunch.jsp @@ -0,0 +1,44 @@ +<%-- +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +--%><%@include file="/libs/foundation/global.jsp"%><% +%><%@page session="false" contentType="text/html; charset=utf-8" + pageEncoding="UTF-8" + import="org.apache.sling.api.resource.*, + java.util.*, + javax.jcr.*, + com.day.cq.search.*, + com.day.cq.wcm.api.*, + com.day.cq.dam.api.*, + org.apache.sling.jobs.*, + com.google.common.collect.*"%><% + + // This is an AEM Fiddle that runs some jobs. + + JobManager jobManager = sling.getService(JobManager.class); + for ( int i = 0; i < 100; i++ ) { + Job job = jobManager.newJobBuilder( + Types.jobQueue("org/apache/sling/jobs/it/services"), + Types.jobType("treadding/asyncthreadpoolwithbacklog")) + .addProperties( + ImmutableMap.of("jobtest", (Object) "jobtest")) + .add(); +%>Added Job <%= job.getId() %><br/><% + } +%> \ No newline at end of file -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
