Author: peter_firmstone Date: Thu Jan 2 05:19:25 2014 New Revision: 1554739
URL: http://svn.apache.org/r1554739 Log: Started working on ScheduledExecutors to take advantage of a ScheduledExecutorService to allow sharing of a thread pool for tasks that have dependencies, require order and must not execute until their dependencies have completed. Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java?rev=1554739&r1=1554738&r2=1554739&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java Thu Jan 2 05:19:25 2014 @@ -25,11 +25,11 @@ import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.Random; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -43,7 +43,9 @@ import java.util.logging.Logger; import org.apache.river.api.util.Startable; /** - * The intent of this Executor is to groups tasks into synchronous queues. + * The intent of this Executor is to share a single thread pool among tasks with + * dependencies that prevent them running concurrently. + * * @author peter */ public class SynchronousExecutors implements Startable { @@ -89,31 +91,44 @@ public class SynchronousExecutors implem distributorThread.interrupt(); } - public <T> ExecutorService newExecutor(){ - QueueWrapper que = new QueueWrapper<T>(new LinkedBlockingQueue<Callable<T>>()); - ExecutorService serv = new SynchronousExecutor<T>(que, distributorWaiting, distributorLock, workToDo, pool); + /** + * The ExecutorService returned, supports a subset of ExecutorService + * methods, the intent of this executor is to serialize the execution + * of tasks, it is up to the BlockingQueue or caller to ensure order, only + * one task will execute at a time, that task will be retried if it fails, + * using a back off strategy of 1, 5 and 10 seconds, followed by 1, 1 and 5 + * minutes thereafter forever, no other task will execute until the task + * at the head of the queue is completed successfully. + * + * Tasks submitted must implement Callable, Runnable is not supported. + * + * @param <T> + * @param queue + * @return + */ + public <T> ExecutorService newSerialExecutor(BlockingQueue<Callable<T>> queue){ + QueueWrapper que = new QueueWrapper<T>(queue); + ExecutorService serv = new SerialExecutor<T>(que, distributorWaiting, distributorLock, workToDo); addQueue(que); return serv; } - private static class SynchronousExecutor<T> implements ExecutorService { + private static class SerialExecutor<T> implements ExecutorService { QueueWrapper<T> queue; AtomicBoolean waiting; final Lock lock; final Condition workToDo; - final ScheduledExecutorService pool; - SynchronousExecutor(QueueWrapper<T> queue, AtomicBoolean waiting, Lock lock, Condition cond, ScheduledExecutorService pool){ + SerialExecutor(QueueWrapper<T> queue, AtomicBoolean waiting, Lock lock, Condition cond){ this.queue = queue; this.waiting = waiting; this.lock = lock; workToDo = cond; - this.pool = pool; } @Override public void shutdown() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } @Override @@ -139,7 +154,7 @@ public class SynchronousExecutors implem @Override public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException("task cannot be null"); - Task t = new Task<T>(task, queue, lock, workToDo, pool); + Task t = new Task<T>(task, queue, lock, workToDo); if (queue.offer(t)){ if (waiting.get() && !queue.stalled){ lock.lock(); @@ -350,18 +365,16 @@ public class SynchronousExecutors implem private final Lock executorLock; private final Condition waiting; private final Condition resultAwait; - private final ScheduledExecutorService exec; private int attempt; private volatile long retryTime; - Task(Callable<T> c, QueueWrapper wrapper, Lock executorLock, Condition distributorWaiting, ScheduledExecutorService exec){ + Task(Callable<T> c, QueueWrapper wrapper, Lock executorLock, Condition distributorWaiting){ task = c; queue = wrapper; this.waiting = distributorWaiting; resultAwait = queue.lock.newCondition(); attempt = 0; this.executorLock = executorLock; - this.exec = exec; } /** Modified: river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java?rev=1554739&r1=1554738&r2=1554739&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java Thu Jan 2 05:19:25 2014 @@ -1,7 +1,19 @@ /* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.river.impl.thread; @@ -14,6 +26,7 @@ import java.util.concurrent.ExecutionExc import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -59,9 +72,9 @@ public class SynchronouExecutorsTest { } catch (Exception ex) { ex.printStackTrace(System.out); } - ExecutorService exec = instance.newExecutor(); - Future future = exec.submit(new Exceptional()); - Object result = null; + ExecutorService exec = instance.newSerialExecutor(new LinkedBlockingQueue<Callable<String>>()); + Future<String> future = exec.submit(new Exceptional()); + String result = null; try { result = future.get(8, TimeUnit.MINUTES); } catch (InterruptedException ex) { @@ -171,15 +184,15 @@ public class SynchronouExecutorsTest { } - private static class Exceptional implements Callable { + private static class Exceptional<String> implements Callable<String> { private final AtomicInteger tries = new AtomicInteger(0); @Override - public Object call() throws Exception { + public String call() throws Exception { System.out.println("Task called at:"); System.out.println(System.currentTimeMillis()); int tri = tries.incrementAndGet(); if (tri < 7) throw new RemoteException("Dummy communication problem"); - return "success"; + return (String) "success"; } }
