Added: 
river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java?rev=1563596&view=auto
==============================================================================
--- 
river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java
 (added)
+++ 
river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java
 Sun Feb  2 12:31:09 2014
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.thread;
+
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.river.api.util.Startable;
+
+/**
+ * The intent of this Executor is to share a single thread pool among tasks 
with
+ * dependencies that prevent them running concurrently.
+ * 
+ * @author peter
+ */
+public class SynchronousExecutors implements Startable {
+    private static final Logger logger = 
Logger.getLogger("org.apache.river.impl");
+    private final Lock distributorLock;
+    private final Condition workToDo;
+    private final List<Queue<Callable>> queues ;
+    private final Distributor distributor;
+    private final Thread distributorThread;
+    private final ScheduledExecutorService pool;
+    private final AtomicBoolean distributorWaiting;
+    
+    public SynchronousExecutors(ScheduledExecutorService pool){
+        queues = new ArrayList<Queue<Callable>>(24);
+        this.pool = pool;
+        distributorLock = new ReentrantLock();
+        workToDo = distributorLock.newCondition();
+        distributorWaiting = new AtomicBoolean(false);
+        distributor = new Distributor(queues, pool, distributorLock, workToDo, 
distributorWaiting);
+        distributorThread = new Thread(distributor ,"SynchronousQueueArray 
distributor");
+        distributorThread.setDaemon(true);
+    }
+    
+    void addQueue(Queue<Callable> queue){
+        synchronized (queues){
+            queues.add(queue);
+        }
+    }
+    
+    boolean removeQueue(Object queue){
+        synchronized (queues){
+            return queues.remove(queue);
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        distributorThread.start();
+    }
+    
+    public void shutdown() {
+        distributorThread.interrupt();
+    }
+    
+    /**
+     * The ExecutorService returned, supports a subset of ExecutorService
+     * methods, the intent of this executor is to serialize the execution
+     * of tasks, it is up to the BlockingQueue or caller to ensure order, only 
+     * one task will execute at a time, that task will be retried if it fails,
+     * using a back off strategy of 1, 5 and 10 seconds, followed by 1, 1 and 5
+     * minutes thereafter forever, no other task will execute until the task
+     * at the head of the queue is completed successfully.
+     * 
+     * Tasks submitted must implement Callable, Runnable is not supported.
+     * 
+     * @param <T>
+     * @param queue
+     * @return 
+     */
+    public <T> ExecutorService newSerialExecutor(BlockingQueue<Callable<T>> 
queue){
+        QueueWrapper que = new QueueWrapper<T>(queue);
+        ExecutorService serv = new SerialExecutor<T>(que, distributorWaiting, 
distributorLock, workToDo, this);
+        addQueue(que);
+        return serv;
+    }
+    
+    private static class SerialExecutor<T> implements ExecutorService {
+        
+        QueueWrapper<T> queue;
+        AtomicBoolean waiting;
+        final Lock lock;
+        final Condition workToDo;
+        final SynchronousExecutors parent;
+        volatile boolean terminating;
+                
+
+        SerialExecutor(QueueWrapper<T> queue, AtomicBoolean waiting, Lock 
lock, Condition cond, SynchronousExecutors parent){
+            this.queue = queue;
+            this.waiting = waiting;
+            this.lock = lock;
+            workToDo = cond;
+            terminating = false;
+            this.parent = parent;
+        }
+        @Override
+        public void shutdown() {
+            terminating = true;
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            terminating = true;
+            parent.removeQueue(queue);
+            return new ArrayList(queue);
+        }
+
+        @Override
+        public boolean isShutdown() {
+            throw new UnsupportedOperationException("Not supported yet."); 
//To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public boolean isTerminated() {
+            throw new UnsupportedOperationException("Not supported yet."); 
//To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+            throw new UnsupportedOperationException("Not supported yet."); 
//To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            if (terminating) throw new 
RejectedExecutionException("ExecutorService is shutting down");
+            if (task == null) throw new NullPointerException("task cannot be 
null");
+            Task t = new Task<T>(task, queue, lock, workToDo);
+            if (queue.offer(t)){
+                if (waiting.get() && !queue.stalled){
+                    lock.lock();
+                    try {
+                        workToDo.signalAll();
+                    } finally {
+                        lock.unlock();
+                    }
+                }
+                return t;
+            }
+            throw new RejectedExecutionException("task rejected, queue likely 
full");
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            throw new UnsupportedOperationException("Not supported."); 
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            throw new UnsupportedOperationException("Not supported."); 
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) throws InterruptedException {
+            if (terminating) throw new 
RejectedExecutionException("ExecutorService is shutting down");
+            List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+            Iterator<? extends Callable<T>> it = tasks.iterator();
+            while (it.hasNext()){
+                result.add(submit(it.next()));
+            }
+            return result;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks, long timeout, TimeUnit unit) throws InterruptedException {
+            throw new UnsupportedOperationException("Not supported yet."); 
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws 
InterruptedException, ExecutionException {
+            throw new UnsupportedOperationException("Not supported yet."); 
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit) throws InterruptedException, ExecutionException, 
TimeoutException {
+            throw new UnsupportedOperationException("Not supported yet."); 
//To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            throw new UnsupportedOperationException("Not supported.");
+        }
+        
+    }
+    
+    private static class Distributor implements Runnable {
+        
+//        private final Random selector = new Random();
+        private final List<Queue<Callable>> queues;
+        private final ScheduledExecutorService executor;
+        private final Lock lock;
+        private final Condition workToDo;
+        private final AtomicBoolean waiting;
+        
+        Distributor(List<Queue<Callable>> queues, ScheduledExecutorService 
executor, Lock lock, Condition workToDo, AtomicBoolean waiting){
+            this.queues = queues;
+            this.executor = executor;
+            this.lock = lock;
+            this.workToDo = workToDo;
+            this.waiting = waiting;
+        }
+
+        @Override
+        public void run() {
+            int nullCount = 0; // sequence of null tasks
+            int size = 0;
+            List<Callable> tasks = new ArrayList<Callable>(64);
+            try {
+                while (!Thread.currentThread().isInterrupted()){
+                    try {
+                        Queue<Callable> queue = null;
+                        synchronized (queues){
+                            size = queues.size();
+//                                if (size > 0){
+//                                    int index = selector.nextInt(size);
+//                                    queue = queues.get(index);
+//                                }
+                            for (int i = 0; i < size; i++){
+                                queue = queues.get(i);
+                                Callable task = queue != null ? queue.peek() : 
null;
+                                if (task != null) tasks.add(task);
+                                else nullCount++;
+                            }
+                        }
+
+                        Iterator <Callable> it = tasks.iterator();
+                        while (it.hasNext()){
+                            Callable task = it.next();
+                            long delay = 0;
+                            if (task instanceof Task) delay = 
((Task)task).delay();
+                            if (delay == 0) executor.submit(task);
+                            else executor.schedule(task, delay, 
TimeUnit.MILLISECONDS);
+                            nullCount = 0; // reset null count
+                        }
+                        tasks.clear();
+                        if (nullCount >= size ) {
+                            // Time for a nap.
+                            lock.lock();
+                            try {
+                                waiting.set(true);
+                                workToDo.await(2, TimeUnit.SECONDS);
+                            } catch (InterruptedException ex) {
+                                Thread.currentThread().interrupt(); // restore
+                            } finally {
+                                waiting.set(false);
+                                lock.unlock();
+                            }
+                        }
+                    } catch (Exception e){
+                        System.out.println(e);
+                        logger.log(Level.FINE, "Exception thrown by 
distributor: {0}", e);
+                    }
+                }// end while
+            } finally {
+                executor.shutdown();
+            }
+        }
+    }
+    
+    private static class QueueWrapper<T> extends AbstractQueue<Callable<T>> 
implements Queue<Callable<T>>{
+        
+        final ReentrantLock lock; // lock to control the head of the queue.
+        final Queue<Callable<T>> queue;
+        Callable<T> peek;// Only ever accessed by distributor thread.
+        boolean stalled;
+        
+        QueueWrapper(Queue<Callable<T>> queue){
+            this.queue = queue;
+            lock = new ReentrantLock();
+            peek = null;
+            stalled = false;
+        }
+
+        @Override
+        public Iterator<Callable<T>> iterator() {
+            return queue.iterator();
+        }
+
+        @Override
+        public int size() {
+            return queue.size();
+        }
+
+        @Override
+        public boolean offer(Callable<T> e) {
+            return queue.offer(e);
+        }
+
+        @Override
+        public Callable<T> poll() {
+            lock.lock();
+            try {
+                return queue.poll();
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        @Override
+        public Callable<T> peek() {
+            boolean locked = lock.tryLock();
+            if (!locked) return null; // Pretend queue empty so director tries 
another queue.
+            try {
+                if (peek == null) {
+                    peek = queue.peek();
+                    return peek;
+                } else {
+                    stalled = true;
+                    return null; // Pretend queue empty so director tries 
another queue.
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+        
+    }
+    
+    private static class Task<T> implements Callable<T>, Future<T>, 
Comparable<Task> {
+        
+        /**
+         * Default delay backoff times. 
+         *
+         * @see #retryTime 
+         */
+        private static final long[] delays = {
+             0, // First value is never read
+             TimeUnit.SECONDS.toMillis(1),
+             TimeUnit.SECONDS.toMillis(5),
+             TimeUnit.SECONDS.toMillis(10),
+             TimeUnit.MINUTES.toMillis(1),
+             TimeUnit.MINUTES.toMillis(1),
+             TimeUnit.MINUTES.toMillis(5)
+        };
+        
+        
+        volatile boolean complete = false;
+        volatile boolean cancelled = false;
+        volatile T result = null;
+        volatile Exception exception = null;
+        volatile Thread executorThread;
+        private final Callable<T> task;
+        private final QueueWrapper queue;
+        private final Lock executorLock;
+        private final Condition waiting;
+        private final Condition resultAwait;
+        private final boolean comparable;
+        private int attempt;
+        private volatile long retryTime;
+        
+        Task(Callable<T> c, QueueWrapper wrapper, Lock executorLock, Condition 
distributorWaiting){
+            task = c;
+            comparable = task instanceof Comparable;
+            queue = wrapper;
+            this.waiting = distributorWaiting;
+            resultAwait = queue.lock.newCondition();
+            attempt = 0;
+            this.executorLock = executorLock;
+        }
+        
+        /**
+         * Return the next time at which we should make another attempt.
+         * This is <em>not</em> an interval, but the actual time.
+         * <p>
+         * The implementation is free to do as it pleases with the policy
+         * here.  The default implementation is to delay using intervals of
+         * 1 second, 5 seconds, 10 seconds, and 1 minute between
+         * attempts, and then retrying every five minutes forever.
+         * <p>
+         * The default implementation assumes it is being called from
+         * the default <code>run</code> method and that the current thread
+         * holds the lock on this object. If the caller does
+         * not own the lock the result is undefined and could result in an
+         * exception.
+         */
+        long delay() {
+            int index = (attempt < delays.length ? attempt : delays.length - 
1); 
+            return delays[index];
+        }
+
+        public T call() throws Exception {
+            if (cancelled) return null;
+            boolean reschedule = false;
+            queue.lock.lock();
+            try {
+                result = task.call();
+                if (((Task)queue.peek).task == task 
+                        && task == ((Task)queue.queue.peek()).task)
+                {
+                    queue.queue.poll(); // Remove successfully completed task 
from queue.
+                } 
+                queue.peek = null; // set peek to null to unblock queue.
+                queue.stalled = false;
+                executorLock.lock();
+                try {
+                    waiting.signalAll();
+                } finally {
+                    executorLock.unlock();
+                }
+                complete = true;
+                resultAwait.signalAll();
+                return result;
+            } catch (Exception e) {
+                exception = e;
+                reschedule = true;
+                throw e;
+            } finally {
+                if (reschedule) {
+                    attempt++;
+                    queue.peek = null; // set peek to null to unblock queue.
+                    executorLock.lock();
+                    try {
+                        waiting.signalAll();
+                    } finally {
+                        executorLock.unlock();
+                    }
+                }
+                queue.lock.unlock();
+            }
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException("Not supported yet."); 
//To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        @Override
+        public boolean isDone() {
+            return complete;
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+            if (complete) {
+                if (exception != null) throw new ExecutionException(exception);
+                return result;
+            }
+            queue.lock.lock();
+            try {
+                while (!complete){
+                    resultAwait.await();
+                }
+                return result;
+            } finally {
+                queue.lock.unlock();
+            }
+        }
+
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
+            if (complete) return result;
+            long begin = System.currentTimeMillis();
+            if (queue.lock.tryLock(timeout, unit)){
+                try {
+                    while (!complete){
+                        long remain = unit.toMillis(timeout) - 
(System.currentTimeMillis() - begin);
+                        if ( 1L > remain ) {
+                            if (exception != null) throw new 
ExecutionException(exception);
+                            throw new TimeoutException(
+                                    "Timed out while waiting for result");
+                        }
+                        resultAwait.await(remain, TimeUnit.MILLISECONDS);
+                    }
+                    return result;
+                } finally {
+                    queue.lock.unlock();
+                }
+            } else {
+                throw new TimeoutException("Timed out while waiting for lock");
+            }
+        }
+
+        @Override
+        public int compareTo(Task o) {
+            if (comparable) {
+                return ((Comparable) task).compareTo(o.task);
+            }
+            int h1 = task.hashCode();
+            int h2 = o.task.hashCode();
+            if ( h1 < h2 ) return -1;
+            if ( h1 > h2 ) return 1;
+            return 0;
+        }
+
+        
+    
+    }
+    
+}
\ No newline at end of file

Added: 
river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronousExecutorsTest.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronousExecutorsTest.java?rev=1563596&view=auto
==============================================================================
--- 
river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronousExecutorsTest.java
 (added)
+++ 
river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronousExecutorsTest.java
 Sun Feb  2 12:31:09 2014
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.thread;
+
+import java.rmi.RemoteException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ *
+ * @author peter
+ */
+public class SynchronousExecutorsTest {
+    
+    
+    
+    public SynchronousExecutorsTest() {
+    }
+    
+    @Before
+    public void setUp() {
+    }
+    
+    @After
+    public void tearDown() {
+    }
+
+    /**
+     * Test of newExecutor method, of class SynchronousQueueArrayExecutor.
+     */
+    @Test
+    public void testNewExecutor() {
+        System.out.println("newExecutor");
+        SynchronousExecutors instance = new SynchronousExecutors(new Exec());
+        try {
+            instance.start();
+        } catch (Exception ex) {
+            ex.printStackTrace(System.out);
+        }
+        ExecutorService exec = instance.newSerialExecutor(new 
LinkedBlockingQueue<Callable<String>>());
+        Future<String> future = exec.submit(new Exceptional());
+        String result = null;
+        try {
+            result = future.get(8, TimeUnit.MINUTES);
+        } catch (InterruptedException ex) {
+            ex.printStackTrace(System.out);
+        } catch (ExecutionException ex) {
+            ex.printStackTrace(System.out);
+        } catch (TimeoutException ex) {
+            ex.printStackTrace(System.out);
+        }
+        assertEquals("success", result);
+        instance.shutdown();
+    }
+    
+    private static class Exec implements ScheduledExecutorService {
+        
+        private final ScheduledExecutorService ses = 
Executors.newScheduledThreadPool(1);
+
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
+            return ses.schedule(command, delay, unit);
+        }
+
+        @Override
+        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
+            System.out.println(System.currentTimeMillis());
+            System.out.println("schedule:" + delay + unit);
+            return ses.schedule(callable, delay, unit);
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay, long period, TimeUnit unit) {
+            return ses.scheduleAtFixedRate(command, initialDelay, period, 
unit);
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
long initialDelay, long delay, TimeUnit unit) {
+            return ses.scheduleWithFixedDelay(command, initialDelay, delay, 
unit);
+        }
+
+        @Override
+        public void shutdown() {
+            System.out.println("shutdown called at:");
+            System.out.println(System.currentTimeMillis());
+            ses.shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return ses.shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return ses.isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return ses.isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+            return ses.awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            System.out.println("submit called");
+            return ses.submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return ses.submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return ses.submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) throws InterruptedException {
+            return ses.invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks, long timeout, TimeUnit unit) throws InterruptedException {
+            return ses.invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws 
InterruptedException, ExecutionException {
+            throw new UnsupportedOperationException("Not supported yet."); 
//To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit) throws InterruptedException, ExecutionException, 
TimeoutException {
+            throw new UnsupportedOperationException("Not supported yet."); 
//To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            throw new UnsupportedOperationException("Not supported yet."); 
//To change body of generated methods, choose Tools | Templates.
+        }
+        
+    }
+    
+    private static class Exceptional<String> implements Callable<String> {
+        private final AtomicInteger tries = new AtomicInteger(0);
+        @Override
+        public String  call() throws Exception {
+            System.out.println("Task called at:");
+            System.out.println(System.currentTimeMillis());
+            int tri = tries.incrementAndGet();
+            if (tri < 7) throw new RemoteException("Dummy communication 
problem");
+            return (String) "success";
+        }
+        
+    }
+}
+


Reply via email to