This is an automated email from the ASF dual-hosted git repository. martin_s pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/archiva-components.git
commit 5a26d3c39d1877a207eca5726bda522cee317899 Author: Martin Stockhammer <[email protected]> AuthorDate: Thu Nov 21 21:18:38 2019 +0100 Moving spring-taskqueue component to archiva-components repository --- spring-taskqueue/pom.xml | 96 ++++++ .../components/taskqueue/DefaultTaskQueue.java | 217 ++++++++++++ .../apache/archiva/components/taskqueue/Task.java | 33 ++ .../components/taskqueue/TaskEntryEvaluator.java | 31 ++ .../components/taskqueue/TaskExitEvaluator.java | 31 ++ .../archiva/components/taskqueue/TaskQueue.java | 72 ++++ .../components/taskqueue/TaskQueueException.java | 38 +++ .../taskqueue/TaskViabilityEvaluator.java | 39 +++ .../execution/TaskExecutionException.java | 38 +++ .../taskqueue/execution/TaskExecutor.java | 32 ++ .../taskqueue/execution/TaskQueueExecutor.java | 47 +++ .../execution/ThreadedTaskQueueExecutor.java | 370 +++++++++++++++++++++ .../src/main/resources/META-INF/spring-context.xml | 33 ++ spring-taskqueue/src/site/site.xml | 34 ++ .../components/taskqueue/ATaskEntryEvaluator.java | 37 +++ .../components/taskqueue/ATaskExitEvaluator.java | 37 +++ .../components/taskqueue/BTaskEntryEvaluator.java | 37 +++ .../components/taskqueue/BTaskExitEvaluator.java | 37 +++ .../components/taskqueue/BuildProjectTask.java | 163 +++++++++ .../BuildProjectTaskViabilityEvaluator.java | 63 ++++ .../components/taskqueue/TaskQueueTest.java | 184 ++++++++++ .../execution/BuildProjectTaskExecutor.java | 84 +++++ .../taskqueue/execution/TaskQueueExecutorTest.java | 108 ++++++ .../src/test/resources/log4j2-test.xml | 36 ++ .../src/test/resources/spring-context.xml | 63 ++++ 25 files changed, 1960 insertions(+) diff --git a/spring-taskqueue/pom.xml b/spring-taskqueue/pom.xml new file mode 100644 index 0000000..7455939 --- /dev/null +++ b/spring-taskqueue/pom.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.archiva.components</groupId> + <artifactId>archiva-components</artifactId> + <version>3.0-SNAPSHOT</version> + </parent> + + <version>3.0-SNAPSHOT</version> + <artifactId>archiva-components-spring-taskqueue</artifactId> + + <name>Archiva Components :: Spring Task Queue</name> + + <properties> + <site.staging.base>${project.parent.basedir}</site.staging.base> + </properties> + + <url>${webUrl}/${project.artifactId}</url> + + <dependencies> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + + <dependency> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + </dependency> + <dependency> + <groupId>javax.annotation</groupId> + <artifactId>javax.annotation-api</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context-support</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-jcl</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/DefaultTaskQueue.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/DefaultTaskQueue.java new file mode 100644 index 0000000..087f900 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/DefaultTaskQueue.java @@ -0,0 +1,217 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + + +/** + * @author <a href="mailto:[email protected]">Jason van Zyl</a> + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public class DefaultTaskQueue + implements TaskQueue +{ + + private Logger logger = LoggerFactory.getLogger( getClass() ); + + private List<TaskEntryEvaluator> taskEntryEvaluators = new ArrayList<>(); + + private List<TaskExitEvaluator> taskExitEvaluators = new ArrayList<>(); + + private List<TaskViabilityEvaluator> taskViabilityEvaluators = new ArrayList<>(); + + private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(); + + // ---------------------------------------------------------------------- + // Component Lifecycle + // ---------------------------------------------------------------------- + + // ---------------------------------------------------------------------- + // TaskQueue Implementation + // ---------------------------------------------------------------------- + + // ---------------------------------------------------------------------- + // Queue operations + // ---------------------------------------------------------------------- + + public boolean put( Task task ) + throws TaskQueueException + { + // ---------------------------------------------------------------------- + // Check that all the task entry evaluators accepts the task + // ---------------------------------------------------------------------- + + for ( TaskEntryEvaluator taskEntryEvaluator : taskEntryEvaluators ) + { + boolean result = taskEntryEvaluator.evaluate( task ); + + if ( !result ) + { + return false; + } + } + + // ---------------------------------------------------------------------- + // The task was accepted, enqueue it + // ---------------------------------------------------------------------- + + enqueue( task ); + + // ---------------------------------------------------------------------- + // Check that all the task viability evaluators accepts the task + // ---------------------------------------------------------------------- + + for ( TaskViabilityEvaluator taskViabilityEvaluator : taskViabilityEvaluators ) + { + Collection<Task> toBeRemoved = + taskViabilityEvaluator.evaluate( Collections.unmodifiableCollection( queue ) ); + + for ( Iterator<Task> it = toBeRemoved.iterator(); it.hasNext(); ) + { + Task t = it.next(); + + queue.remove( t ); + } + } + + return true; + } + + public Task take() + throws TaskQueueException + { + logger.debug( "take" ); + while ( true ) + { + Task task = dequeue(); + + if ( task == null ) + { + return null; + } + + for ( TaskExitEvaluator taskExitEvaluator : taskExitEvaluators ) + { + boolean result = taskExitEvaluator.evaluate( task ); + + if ( !result ) + { + // the task wasn't accepted; drop it. + task = null; + + break; + } + } + + if ( task != null ) + { + return task; + } + } + } + + public Task poll( int timeout, TimeUnit timeUnit ) + throws InterruptedException + { + logger.debug( "pool" ); + return queue.poll( timeout, timeUnit ); + } + + public boolean remove( Task task ) + throws ClassCastException, NullPointerException + { + return queue.remove( task ); + } + + public boolean removeAll( List tasks ) + throws ClassCastException, NullPointerException + { + return queue.removeAll( tasks ); + } + + + // ---------------------------------------------------------------------- + // Queue Inspection + // ---------------------------------------------------------------------- + + public List<Task> getQueueSnapshot() + throws TaskQueueException + { + return Collections.unmodifiableList( new ArrayList( queue ) ); + } + + // ---------------------------------------------------------------------- + // Queue Management + // ---------------------------------------------------------------------- + + private void enqueue( Task task ) + { + boolean success = queue.add( task ); + logger.debug( "enqueue success {}", success ); + } + + private Task dequeue() + { + logger.debug( "dequeue" ); + return queue.poll(); + } + + public List<TaskEntryEvaluator> getTaskEntryEvaluators() + { + return taskEntryEvaluators; + } + + public void setTaskEntryEvaluators( List<TaskEntryEvaluator> taskEntryEvaluators ) + { + this.taskEntryEvaluators = taskEntryEvaluators; + } + + public List<TaskExitEvaluator> getTaskExitEvaluators() + { + return taskExitEvaluators; + } + + public void setTaskExitEvaluators( List<TaskExitEvaluator> taskExitEvaluators ) + { + this.taskExitEvaluators = taskExitEvaluators; + } + + public List<TaskViabilityEvaluator> getTaskViabilityEvaluators() + { + return taskViabilityEvaluators; + } + + public void setTaskViabilityEvaluators( List<TaskViabilityEvaluator> taskViabilityEvaluators ) + { + this.taskViabilityEvaluators = taskViabilityEvaluators; + } +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/Task.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/Task.java new file mode 100644 index 0000000..5da122e --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/Task.java @@ -0,0 +1,33 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * @author <a href="mailto:[email protected]">Jason van Zyl</a> + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public interface Task +{ + /** + * @return the maximum time in milliseconds this task may run before it's cancelled. + */ + long getMaxExecutionTime(); +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskEntryEvaluator.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskEntryEvaluator.java new file mode 100644 index 0000000..5086655 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskEntryEvaluator.java @@ -0,0 +1,31 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * @author <a href="mailto:[email protected]">Jason van Zyl</a> + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public interface TaskEntryEvaluator<T extends Task> +{ + boolean evaluate( T task ) + throws TaskQueueException; +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskExitEvaluator.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskExitEvaluator.java new file mode 100644 index 0000000..22de4b4 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskExitEvaluator.java @@ -0,0 +1,31 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * @author <a href="mailto:[email protected]">Jason van Zyl</a> + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public interface TaskExitEvaluator<T extends Task> +{ + boolean evaluate( T task ) + throws TaskQueueException; +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskQueue.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskQueue.java new file mode 100644 index 0000000..e7b7dfb --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskQueue.java @@ -0,0 +1,72 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.util.List; +import java.util.concurrent.TimeUnit; + + +/** + * @author <a href="mailto:[email protected]">Jason van Zyl</a> + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public interface TaskQueue<T extends Task> +{ + + // ---------------------------------------------------------------------- + // Queue operations + // ---------------------------------------------------------------------- + + /** + * @param task The task to add to the queue. + * @return Returns true if the task was accepted into the queue. + */ + boolean put( T task ) + throws TaskQueueException; + + T take() + throws TaskQueueException; + + boolean remove( T task ) + throws ClassCastException, NullPointerException; + + boolean removeAll( List<T> tasks ) + throws ClassCastException, NullPointerException; + + // ---------------------------------------------------------------------- + // Queue Inspection + // ---------------------------------------------------------------------- + + List<T> getQueueSnapshot() + throws TaskQueueException; + + /** + * Retrieves and removes the head of the queue, waiting at most timeout timeUnit when no element is available. + * + * @param timeout time to wait, in timeUnit units + * @param timeUnit how to interpret the timeout parameter. + * @return the head of the queue, or null if the timeout elapsed + * @throws InterruptedException when this thread is interrupted while waiting + */ + T poll( int timeout, TimeUnit timeUnit ) + throws InterruptedException; +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskQueueException.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskQueueException.java new file mode 100644 index 0000000..34c6d58 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskQueueException.java @@ -0,0 +1,38 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public class TaskQueueException + extends Exception +{ + public TaskQueueException( String message ) + { + super( message ); + } + + public TaskQueueException( String message, Throwable cause ) + { + super( message, cause ); + } +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskViabilityEvaluator.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskViabilityEvaluator.java new file mode 100644 index 0000000..a02cba7 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/TaskViabilityEvaluator.java @@ -0,0 +1,39 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; + +/** + * @author <a href="mailto:[email protected]">Jason van Zyl</a> + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public interface TaskViabilityEvaluator<T extends Task> +{ + + /** + * @param tasks The tasks to evaluate + * @return Returns a list of tasks to remove from the queue. + * @throws TaskQueueException + */ + Collection<Task> evaluate( Collection<T> tasks ) + throws TaskQueueException; +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskExecutionException.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskExecutionException.java new file mode 100644 index 0000000..f260cc2 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskExecutionException.java @@ -0,0 +1,38 @@ +package org.apache.archiva.components.taskqueue.execution; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public class TaskExecutionException + extends Exception +{ + public TaskExecutionException( String message ) + { + super( message ); + } + + public TaskExecutionException( String message, Throwable cause ) + { + super( message, cause ); + } +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskExecutor.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskExecutor.java new file mode 100644 index 0000000..24410e2 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskExecutor.java @@ -0,0 +1,32 @@ +package org.apache.archiva.components.taskqueue.execution; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.archiva.components.taskqueue.Task; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public interface TaskExecutor<T extends Task> +{ + void executeTask( T task ) + throws TaskExecutionException; +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutor.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutor.java new file mode 100644 index 0000000..9c54387 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutor.java @@ -0,0 +1,47 @@ +package org.apache.archiva.components.taskqueue.execution; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.archiva.components.taskqueue.Task; + + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public interface TaskQueueExecutor<T extends Task> +{ + + /** + * Returns the currently executing task. + * + * @return the currently executing task. + */ + T getCurrentTask(); + + /** + * Cancels execution of this task, if it's currently running. + * Does NOT remove it from the associated queue! + * + * @param task The task to cancel + * @return true if the task was cancelled, false if the task was not executing. + */ + boolean cancelTask( T task ); +} diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/ThreadedTaskQueueExecutor.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/ThreadedTaskQueueExecutor.java new file mode 100644 index 0000000..198d714 --- /dev/null +++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/ThreadedTaskQueueExecutor.java @@ -0,0 +1,370 @@ +package org.apache.archiva.components.taskqueue.execution; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.archiva.components.taskqueue.Task; +import org.apache.archiva.components.taskqueue.TaskQueue; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * @author <a href="mailto:[email protected]">Kenney Westerhof</a> + */ +public class ThreadedTaskQueueExecutor + implements TaskQueueExecutor +{ + + private Logger logger = LoggerFactory.getLogger( getClass() ); + + private static final int SHUTDOWN = 1; + + private static final int CANCEL_TASK = 2; + + /** + * requirement + */ + private TaskQueue queue; + + /** + * requirement + */ + private TaskExecutor executor; + + /** + * configuration + */ + private String name; + + // ---------------------------------------------------------------------- + // + // ---------------------------------------------------------------------- + + private ExecutorRunnable executorRunnable; + + private ExecutorService executorService; + + private Task currentTask; + + private class ExecutorRunnable + extends Thread + { + private volatile int command; + + private boolean done; + + public void run() + { + while ( command != SHUTDOWN ) + { + final Task task; + + currentTask = null; + + try + { + task = queue.poll( 100, TimeUnit.MILLISECONDS ); + } + catch ( InterruptedException e ) + { + logger.info( "Executor thread interrupted, command: {}", ( command == SHUTDOWN + ? "Shutdown" + : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) ); + continue; + } + + if ( task == null ) + { + continue; + } + + currentTask = task; + + Future future = executorService.submit( new Runnable() + { + public void run() + { + try + { + executor.executeTask( task ); + } + catch ( TaskExecutionException e ) + { + logger.error( "Error executing task: {}", e.getMessage(), e ); + } + } + } ); + + try + { + waitForTask( task, future ); + } + catch ( ExecutionException e ) + { + logger.error( "Error executing task: {}", e.getMessage(), e ); + } + } + + currentTask = null; + + logger.info( "Executor thread '{}' exited.", name ); + + done = true; + + synchronized ( this ) + { + notifyAll(); + } + } + + private void waitForTask( Task task, Future future ) + throws ExecutionException + { + boolean stop = false; + + while ( !stop ) + { + try + { + if ( task.getMaxExecutionTime() == 0 ) + { + logger.debug( "Waiting indefinitely for task to complete" ); + future.get(); + return; + } + else + { + logger.debug( "Waiting at most {} ms for task completion", task.getMaxExecutionTime() ); + future.get( task.getMaxExecutionTime(), TimeUnit.MILLISECONDS ); + logger.debug( "Task completed within {} ms", task.getMaxExecutionTime() ); + return; + } + } + catch ( InterruptedException e ) + { + switch ( command ) + { + case SHUTDOWN: + { + logger.info( "Shutdown command received. Cancelling task." ); + cancel( future ); + return; + } + + case CANCEL_TASK: + { + command = 0; + logger.info( "Cancelling task" ); + cancel( future ); + return; + } + + default: + // when can this thread be interrupted, and should we ignore it if shutdown = false? + logger.warn( "Interrupted while waiting for task to complete; ignoring", e ); + break; + } + } + catch ( TimeoutException e ) + { + logger.warn( "Task {} didn't complete within time, cancelling it.", task ); + cancel( future ); + return; + } + catch ( CancellationException e ) + { + logger.warn( "The task was cancelled", e ); + return; + } + } + } + + private void cancel( Future future ) + { + if ( !future.cancel( true ) ) + { + if ( !future.isDone() && !future.isCancelled() ) + { + logger.warn( "Unable to cancel task" ); + } + else + { + logger.warn( + "Task not cancelled (Flags: done: {} cancelled: {})", future.isDone(), future.isCancelled() ); + } + } + else + { + logger.debug( "Task successfully cancelled" ); + } + } + + public synchronized void shutdown() + { + logger.debug( "Signalling executor thread to shutdown" ); + + command = SHUTDOWN; + + interrupt(); + } + + public synchronized boolean cancelTask( Task task ) + { + if ( !task.equals( currentTask ) ) + { + logger.debug( "Not cancelling task - it is not running" ); + return false; + } + + if ( command != SHUTDOWN ) + { + logger.debug( "Signalling executor thread to cancel task" ); + + command = CANCEL_TASK; + + interrupt(); + } + else + { + logger.debug( "Executor thread already stopping; task will be cancelled automatically" ); + } + + return true; + } + + public boolean isDone() + { + return done; + } + } + + // ---------------------------------------------------------------------- + // Component lifecycle + // ---------------------------------------------------------------------- + + @PostConstruct + public void start() + { + + if ( StringUtils.isBlank( name ) ) + { + throw new IllegalArgumentException( "'name' must be set." ); + } + + logger.info( "Starting task executor, thread name '{}'.", name ); + + this.executorService = Executors.newSingleThreadExecutor(); + + executorRunnable = new ExecutorRunnable(); + + executorRunnable.setDaemon( true ); + + executorRunnable.start(); + } + + @PreDestroy + public void stop() + { + executorRunnable.shutdown(); + + int maxSleep = 10 * 1000; // 10 seconds + + int interval = 1000; + + long endTime = System.currentTimeMillis() + maxSleep; + + while ( !executorRunnable.isDone() && executorRunnable.isAlive() ) + { + if ( System.currentTimeMillis() > endTime ) + { + logger.warn( "Timeout waiting for executor thread '{}' to stop, aborting", name ); + break; + } + + logger.info( "Waiting until task executor '{}' is idling...", name ); + + try + { + synchronized ( executorRunnable ) + { + executorRunnable.wait( interval ); + } + } + catch ( InterruptedException ex ) + { + // ignore + } + + // notify again, just in case. + executorRunnable.shutdown(); + } + } + + public Task getCurrentTask() + { + return currentTask; + } + + public synchronized boolean cancelTask( Task task ) + { + return executorRunnable.cancelTask( task ); + } + + public TaskQueue getQueue() + { + return queue; + } + + public void setQueue( TaskQueue queue ) + { + this.queue = queue; + } + + public TaskExecutor getExecutor() + { + return executor; + } + + public void setExecutor( TaskExecutor executor ) + { + this.executor = executor; + } + + public String getName() + { + return name; + } + + public void setName( String name ) + { + this.name = name; + } +} diff --git a/spring-taskqueue/src/main/resources/META-INF/spring-context.xml b/spring-taskqueue/src/main/resources/META-INF/spring-context.xml new file mode 100755 index 0000000..83a8857 --- /dev/null +++ b/spring-taskqueue/src/main/resources/META-INF/spring-context.xml @@ -0,0 +1,33 @@ +<?xml version="1.0"?> + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:context="http://www.springframework.org/schema/context" + xsi:schemaLocation="http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans-3.0.xsd + http://www.springframework.org/schema/context + http://www.springframework.org/schema/context/spring-context-3.0.xsd" + default-lazy-init="true"> + + <context:annotation-config /> + <context:component-scan base-package="org.apache.archiva.components.taskqueue"/> + +</beans> diff --git a/spring-taskqueue/src/site/site.xml b/spring-taskqueue/src/site/site.xml new file mode 100644 index 0000000..c3b26e6 --- /dev/null +++ b/spring-taskqueue/src/site/site.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<project name="Spring Taskqueue Component" > + + <publishDate format="yyyy-MM-dd" position="none" /> + + <body> + <menu ref="modules" /> + <menu ref="reports" /> + <menu ref="ASF" /> + <breadcrumbs> + <item name="Archiva Components" href="../index.html" /> + <item name="Spring Taskqueue" href="index.html" /> + </breadcrumbs> + </body> +</project> diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/ATaskEntryEvaluator.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/ATaskEntryEvaluator.java new file mode 100644 index 0000000..4f28006 --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/ATaskEntryEvaluator.java @@ -0,0 +1,37 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.springframework.stereotype.Service; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +@Service("taskEntryEvaluator#a") +public class ATaskEntryEvaluator + implements TaskEntryEvaluator +{ + public boolean evaluate( Task task ) + throws TaskQueueException + { + return ( (BuildProjectTask) task).isPassAEntryEvaluator(); + } +} diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/ATaskExitEvaluator.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/ATaskExitEvaluator.java new file mode 100644 index 0000000..6954fde --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/ATaskExitEvaluator.java @@ -0,0 +1,37 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.springframework.stereotype.Service; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +@Service("taskExitEvaluator#a") +public class ATaskExitEvaluator + implements TaskExitEvaluator +{ + public boolean evaluate( Task task ) + throws TaskQueueException + { + return ( (BuildProjectTask) task ).isPassAExitEvaluator(); + } +} diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BTaskEntryEvaluator.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BTaskEntryEvaluator.java new file mode 100644 index 0000000..cd3b031 --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BTaskEntryEvaluator.java @@ -0,0 +1,37 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.springframework.stereotype.Service; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +@Service("taskEntryEvaluator#b") +public class BTaskEntryEvaluator + implements TaskEntryEvaluator +{ + public boolean evaluate( Task task ) + throws TaskQueueException + { + return ( (BuildProjectTask) task ).isPassBEntryEvaluator(); + } +} diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BTaskExitEvaluator.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BTaskExitEvaluator.java new file mode 100644 index 0000000..e0b480e --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BTaskExitEvaluator.java @@ -0,0 +1,37 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.springframework.stereotype.Service; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +@Service("taskExitEvaluator#b") +public class BTaskExitEvaluator + implements TaskExitEvaluator +{ + public boolean evaluate( Task task ) + throws TaskQueueException + { + return ( (BuildProjectTask) task ).isPassBExitEvaluator(); + } +} diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BuildProjectTask.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BuildProjectTask.java new file mode 100644 index 0000000..8a21742 --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BuildProjectTask.java @@ -0,0 +1,163 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.archiva.components.taskqueue.Task; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +public class BuildProjectTask + implements Task +{ + private boolean passAEntryEvaluator; + + private boolean passBEntryEvaluator; + + private boolean passAExitEvaluator; + + private boolean passBExitEvaluator; + + private long timestamp; + + private long maxExecutionTime; + + private long executionTime; + + private volatile boolean cancelled; + + private volatile boolean done; + + private volatile boolean started; + + private volatile boolean wasStarted = false; + + private boolean ignoreInterrupts; + + public BuildProjectTask( boolean passAEntryEvaluator, boolean passBEntryEvaluator, boolean passAExitEvaluator, + boolean passBExitEvaluator ) + { + this.passAEntryEvaluator = passAEntryEvaluator; + + this.passBEntryEvaluator = passBEntryEvaluator; + + this.passAExitEvaluator = passAExitEvaluator; + + this.passBExitEvaluator = passBExitEvaluator; + } + + public BuildProjectTask( long timestamp ) + { + this( true, true, true, true ); + + this.timestamp = timestamp; + } + + public boolean isPassAEntryEvaluator() + { + return passAEntryEvaluator; + } + + public boolean isPassBEntryEvaluator() + { + return passBEntryEvaluator; + } + + public boolean isPassAExitEvaluator() + { + return passAExitEvaluator; + } + + public boolean isPassBExitEvaluator() + { + return passBExitEvaluator; + } + + public long getTimestamp() + { + return timestamp; + } + + public long getMaxExecutionTime() + { + return maxExecutionTime; + } + + public void setMaxExecutionTime( long timeout ) + { + maxExecutionTime = timeout; + } + + public void setExecutionTime( long l ) + { + this.executionTime = l; + } + + public long getExecutionTime() + { + return executionTime; + } + + public boolean isCancelled() + { + return cancelled; + } + + public void cancel() + { + cancelled = true; + } + + public void done() + { + this.done = true; + } + + public boolean isDone() + { + return done; + } + + public boolean isStarted() + { + return started; + } + + public void start() + { + this.started = true; + this.wasStarted = true; + } + + public void setIgnoreInterrupts( boolean ignore ) + { + this.ignoreInterrupts = ignore; + } + + public boolean ignoreInterrupts() + { + return ignoreInterrupts; + } + + public boolean wasStarted() { + return wasStarted; + } +} diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BuildProjectTaskViabilityEvaluator.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BuildProjectTaskViabilityEvaluator.java new file mode 100644 index 0000000..1007199 --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/BuildProjectTaskViabilityEvaluator.java @@ -0,0 +1,63 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +@Service( "taskViabilityEvaluator#build-project" ) +public class BuildProjectTaskViabilityEvaluator + implements TaskViabilityEvaluator<BuildProjectTask> +{ + public Collection<Task> evaluate( Collection<BuildProjectTask> tasks ) + throws TaskQueueException + { + BuildProjectTask okTask = null; + + List<Task> toBeRemoved = new ArrayList<>( tasks.size() ); + + for ( Iterator<BuildProjectTask> it = tasks.iterator(); it.hasNext(); ) + { + BuildProjectTask buildProjectTask = it.next(); + + if ( okTask == null ) + { + okTask = buildProjectTask; + + continue; + } + + if ( buildProjectTask.getTimestamp() - okTask.getTimestamp() < 100 ) + { + toBeRemoved.add( buildProjectTask ); + } + } + + return toBeRemoved; + } +} diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/TaskQueueTest.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/TaskQueueTest.java new file mode 100644 index 0000000..5834ee3 --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/TaskQueueTest.java @@ -0,0 +1,184 @@ +package org.apache.archiva.components.taskqueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import junit.framework.TestCase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import javax.inject.Inject; +import javax.inject.Named; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * @author <a href="mailto:[email protected]">Trygve Laugstøl</a> + * + */ +@RunWith( SpringJUnit4ClassRunner.class ) +@ContextConfiguration( locations = { "classpath*:/META-INF/spring-context.xml", "classpath*:/spring-context.xml" } ) +public class TaskQueueTest + extends TestCase +{ + @Inject + @Named( value = "taskQueue#taskQueueTest" ) + private TaskQueue taskQueue; + + + // NOTE: If we were using a blocking queue, the sleep/continue in the ThreadedTaskQueueExecutor wouldn't + // be necessary; the queue would block until an element was available. + @Test + public void testEmptyQueue() + throws Exception + { + assertNull( taskQueue.take() ); + } + + @Test + public void testTaskEntryAndExitEvaluators() + throws Exception + { + assertTaskIsAccepted( new BuildProjectTask( true, true, true, true ) ); + + assertTaskIsRejected( new BuildProjectTask( false, true, true, true ) ); + + assertTaskIsRejected( new BuildProjectTask( true, false, true, true ) ); + + assertTaskIsRejected( new BuildProjectTask( true, true, false, true ) ); + + assertTaskIsRejected( new BuildProjectTask( true, true, true, false ) ); + } + + @Test + public void testTaskViabilityEvaluators() + throws Exception + { + // The first and last task should be accepted + + Task task1 = new BuildProjectTask( 0 ); + + Task task2 = new BuildProjectTask( 10 ); + + Task task3 = new BuildProjectTask( 20 ); + + Task task4 = new BuildProjectTask( 30 ); + + Task task5 = new BuildProjectTask( 40 ); + + Task task6 = new BuildProjectTask( 100 ); + + assertTrue( taskQueue.put( task1 ) ); + + assertTrue( taskQueue.put( task2 ) ); + + assertTrue( taskQueue.put( task3 ) ); + + assertTrue( taskQueue.put( task4 ) ); + + assertTrue( taskQueue.put( task5 ) ); + + assertTrue( taskQueue.put( task6 ) ); + + Task actualTask1 = taskQueue.take(); + + assertNotNull( actualTask1 ); + + assertEquals( task1, actualTask1 ); + + Task actualTask6 = taskQueue.take(); + + assertNotNull( actualTask6 ); + + assertEquals( task6, actualTask6 ); + + assertNull( taskQueue.take() ); + } + + @Test + public void testRemoveTask() + throws Exception + { + Task task = new BuildProjectTask( 0 ); + + taskQueue.put( task ); + + taskQueue.remove( task ); + + assertNull( taskQueue.take() ); + } + + @Test + public void testRemoveAll() + throws Exception + { + + BlockingQueue<String> foo = new LinkedBlockingQueue<String>(); + foo.offer("1"); + foo.offer("2"); + + Task firstTask = new BuildProjectTask( 110 ); + + taskQueue.put( firstTask ); + + Task secondTask = new BuildProjectTask( 11120 ); + + taskQueue.put( secondTask ); + + assertEquals( 2, taskQueue.getQueueSnapshot().size() ); + + List<Task> tasks = new ArrayList<>(); + + tasks.add( firstTask ); + + tasks.add( secondTask ); + + taskQueue.removeAll( tasks ); + + assertTrue( taskQueue.getQueueSnapshot().isEmpty() ); + } + + // ---------------------------------------------------------------------- + // + // ---------------------------------------------------------------------- + + private void assertTaskIsAccepted( Task expectedTask ) + throws Exception + { + taskQueue.put( expectedTask ); + + Task actualTask = taskQueue.take(); + + assertEquals( expectedTask, actualTask ); + } + + private void assertTaskIsRejected( Task expectedTask ) + throws Exception + { + taskQueue.put( expectedTask ); + + Task actualTask = taskQueue.take(); + + assertNull( actualTask ); + } +} diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/execution/BuildProjectTaskExecutor.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/execution/BuildProjectTaskExecutor.java new file mode 100644 index 0000000..b39ed51 --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/execution/BuildProjectTaskExecutor.java @@ -0,0 +1,84 @@ +package org.apache.archiva.components.taskqueue.execution; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.archiva.components.taskqueue.execution.TaskExecutionException; +import org.apache.archiva.components.taskqueue.execution.TaskExecutor; +import org.apache.archiva.components.taskqueue.BuildProjectTask; +import org.apache.archiva.components.taskqueue.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * @author <a href="mailto:[email protected]">Kenney Westerhof</a> + */ +@Service ( "taskExecutor#build-project" ) +public class BuildProjectTaskExecutor + implements TaskExecutor +{ + + private Logger logger = LoggerFactory.getLogger( getClass() ); + + public void executeTask( Task task0 ) + throws TaskExecutionException + { + BuildProjectTask task = (BuildProjectTask) task0; + + task.start(); + + logger.info( "Task:{} cancelled: {}; done: {}", task, task.isCancelled(), task.isDone() ); + + long time = System.currentTimeMillis(); + + long endTime = task.getExecutionTime() + time; + + for ( long timeToSleep = endTime - time; timeToSleep > 0; timeToSleep = endTime - System.currentTimeMillis() ) + { + try + { + logger.info( "Sleeping {} ms (interrupts ignored: {} )", timeToSleep, task.ignoreInterrupts() ); + Thread.sleep( timeToSleep ); + + task.done(); + + logger.info( "Task completed normally: {} cancelled: {}; done: {}", task, task.isCancelled(), + task.isDone() ); + } + catch ( InterruptedException e ) + { + if ( !task.ignoreInterrupts() ) + { + task.cancel(); + + logger.info( "Task cancelled: {} cancelled: {} ; done: {}", task, task.isCancelled(), + task.isDone() ); + + throw new TaskExecutionException( "Never interrupt sleeping threads! :)", e ); + } + else + { + logger.info( "Ignoring interrupt" ); + } + } + } + + } +} diff --git a/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutorTest.java b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutorTest.java new file mode 100644 index 0000000..81982ea --- /dev/null +++ b/spring-taskqueue/src/test/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutorTest.java @@ -0,0 +1,108 @@ +package org.apache.archiva.components.taskqueue.execution; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import junit.framework.TestCase; +import org.apache.archiva.components.taskqueue.execution.TaskQueueExecutor; +import org.apache.archiva.components.taskqueue.BuildProjectTask; +import org.apache.archiva.components.taskqueue.TaskQueue; +import org.apache.archiva.components.taskqueue.TaskQueueException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import javax.inject.Inject; +import javax.inject.Named; + +/** + * @author <a href="mailto:[email protected]">Kenney Westerhof</a> + */ +@RunWith( SpringJUnit4ClassRunner.class ) +@ContextConfiguration( locations = { "classpath*:/META-INF/spring-context.xml", "classpath*:/spring-context.xml" } ) +public class TaskQueueExecutorTest + extends TestCase +{ + @Inject + @Named( value = "taskQueue#default" ) + private TaskQueue taskQueue; + + // inject this to start the executor see @PostConstruct in {@link ThreadedTaskQueueExecutor + @Inject + @Named( value = "queueExecutor#default" ) + private TaskQueueExecutor taskQueueExecutor; + + + /** + * We run both tests in one test method, to avoid the shutdown of the executor + * + */ + @Test + public void testTimeoutWithInterrupts() + throws TaskQueueException, InterruptedException + { + BuildProjectTask task = putTask( 2 * 1000, false ); + + waitForExpectedTaskEnd( task ); + + assertTrue( task.isCancelled() ); + assertFalse( task.isDone() ); + + + task = putTask( 2 * 1000, true ); + + waitForExpectedTaskEnd( task ); + + // the thread is killed so the task is neither done nor cancelled + assertFalse( task.isCancelled() ); + assertFalse( task.isDone() ); + } + + private BuildProjectTask putTask( int executionTime, boolean ignoreInterrupts ) + throws TaskQueueException + { + BuildProjectTask task = new BuildProjectTask( 100 ); + task.setMaxExecutionTime( executionTime ); + task.setExecutionTime( 10 * executionTime ); + task.setIgnoreInterrupts( ignoreInterrupts ); + + taskQueue.put( task ); + return task; + } + + private static void waitForExpectedTaskEnd( BuildProjectTask task ) + throws InterruptedException + { + // thread scheduling may take some time, so we want to wait until the task + // is actually running before starting to count the timeout. + for ( int i = 0; i < 500; i++ ) + { + if ( task.wasStarted() ) + { + break; + } + Thread.sleep( 10 ); + } + + assertTrue( "Task not started in 5 seconds - heavy load?", task.isStarted() ); + + Thread.sleep( task.getMaxExecutionTime() ); + } +} diff --git a/spring-taskqueue/src/test/resources/log4j2-test.xml b/spring-taskqueue/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000..2eb2827 --- /dev/null +++ b/spring-taskqueue/src/test/resources/log4j2-test.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<configuration> + <appenders> + <Console name="console" target="SYSTEM_OUT"> + <PatternLayout pattern="[%t] %-5p %c %x - %m%n"/> + </Console> + </appenders> + <loggers> + <logger name="org.apache.archiva" level="warn"/> + <logger name="org.apache.archiva.components.taskqueue" level="info"/> + + <root level="error" includeLocation="true"> + <appender-ref ref="console"/> + </root> + </loggers> +</configuration> + + diff --git a/spring-taskqueue/src/test/resources/spring-context.xml b/spring-taskqueue/src/test/resources/spring-context.xml new file mode 100644 index 0000000..b20dd11 --- /dev/null +++ b/spring-taskqueue/src/test/resources/spring-context.xml @@ -0,0 +1,63 @@ +<?xml version="1.0"?> + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" + default-lazy-init="true"> + + <bean name="queueExecutor#default" class="org.apache.archiva.components.taskqueue.execution.ThreadedTaskQueueExecutor"> + <property name="queue" ref="taskQueue#default"/> + <property name="executor" ref="taskExecutor#build-project"/> + <property name="name" value="default"/> + </bean> + + <bean name="queueExecutor#taskQueueTest" class="org.apache.archiva.components.taskqueue.execution.ThreadedTaskQueueExecutor"> + <property name="queue" ref="taskQueue#taskQueueTest"/> + <property name="executor" ref="taskExecutor#build-project"/> + <property name="name" value="taskQueueTest"/> + </bean> + + <bean abstract="true" name="abstractQueue" class="org.apache.archiva.components.taskqueue.DefaultTaskQueue"> + <property name="taskEntryEvaluators"> + <list> + <ref bean="taskEntryEvaluator#a"/> + <ref bean="taskEntryEvaluator#b"/> + </list> + </property> + <property name="taskExitEvaluators"> + <list> + <ref bean="taskExitEvaluator#a"/> + <ref bean="taskExitEvaluator#b"/> + </list> + </property> + <property name="taskViabilityEvaluators"> + <list> + <ref bean="taskViabilityEvaluator#build-project"/> + </list> + </property> + </bean> + + <bean name="taskQueue#default" parent="abstractQueue"/> + + <bean name="taskQueue#taskQueueTest" parent="abstractQueue"/> + +</beans>
