POLYGENE-304 : Added library-execution with 3 initial features.
Project: http://git-wip-us.apache.org/repos/asf/polygene-java/repo Commit: http://git-wip-us.apache.org/repos/asf/polygene-java/commit/db99803f Tree: http://git-wip-us.apache.org/repos/asf/polygene-java/tree/db99803f Diff: http://git-wip-us.apache.org/repos/asf/polygene-java/diff/db99803f Branch: refs/heads/develop Commit: db99803fe4a4dd698c4e5694345c1672561483a2 Parents: dcea137 Author: niclas <nic...@hedhman.org> Authored: Thu Apr 26 12:57:59 2018 +0800 Committer: niclas <nic...@hedhman.org> Committed: Thu Apr 26 12:57:59 2018 +0800 ---------------------------------------------------------------------- .../apache/polygene/api/concern/ConcernOf.java | 4 +- libraries/execution/build.gradle | 37 +++ libraries/execution/dev-status.xml | 38 +++ libraries/execution/src/docs/execution.txt | 68 ++++++ .../polygene/library/execution/Retry.java | 66 +++++ .../library/execution/RetryConcern.java | 78 ++++++ .../assembly/ExecutionServiceAssembler.java | 242 +++++++++++++++++++ .../ScheduledExecutionServiceAssembler.java | 140 +++++++++++ .../library/execution/ExecutionServiceTest.java | 51 ++++ .../polygene/library/execution/RetryTest.java | 184 ++++++++++++++ .../ScheduledExecutionServiceTest.java | 46 ++++ manual/src/docs/userguide/libraries.txt | 4 + settings.gradle | 1 + 13 files changed, 957 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/core/api/src/main/java/org/apache/polygene/api/concern/ConcernOf.java ---------------------------------------------------------------------- diff --git a/core/api/src/main/java/org/apache/polygene/api/concern/ConcernOf.java b/core/api/src/main/java/org/apache/polygene/api/concern/ConcernOf.java index e872308..01c90f7 100644 --- a/core/api/src/main/java/org/apache/polygene/api/concern/ConcernOf.java +++ b/core/api/src/main/java/org/apache/polygene/api/concern/ConcernOf.java @@ -40,7 +40,7 @@ public abstract class ConcernOf<T> * the next concern in the chain or the mixin * to be invoked. */ - final + @SuppressWarnings( "ConstantConditions" ) @ConcernFor - protected T next = null; + protected final T next = null; } http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/build.gradle ---------------------------------------------------------------------- diff --git a/libraries/execution/build.gradle b/libraries/execution/build.gradle new file mode 100644 index 0000000..6863209 --- /dev/null +++ b/libraries/execution/build.gradle @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +apply plugin: 'polygene-library' + +description = "Apache Polygene⢠Execution Library provides common set of execution primitives and services." + +jar { manifest { name = "Apache Polygene⢠Library - Constraints"}} + +dependencies { + api polygene.core.bootstrap + + api libraries.commons_validator + + runtimeOnly polygene.core.runtime + + testImplementation polygene.core.testsupport + + testRuntimeOnly libraries.logback +} http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/dev-status.xml ---------------------------------------------------------------------- diff --git a/libraries/execution/dev-status.xml b/libraries/execution/dev-status.xml new file mode 100644 index 0000000..8a476df --- /dev/null +++ b/libraries/execution/dev-status.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + ~ + --> +<module xmlns="http://polygene.apache.org/schemas/2008/dev-status/1" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://polygene.apache.org/schemas/2008/dev-status/1 + http://polygene.apache.org/schemas/2008/dev-status/1/dev-status.xsd"> + <status> + <!--none,early,beta,stable,mature--> + <codebase>beta</codebase> + + <!-- none, brief, good, complete --> + <documentation>good</documentation> + + <!-- none, some, good, complete --> + <unittests>some</unittests> + </status> + <licenses> + <license>ALv2</license> + </licenses> +</module> http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/src/docs/execution.txt ---------------------------------------------------------------------- diff --git a/libraries/execution/src/docs/execution.txt b/libraries/execution/src/docs/execution.txt new file mode 100644 index 0000000..998c11d --- /dev/null +++ b/libraries/execution/src/docs/execution.txt @@ -0,0 +1,68 @@ +/////////////////////////////////////////////////////////////// + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +/////////////////////////////////////////////////////////////// + +[[library-execution,Execution Library]] += Execution = + +== @Retry == +Any method can be annotated with the @Retry annotation, which means that if there is an exception thrown, then +the method should be called again, for a max number of times. + +The value() defines how many times the method will be called, if retry is triggered by the on() and unless() +parameters. + +The backoff() parameter is available to slow down the retries, which is useful for network operations or +external systems that may still need more time to become available. The default is no backoff is deployed and retries +are executed as fast as possible. + +The on() parameter defines which Throwable and subclasses should be considered for retrying the method call. The +default is all Throwables. + +The unless() parameter negates the on() parameter, and if a subclass of any Throwable listed in unless() is thrown +then the retry operation(s) will not take effect. + +== ExecutionService == +The =ExecutionService= is the =java.util.concurrent.ExecutorService= provided simply as a flexible assembler for +configuration of it. + +=== Configuration Parameters === +The Configuration parameters are available in the =ExecutionServiceAssembler= via a fluent API (DSL), with the +following methods + +[snippet,java] +---- +source=libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ExecutionServiceAssembler.java +tag=configuration +---- + + +== ScheduledExecutionService == +The =ScheduledExecutionService= is the =java.util.concurrent.ScheduledExecutorService= provided simply as a flexible +assembler for configuration of it. + + +=== Configuration Parameters === +The Configuration parameters are available in the =ScheduledExecutionService= via a fluent API (DSL), with the +following methods + +[snippet,java] +---- +source=libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ScheduledExecutionService.java +tag=configuration +---- http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/src/main/java/org/apache/polygene/library/execution/Retry.java ---------------------------------------------------------------------- diff --git a/libraries/execution/src/main/java/org/apache/polygene/library/execution/Retry.java b/libraries/execution/src/main/java/org/apache/polygene/library/execution/Retry.java new file mode 100644 index 0000000..d283273 --- /dev/null +++ b/libraries/execution/src/main/java/org/apache/polygene/library/execution/Retry.java @@ -0,0 +1,66 @@ +package org.apache.polygene.library.execution; + +import java.lang.annotation.Documented; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * @Retry is a method annotation to automatically call the method again if an exception was thrown. + * <p> + * By default, the method will be called twice if any {@link java.lang.Throwable} is thrown. By setting + * the value, one can increase that number, and the {@link #on()} and {@link #unless()} parameters can + * be used to select which Throwable (incl its subtypes) the retry will happen on. + * </p> + * <p> + * This can only be applied to idempotent methods, and keeping in mind the ordering of Concerns may + * be very significant. E.g. If the {@link RetryConcern} is "around" the + * {@link org.apache.polygene.api.unitofwork.concern.UnitOfWorkConcern} then depending on the parameters on + * the {@link org.apache.polygene.api.unitofwork.concern.UnitOfWorkPropagation} will determine if the + * method is still idempotent or not, in particular + * {@link org.apache.polygene.api.unitofwork.concern.UnitOfWorkPropagation.Propagation#REQUIRES_NEW}. Furthermore, + * {@link org.apache.polygene.api.unitofwork.concern.UnitOfWorkPropagation} has its own Retry mechanism independent + * of this one. + * </p> + */ +@Retention( RUNTIME ) +@Target( METHOD ) +@Inherited +@Documented +public @interface Retry +{ + /** + * Number of times that the method should be called. + * <p> + * This number must be 1 or greater, otherwise an {@link IllegalArgumentException} is thrown. + * </p> + */ + int value() default 2; + + /** + * List of Throwables that should trigger the Retry operation. + * <p> + * Default: All Throwables. + * </p> + */ + Class<? extends Throwable>[] on() default { Throwable.class }; + + /** + * List of Throwables that should NOT trigger the Retry operation, even if they are subclasses found in the on() value + * <p> + * Default: none. + * </p> + */ + Class<? extends Throwable>[] unless() default {}; + + /** + * Slowing down of retries. + * <p> + * If the backoff is greater than 0 (default), there will be a successive backoff of retrying the call, + * and starting with backoff() milliseconds, the sleep time between tries will double for each try. + */ + int backoff() default 0; +} http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/src/main/java/org/apache/polygene/library/execution/RetryConcern.java ---------------------------------------------------------------------- diff --git a/libraries/execution/src/main/java/org/apache/polygene/library/execution/RetryConcern.java b/libraries/execution/src/main/java/org/apache/polygene/library/execution/RetryConcern.java new file mode 100644 index 0000000..0d995ae --- /dev/null +++ b/libraries/execution/src/main/java/org/apache/polygene/library/execution/RetryConcern.java @@ -0,0 +1,78 @@ +package org.apache.polygene.library.execution; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.polygene.api.common.AppliesTo; +import org.apache.polygene.api.concern.ConcernOf; +import org.apache.polygene.api.injection.scope.Invocation; + +import static org.apache.polygene.api.util.Classes.classHierarchy; + +@AppliesTo( Retry.class ) +public class RetryConcern extends ConcernOf<InvocationHandler> + implements InvocationHandler +{ + private final int retries; + private final HashSet<Class<? extends Throwable>> on; + private final HashSet<Object> unless; + private final int backoff; + + @SuppressWarnings( "unchecked" ) + public RetryConcern( @Invocation Retry annotation ) + { + this.retries = annotation.value(); + if( retries < 1 ) + { + throw new IllegalArgumentException( "@Retry must have a positive value greater than zero." ); + } + this.on = new HashSet<>(); + List<Class<? extends Throwable>> on = Arrays.asList( annotation.on() ); + this.on.addAll( on ); + + this.unless = new HashSet<>(); + List<Class<? extends Throwable>> unless = Arrays.asList( annotation.unless() ); + this.unless.addAll( unless ); + this.backoff = annotation.backoff(); + } + + @Override + @SuppressWarnings( { "SuspiciousMethodCalls", "ConstantConditions" } ) + public Object invoke( Object o, Method method, Object[] objects ) + throws Throwable + { + int count = retries; + long sleep = backoff; + while( true ) + { + try + { + return next.invoke( o, method, objects ); + } + catch( Throwable e ) + { + --count; + List<Class<?>> types = classHierarchy( e.getClass() ).collect( Collectors.toList() ); + for( Class<?> type : types ) + { + if( this.unless.contains( type ) ) + { + throw e; + } + if( count == 0 && this.on.contains( type )) + { + throw e; + } + } + if( sleep > 0 ) + { + Thread.sleep( sleep ); + sleep = sleep * 2; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ExecutionServiceAssembler.java ---------------------------------------------------------------------- diff --git a/libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ExecutionServiceAssembler.java b/libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ExecutionServiceAssembler.java new file mode 100644 index 0000000..22832ff --- /dev/null +++ b/libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ExecutionServiceAssembler.java @@ -0,0 +1,242 @@ +package org.apache.polygene.library.execution.assembly; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.polygene.api.service.ImportedServiceDescriptor; +import org.apache.polygene.api.service.ServiceImporter; +import org.apache.polygene.api.service.ServiceImporterException; +import org.apache.polygene.bootstrap.Assembler; +import org.apache.polygene.bootstrap.Assemblers; +import org.apache.polygene.bootstrap.AssemblyException; +import org.apache.polygene.bootstrap.ModuleAssembly; + +public class ExecutionServiceAssembler extends Assemblers.VisibilityIdentityConfig<ExecutionServiceAssembler> + implements Assembler +{ + + private ThreadFactory factory; + private RejectedExecutionHandler rejectedExecutionHandler; + private int coreThreads = 3; + private int maxThreads = 10; + private int maxTasks = 1000; + private long time = 1000; + private TimeUnit unit = TimeUnit.MILLISECONDS; + private ThreadGroup group; + private BlockingQueue<Runnable> queue; + + @Override + public void assemble( ModuleAssembly module ) + throws AssemblyException + { + module.importedServices( ExecutorService.class ) + .importedBy( ThreadPoolExecutorImporter.class ) + .setMetaInfo( this ) + ; + } + + /** + * Number of core threads, i.e. threads that are not removed if idle, to be used. + * + * @param threads core threads to use + * @return Fluent API + */ + // START SNIPPET: configuration + // Number of core threads, i.e. threads that are not removed if idle, to be used. + public ExecutionServiceAssembler withCoreThreads( int threads ) + // END SNIPPET: configuration + { + this.coreThreads = threads; + return this; + } + + /** + * Maximum number of threads to be used. + * + * @param threads max threads to use + * @return Fluent API + */ + // START SNIPPET: configuration + // Maximum number of threads to be used. + public ExecutionServiceAssembler withMaxThreads( int threads ) + // END SNIPPET: configuration + { + this.maxThreads = threads; + return this; + } + + /** + * Provide a custom ThreadFactory. + * <p> + * If defined, the {@link #inThreadGroup(ThreadGroup)} parameter will be ignored. + * </p> + * + * @param factory The thread factory to use, when creating threads. + * @return Fluent API + */ + // START SNIPPET: configuration + // Provide a custom ThreadFactory. If defined, the inThreadGroup parameter will be ignored. + public ExecutionServiceAssembler withThreadFactory( ThreadFactory factory ) + // END SNIPPET: configuration + { + this.factory = factory; + return this; + } + + /** + * Provide a custom {@link RejectedExecutionHandler}, or one of the pre-defined policies + * + * @param handler the custom {@link RejectedExecutionHandler} to use. + * @return Fluent API + */ + // START SNIPPET: configuration + // Provide a custom RejectedExecutionHandler, or one of the pre-defined policies + public ExecutionServiceAssembler withRejectedExecutionHandler( RejectedExecutionHandler handler ) + // END SNIPPET: configuration + { + this.rejectedExecutionHandler = handler; + return this; + } + + /** + * Max number of entries in queue. + * <p> + * Blocking occurs if more submissions are given. + * </p> + * <p> + * If a custom queue is defined, then this parameter will be ignored. + * </p> + * + * @param maxTasks Max number of tasks that can be added to the queue before blocking occurs. + * @return Fluent API + * @see LinkedBlockingQueue which is backing the {@link ThreadPoolExecutor} if a custom one is not provided. + */ + // START SNIPPET: configuration + // Max number of entries in queue. Blocking occurs if more submissions given + public ExecutionServiceAssembler withMaxQueueSize( int maxTasks ) + // END SNIPPET: configuration + { + this.maxTasks = maxTasks; + return this; + } + + /** + * Provide custom queue. + * <p> + * If a custom queue is defined, then the {@link #withMaxQueueSize(int)} parameter will be ignored. + * </p> + * + * @param queue The custom queue to use. + * @return Fluent API + */ + // START SNIPPET: configuration + // Provide custom queue. If used, the withMaxQueueSize is ignored + public ExecutionServiceAssembler withQueue( BlockingQueue<Runnable> queue ) + // END SNIPPET: configuration + { + this.queue = queue; + return this; + } + + /** + * For how long the threads should be kept around idling before discarded + * + * @param time The time to keep alive + * @param unit The unit in which the 'time' argument is expressed. + * @return Fluent API + */ + // START SNIPPET: configuration + // For how long the threads should be kept around idling before discarded + public ExecutionServiceAssembler withKeepAliveTime( int time, TimeUnit unit ) + // END SNIPPET: configuration + { + this.time = time; + this.unit = unit; + return this; + } + + /** + * Thread Group to create the threads in. + * <p> + * If a custom ThreadFactory is given, {@link #withThreadFactory(ThreadFactory)} then this parameter is ignored. + * </p> + * + * @param group The thread group that all threads should be placed in. + * @return Fluent API + */ + // START SNIPPET: configuration + // Thread Group to create the threads in + public ExecutionServiceAssembler inThreadGroup( ThreadGroup group ) + // END SNIPPET: configuration + { + this.group = group; + return this; + } + + private static class ThreadPoolExecutorImporter + implements ServiceImporter + { + long count = 0; + + @Override + public Object importService( ImportedServiceDescriptor serviceDescriptor ) + throws ServiceImporterException + { + ExecutionServiceAssembler metaInfo = serviceDescriptor.metaInfo( ExecutionServiceAssembler.class ); + ThreadGroup group; + if( metaInfo.group == null ) + { + group = new ThreadGroup( "tg-" + metaInfo.identity() ); + } + else + { + group = metaInfo.group; + } + ThreadFactory factory = metaInfo.factory; + if( factory == null ) + { + factory = runnable -> new Thread( group, runnable, "t-" + count++ ); + } + BlockingQueue<Runnable> queue; + if( metaInfo.queue == null ) + { + queue = new LinkedBlockingQueue<>( metaInfo.maxTasks ); + } + else + { + queue = metaInfo.queue; + } + if( metaInfo.rejectedExecutionHandler == null ) + { + return new ThreadPoolExecutor( metaInfo.coreThreads, + metaInfo.maxThreads, + metaInfo.time, + metaInfo.unit, + queue, + factory + ); + } + else + { + return new ThreadPoolExecutor( metaInfo.coreThreads, + metaInfo.maxThreads, + metaInfo.time, + metaInfo.unit, + queue, + factory, + metaInfo.rejectedExecutionHandler + ); + } + } + + @Override + public boolean isAvailable( Object instance ) + { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ScheduledExecutionServiceAssembler.java ---------------------------------------------------------------------- diff --git a/libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ScheduledExecutionServiceAssembler.java b/libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ScheduledExecutionServiceAssembler.java new file mode 100644 index 0000000..c76d2b0 --- /dev/null +++ b/libraries/execution/src/main/java/org/apache/polygene/library/execution/assembly/ScheduledExecutionServiceAssembler.java @@ -0,0 +1,140 @@ +package org.apache.polygene.library.execution.assembly; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import org.apache.polygene.api.service.ImportedServiceDescriptor; +import org.apache.polygene.api.service.ServiceImporter; +import org.apache.polygene.api.service.ServiceImporterException; +import org.apache.polygene.bootstrap.Assembler; +import org.apache.polygene.bootstrap.Assemblers; +import org.apache.polygene.bootstrap.AssemblyException; +import org.apache.polygene.bootstrap.ModuleAssembly; + +public class ScheduledExecutionServiceAssembler extends Assemblers.VisibilityIdentityConfig<ScheduledExecutionServiceAssembler> + implements Assembler +{ + + private ThreadFactory factory; + private RejectedExecutionHandler rejectedExecutionHandler; + private int coreThreads = 3; + private ThreadGroup group; + + @Override + public void assemble( ModuleAssembly module ) + throws AssemblyException + { + module.importedServices( ScheduledExecutorService.class ) + .importedBy( ThreadPoolExecutorImporter.class ) + .setMetaInfo( this ) + ; + } + + /** + * Number of core threads, i.e. threads that are not removed if idle, to be used. + * + * @param threads core threads to use + * @return Fluent API + */ + // START SNIPPET: configuration + // Number of core threads, i.e. threads that are not removed if idle, to be used. + public ScheduledExecutionServiceAssembler withCoreThreads( int threads ) + // END SNIPPET: configuration + { + this.coreThreads = threads; + return this; + } + + /** + * Provide a custom ThreadFactory. + * <p> + * If defined, the {@link #inThreadGroup(ThreadGroup)} parameter will be ignored. + * </p> + * + * @param factory The thread factory to use, when creating threads. + * @return Fluent API + */ + // START SNIPPET: configuration + // Provide a custom ThreadFactory. If defined, the inThreadGroup parameter will be ignored. + public ScheduledExecutionServiceAssembler withThreadFactory( ThreadFactory factory ) + // END SNIPPET: configuration + { + this.factory = factory; + return this; + } + + /** + * Provide a custom {@link RejectedExecutionHandler}, or one of the pre-defined policies + * + * @param handler the custom {@link RejectedExecutionHandler} to use. + * @return Fluent API + */ + // START SNIPPET: configuration + // Provide a custom RejectedExecutionHandler, or one of the pre-defined policies + public ScheduledExecutionServiceAssembler withRejectedExecutionHandler( RejectedExecutionHandler handler ) + // END SNIPPET: configuration + { + this.rejectedExecutionHandler = handler; + return this; + } + + /** + * Thread Group to create the threads in. + * <p> + * If a custom ThreadFactory is given, {@link #withThreadFactory(ThreadFactory)} then this parameter is ignored. + * </p> + * + * @param group The thread group that all threads should be placed in. + * @return Fluent API + */ + // START SNIPPET: configuration + // Thread Group to create the threads in + public ScheduledExecutionServiceAssembler inThreadGroup( ThreadGroup group ) + // END SNIPPET: configuration + { + this.group = group; + return this; + } + + private static class ThreadPoolExecutorImporter + implements ServiceImporter + { + long count = 0; + + @Override + public Object importService( ImportedServiceDescriptor serviceDescriptor ) + throws ServiceImporterException + { + ScheduledExecutionServiceAssembler metaInfo = serviceDescriptor.metaInfo( ScheduledExecutionServiceAssembler.class ); + ThreadGroup group; + if( metaInfo.group == null ) + { + group = new ThreadGroup( "tg-" + metaInfo.identity() ); + } + else + { + group = metaInfo.group; + } + ThreadFactory factory = metaInfo.factory; + if( factory == null ) + { + factory = runnable -> new Thread( group, runnable, "t-" + count++ ); + } + if( metaInfo.rejectedExecutionHandler == null ) + { + return new ScheduledThreadPoolExecutor( metaInfo.coreThreads, factory ); + } + else + { + return new ScheduledThreadPoolExecutor( metaInfo.coreThreads, factory, metaInfo.rejectedExecutionHandler ); + } + } + + @Override + public boolean isAvailable( Object instance ) + { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/src/test/java/org/apache/polygene/library/execution/ExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/libraries/execution/src/test/java/org/apache/polygene/library/execution/ExecutionServiceTest.java b/libraries/execution/src/test/java/org/apache/polygene/library/execution/ExecutionServiceTest.java new file mode 100644 index 0000000..d39cc26 --- /dev/null +++ b/libraries/execution/src/test/java/org/apache/polygene/library/execution/ExecutionServiceTest.java @@ -0,0 +1,51 @@ +package org.apache.polygene.library.execution; + +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import org.apache.polygene.bootstrap.AssemblyException; +import org.apache.polygene.bootstrap.ModuleAssembly; +import org.apache.polygene.library.execution.assembly.ExecutionServiceAssembler; +import org.apache.polygene.test.AbstractPolygeneTest; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +public class ExecutionServiceTest extends AbstractPolygeneTest +{ + + private CopyOnWriteArraySet<Thread> threads = new CopyOnWriteArraySet<>(); + + @Override + public void assemble( ModuleAssembly module ) + throws AssemblyException + { + new ExecutionServiceAssembler() + .withMaxThreads( 3 ) + .assemble( module ); + } + + @Test + void givenMaxThreeThreadsWhenSubmittingManyTasksExpectToOnlySeeThreeThreads() + throws InterruptedException + { + ExecutorService underTest = serviceFinder.findService( ExecutorService.class ).get(); + Runnable r = () -> { + threads.add( Thread.currentThread() ); + }; + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + underTest.submit( r ); + Thread.sleep( 10 ); + assertThat( threads.size(), equalTo( 3 ) ); + underTest.shutdownNow(); + } +} http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/src/test/java/org/apache/polygene/library/execution/RetryTest.java ---------------------------------------------------------------------- diff --git a/libraries/execution/src/test/java/org/apache/polygene/library/execution/RetryTest.java b/libraries/execution/src/test/java/org/apache/polygene/library/execution/RetryTest.java new file mode 100644 index 0000000..4ed6086 --- /dev/null +++ b/libraries/execution/src/test/java/org/apache/polygene/library/execution/RetryTest.java @@ -0,0 +1,184 @@ +package org.apache.polygene.library.execution; + +import org.apache.polygene.api.mixin.Mixins; +import org.apache.polygene.bootstrap.AssemblyException; +import org.apache.polygene.bootstrap.ModuleAssembly; +import org.apache.polygene.test.AbstractPolygeneTest; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.core.IsEqual.equalTo; + +// TODO: These tests are extremely slow. Why is that? Could it be the streaming of exception class hierarchies? +public class RetryTest extends AbstractPolygeneTest +{ + @Override + public void assemble( ModuleAssembly module ) + throws AssemblyException + { + module.services( TestService.class ).withConcerns( RetryConcern.class ).instantiateOnStartup(); + } + + @Test + void givenMethodThrowingExceptionWhenCallingExpectFourCalls() + { + TestService underTest = serviceFinder.findService( TestService.class ).get(); + long start = System.currentTimeMillis(); + try + { + underTest.doWithRetry1(); + } + catch( IllegalStateException e ) + { + // expected after 4 calls + } + long end = System.currentTimeMillis(); + assertThat( underTest.calledTimes(), equalTo(4)); + assertThat( end-start, lessThan(20L)); + } + + @Test + void givenMethodThrowingExceptionWhenCallingExpectFiveCalls() + { + TestService underTest = serviceFinder.findService( TestService.class ).get(); + long start = System.currentTimeMillis(); + try + { + underTest.doWithRetry2(); + } + catch( IllegalStateException e ) + { + // expected after 4 calls + } + long end = System.currentTimeMillis(); + assertThat( underTest.calledTimes(), equalTo(5)); + assertThat( end-start, lessThan(20L)); + } + + @Test + void givenMethodThrowingExceptionWhenCallingExpectSixCalls() + { + TestService underTest = serviceFinder.findService( TestService.class ).get(); + long start = System.currentTimeMillis(); + try + { + underTest.doWithRetry3(); + } + catch( IllegalStateException e ) + { + // expected after 4 calls + } + long end = System.currentTimeMillis(); + assertThat( underTest.calledTimes(), equalTo(6)); + assertThat( end-start, lessThan(20L)); + } + + @Test + void givenMethodThrowingExceptionWhenCallingExpectOneTries() + { + TestService underTest = serviceFinder.findService( TestService.class ).get(); + long start = System.currentTimeMillis(); + try + { + underTest.doWithRetry4(); + } + catch( IllegalStateException e ) + { + // expected after 1 calls, since IllegalStateException is an "unless" + } + long end = System.currentTimeMillis(); + assertThat( underTest.calledTimes(), equalTo(1)); + assertThat( end-start, lessThan(20L)); + } + + @Test + void givenBackoffExceptionWhenCallingExpectSlowTries() + { + TestService underTest = serviceFinder.findService( TestService.class ).get(); + long start = System.currentTimeMillis(); + try + { + underTest.doWithRetry5(); + } + catch( IllegalStateException e ) + { + // expected after 1 calls, since IllegalStateException is an "unless" + } + long end = System.currentTimeMillis(); + assertThat( underTest.calledTimes(), equalTo(3)); + assertThat( end-start, greaterThanOrEqualTo( 300L)); + } + + + @Mixins( TestMixin.class) + public interface TestService{ + + int calledTimes(); + + @Retry(4) + void doWithRetry1(); + + @Retry( value=5, on = IllegalStateException.class ) + void doWithRetry2(); + + @Retry( value=6, on = IllegalStateException.class ) + void doWithRetry3(); + + @Retry( value=7, unless = IllegalStateException.class ) + void doWithRetry4(); + + @Retry( value = 3, backoff = 100 ) + void doWithRetry5(); + } + + public class TestMixin + implements TestService + { + + private int called = 0; + + @Override + public int calledTimes() + { + return called; + } + + @Override + public void doWithRetry1() + { + called++; + throw new IllegalStateException(); + } + + @Override + public void doWithRetry2() + { + called++; + throw new IllegalStateException(); + } + + @Override + public void doWithRetry3() + { + called++; + throw new IllegalStateException(); + } + + @Override + public void doWithRetry4() + { + called++; + throw new IllegalStateException(); + } + + @Override + public void doWithRetry5() + { + called++; + throw new IllegalStateException(); + } + } +} http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/libraries/execution/src/test/java/org/apache/polygene/library/execution/ScheduledExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/libraries/execution/src/test/java/org/apache/polygene/library/execution/ScheduledExecutionServiceTest.java b/libraries/execution/src/test/java/org/apache/polygene/library/execution/ScheduledExecutionServiceTest.java new file mode 100644 index 0000000..83c9e12 --- /dev/null +++ b/libraries/execution/src/test/java/org/apache/polygene/library/execution/ScheduledExecutionServiceTest.java @@ -0,0 +1,46 @@ +package org.apache.polygene.library.execution; + +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.polygene.bootstrap.AssemblyException; +import org.apache.polygene.bootstrap.ModuleAssembly; +import org.apache.polygene.library.execution.assembly.ExecutionServiceAssembler; +import org.apache.polygene.library.execution.assembly.ScheduledExecutionServiceAssembler; +import org.apache.polygene.test.AbstractPolygeneTest; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +public class ScheduledExecutionServiceTest extends AbstractPolygeneTest +{ + + private volatile AtomicInteger executed = new AtomicInteger( 0 ); + + @Override + public void assemble( ModuleAssembly module ) + throws AssemblyException + { + new ScheduledExecutionServiceAssembler() + .assemble( module ); + } + + @Test + void givenScheduleOfTenMillisWhenSubmittingTwoTasksFor105MillisExpect20Invocations() + throws InterruptedException + { + ScheduledExecutorService underTest = serviceFinder.findService( ScheduledExecutorService.class ).get(); + Runnable r = () -> { + executed.incrementAndGet(); + }; + underTest.scheduleAtFixedRate( r, 10, 10, TimeUnit.MILLISECONDS ); + underTest.scheduleAtFixedRate( r, 10, 10, TimeUnit.MILLISECONDS ); + Thread.sleep( 105 ); + assertThat( executed.intValue(), equalTo( 20 ) ); + underTest.shutdownNow(); + } + +} http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/manual/src/docs/userguide/libraries.txt ---------------------------------------------------------------------- diff --git a/manual/src/docs/userguide/libraries.txt b/manual/src/docs/userguide/libraries.txt index b4bcce6..c07445c 100644 --- a/manual/src/docs/userguide/libraries.txt +++ b/manual/src/docs/userguide/libraries.txt @@ -47,6 +47,10 @@ include::../../../../libraries/constraints/src/docs/constraints.txt[] :leveloffset: 2 +include::../../../../libraries/execution/src/docs/execution.txt[] + +:leveloffset: 2 + include::../../../../libraries/fileconfig/src/docs/fileconfig.txt[] :leveloffset: 2 http://git-wip-us.apache.org/repos/asf/polygene-java/blob/db99803f/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 0f503e1..c0b9ed3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,6 +28,7 @@ include 'core:api', 'libraries:alarm', 'libraries:circuitbreaker', 'libraries:constraints', + 'libraries:execution', 'libraries:fileconfig', 'libraries:http', 'libraries:invocation-cache',