Author: nextgens
Date: 2008-04-04 06:16:23 +0000 (Fri, 04 Apr 2008)
New Revision: 18977

Modified:
   trunk/freenet/src/freenet/support/PooledExecutor.java
Log:
More sync. fixes

Modified: trunk/freenet/src/freenet/support/PooledExecutor.java
===================================================================
--- trunk/freenet/src/freenet/support/PooledExecutor.java       2008-04-04 06:12:05 UTC (rev 18976)
+++ trunk/freenet/src/freenet/support/PooledExecutor.java       2008-04-04 06:16:23 UTC (rev 18977)
@@ -23,48 +23,48 @@
        private static boolean logMINOR;
        // Ticker thread that runs at maximum priority.
        private Ticker ticker;
-       
-       public void setTicker(Ticker ticker) {
+
+       public synchronized void setTicker(Ticker ticker) {
                this.ticker = ticker;
        }
-       
+
        public PooledExecutor() {
-               for(int i=0; i<runningThreads.length; i++) {
+               for(int i = 0; i < runningThreads.length; i++) {
                        runningThreads[i] = new ArrayList();
                        waitingThreads[i] = new ArrayList();
                        threadCounter[i] = 0;
                }
        }
-       
        /** Maximum time a thread will wait for a job */
-       static final int TIMEOUT = 5*60*1000;
-       
+       static final int TIMEOUT = 5 * 60 * 1000;
+
        public void start() {
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
        }
-       
+
        public void execute(Runnable job, String jobName) {
                execute(job, jobName, false);
        }
-       
+
        public void execute(Runnable job, String jobName, boolean fromTicker) {
                int prio = NativeThread.NORM_PRIORITY;
-               if(job instanceof PrioRunnable) {
-                       prio = ((PrioRunnable)job).getPriority();
-               }
-               
-               if(logMINOR) Logger.minor(this, "Executing "+job+" as 
"+jobName+" at prio "+prio);
+               if(job instanceof PrioRunnable)
+                       prio = ((PrioRunnable) job).getPriority();
+
+               if(logMINOR)
+                       Logger.minor(this, "Executing " + job + " as " + 
jobName + " at prio " + prio);
                if(prio < NativeThread.MIN_PRIORITY || prio > 
NativeThread.MAX_PRIORITY)
-                       throw new IllegalArgumentException("Unreconized 
priority level : "+prio+'!');
+                       throw new IllegalArgumentException("Unreconized 
priority level : " + prio + '!');
                while(true) {
                        MyThread t;
                        boolean mustStart = false;
                        boolean miss = false;
                        synchronized(this) {
                                jobCount++;
-                               if(!waitingThreads[prio-1].isEmpty()) {
-                                       t = (MyThread) 
waitingThreads[prio-1].remove(waitingThreads[prio-1].size()-1);
-                                       if(logMINOR) Logger.minor(this, 
"Reusing thread "+t);
+                               if(!waitingThreads[prio - 1].isEmpty()) {
+                                       t = (MyThread) waitingThreads[prio - 
1].remove(waitingThreads[prio - 1].size() - 1);
+                                       if(logMINOR)
+                                               Logger.minor(this, "Reusing 
thread " + t);
                                } else {
                                        // Must create new thread
                                        if((!fromTicker) && 
NativeThread.usingNativeCode() && prio > Thread.currentThread().getPriority()) {
@@ -73,16 +73,18 @@
                                                return;
                                        }
                                        // Will be coalesced by thread count 
listings if we use "@" or "for"
-                                       t = new MyThread("Pooled thread 
awaiting work @"+(threadCounter[prio-1]), threadCounter[prio-1], prio, 
!fromTicker);
-                                       threadCounter[prio-1]++;
+                                       t = new MyThread("Pooled thread 
awaiting work @" + (threadCounter[prio - 1]), threadCounter[prio - 1], prio, 
!fromTicker);
+                                       threadCounter[prio - 1]++;
                                        t.setDaemon(true);
                                        mustStart = true;
                                        miss = true;
                                }
                        }
                        synchronized(t) {
-                               if(!t.alive) continue;
-                               if(t.nextJob != null) continue;
+                               if(!t.alive)
+                                       continue;
+                               if(t.nextJob != null)
+                                       continue;
                                t.nextJob = job;
                                if(!mustStart)
                                        // It is possible that we could get a 
wierd race condition with
@@ -90,44 +92,46 @@
                                        // level code. So we'd best use 
notifyAll().
                                        t.notifyAll();
                        }
-                       t.setName(jobName+"("+t.threadNo+")");
+                       t.setName(jobName + "(" + t.threadNo + ")");
                        if(mustStart) {
                                t.start();
                                synchronized(this) {
-                                       runningThreads[prio-1].add(t);
+                                       runningThreads[prio - 1].add(t);
                                        if(miss)
                                                jobMisses++;
                                        if(logMINOR)
-                                               Logger.minor(this, "Jobs: 
"+jobMisses+" misses of "+jobCount+" starting urgently "+jobName);
+                                               Logger.minor(this, "Jobs: " + 
jobMisses + " misses of " + jobCount + " starting urgently " + jobName);
                                }
-                       } else {
+                       } else
                                if(logMINOR)
-                                       Logger.minor(this, "Not starting: Jobs: 
"+jobMisses+" misses of "+jobCount+" starting urgently "+jobName);
-                       }
+                                       synchronized(this) {
+                                               Logger.minor(this, "Not 
starting: Jobs: " + jobMisses + " misses of " + jobCount + " starting urgently 
" + jobName);
+                                       }
                        return;
                }
        }

        public synchronized int[] runningThreads() {
                int[] result = new int[runningThreads.length];
-               for(int i=0; i<result.length; i++)
+               for(int i = 0; i < result.length; i++)
                        result[i] = runningThreads[i].size();
                return result;
        }
-       
+
        public synchronized int[] waitingThreads() {
                int[] result = new int[waitingThreads.length];
-               for(int i=0; i<result.length; i++)
+               for(int i = 0; i < result.length; i++)
                        result[i] = waitingThreads[i].size();
                return result;
        }
-       
+
        class MyThread extends NativeThread {
+
                final String defaultName;
                boolean alive = true;
                Runnable nextJob;
                final long threadNo;
-               
+
                public MyThread(String defaultName, long threadCounter, int 
prio, boolean dontCheckRenice) {
                        super(defaultName, prio, dontCheckRenice);
                        this.defaultName = defaultName;
@@ -140,53 +144,49 @@
                        int nativePriority = getNativePriority();
                        while(true) {
                                Runnable job;
-                               
+
                                synchronized(this) {
                                        job = nextJob;
                                        nextJob = null;
                                }
-                               
+
                                if(job == null) {
                                        synchronized(PooledExecutor.this) {
-                                               
waitingThreads[nativePriority-1].add(this);
+                                               waitingThreads[nativePriority - 
1].add(this);
                                        }
                                        synchronized(this) {
                                                if(nextJob == null) {
                                                        
this.setName(defaultName);
                                                        try {
                                                                wait(TIMEOUT);
-                                                       } catch 
(InterruptedException e) {
-                                                               // Ignore
+                                                       } 
catch(InterruptedException e) {
+                                                       // Ignore
                                                        }
                                                }
                                                job = nextJob;
                                                nextJob = null;
-                                               if(job == null) {
+                                               if(job == null)
                                                        alive = false;
-                                                       // execute() won't give 
us another job if alive = false
-                                               }
                                        }
                                        synchronized(PooledExecutor.this) {
-                                               
waitingThreads[nativePriority-1].remove(this);
+                                               waitingThreads[nativePriority - 
1].remove(this);
                                                if(!alive) {
-                                                       
runningThreads[nativePriority-1].remove(this);
+                                                       
runningThreads[nativePriority - 1].remove(this);
                                                        if(logMINOR)
-                                                               
Logger.minor(this, "Exiting having executed "+ranJobs+" jobs : "+this);
+                                                               
Logger.minor(this, "Exiting having executed " + ranJobs + " jobs : " + this);
                                                        return;
                                                }
                                        }
                                }
-                               
+
                                // Run the job
                                try {
                                        job.run();
-                               } catch (Throwable t) {
-                                       Logger.error(this, "Caught "+t+" 
running job "+job, t);
+                               } catch(Throwable t) {
+                                       Logger.error(this, "Caught " + t + " 
running job " + job, t);
                                }
                                ranJobs++;
                        }
                }
-               
        }
-       
 }


Reply via email to