Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java?rev=1593493&r1=1593492&r2=1593493&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java Fri May 9 07:03:18 2014 @@ -1,170 +1,170 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.sun.jini.norm.event; - -import com.sun.jini.thread.WakeupManager; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.Serializable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.river.impl.thread.NamedThreadFactory; - -/** - * Factory class for <code>EventType</code> objects. 
All - * <code>EventType</code> objects created by the same generator (or - * associated with the same generator by a - * <code>EventType.restoreTransientState<code> call) will use the same - * thread pool to manage their event send threads. - * - * @author Sun Microsystems, Inc. - * @see EventType - * @see EventType#restoreTransientState - */ -public class EventTypeGenerator implements Serializable { - private static final long serialVersionUID = 1L; - - /** - * Next event ID. - * @serial - */ - private long nextEvID = 1; - - /** - * Task manager used to send events - */ - private transient ExecutorService taskManager = - new ThreadPoolExecutor( - 10, - 10, /* Ignored */ - 15, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */ - new NamedThreadFactory("EventTypeGenerator", false) - ); - - /** - * Wakeup manager used by the event sending tasks to schedule - * retries. - */ - private transient WakeupManager wakeupManager = - new WakeupManager(new WakeupManager.ThreadDesc(null, false)); - - /** - * Create a new <code>EventType</code> object. The event ID for - * this type will be generated by this call. - * - * @param monitor Object to callback when an event sending - * attempt fails with a definite exception and to - * ensure that the lease on the event is still current. - * May not be <code>null</code>. - * @return the new <code>EventType</code> object - * @throws IOException if listener cannot be serialized - */ - public synchronized EventType newEventType(SendMonitor monitor) - throws IOException - { - EventType rslt = new EventType( - this, monitor, nextEvID, null, null); - // Do this afterward so we don't use up an ID if - // we can't create the new object - nextEvID++; - return rslt; - } - - /** - * Create a new <code>EventType</code> object specify the - * event id it should have. 
- * - * @param eventID the event ID of this type - * @param monitor Object to callback when an event sending - * attempt fails with a definite exception and to - * ensure that the lease on the event is still current. - * May not be <code>null</code>. - * @return the new <code>EventType</code> object - * @throws IOException if listener cannot be serialized - */ - public EventType newEventType(SendMonitor monitor, long eventID) - throws IOException - { - return new EventType(this, monitor, eventID, null, null); - } - - /** - * Called by event types during transient state recovery to ensure - * the generator knows about there event ID. - * <p> - * Note: this method is not synchronized. - * @param evID event ID of recovered <code>EventType</code> object - */ - synchronized void recoverEventID(long evID) { - if (evID >= nextEvID) - nextEvID = evID + 1; - } - - /** - * Return the task manager that <code>EventType</code> objects created - * by this generator should use to send their events. - */ - ExecutorService getTaskManager() { - return taskManager; - } - - /** - * Return the wakeup manager that <code>EventType</code> objects created - * by this generator should use to send their events. - */ - WakeupManager getWakeupManager() { - return wakeupManager; - } - - /** - * Terminate any independent treads started by event types - * associated with this generator. - */ - public void terminate() { - taskManager.shutdown(); - wakeupManager.stop(); - wakeupManager.cancelAll(); - } - - /** - * Override <code>readObject</code> to create a <code>TaskManager</code> - * and a <code>WakeupManager</code>. 
- * @see ObjectInputStream#defaultReadObject - */ - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException - { - // fill in the object from the stream - in.defaultReadObject(); - - taskManager = new ThreadPoolExecutor( - 10, - 10, /* Ignored */ - 15, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */ - new NamedThreadFactory("EventTypeGenerator", true) - ); - wakeupManager = - new WakeupManager(new WakeupManager.ThreadDesc(null, true)); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.sun.jini.norm.event; + +import com.sun.jini.thread.WakeupManager; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.river.impl.thread.NamedThreadFactory; + +/** + * Factory class for <code>EventType</code> objects. 
All + * <code>EventType</code> objects created by the same generator (or + * associated with the same generator by a + * <code>EventType.restoreTransientState</code> call) will use the same + * thread pool to manage their event send threads. + * + * @author Sun Microsystems, Inc. + * @see EventType + * @see EventType#restoreTransientState + */ +public class EventTypeGenerator implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * Next event ID. + * @serial + */ + private long nextEvID = 1; + + /** + * Task manager used to send events + */ + private transient ExecutorService taskManager = + new ThreadPoolExecutor( + 10, + 10, /* Ignored */ + 15, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */ + new NamedThreadFactory("EventTypeGenerator", false) + ); + + /** + * Wakeup manager used by the event sending tasks to schedule + * retries. + */ + private transient WakeupManager wakeupManager = + new WakeupManager(new WakeupManager.ThreadDesc(null, false)); + + /** + * Create a new <code>EventType</code> object. The event ID for + * this type will be generated by this call. + * + * @param monitor Object to callback when an event sending + * attempt fails with a definite exception and to + * ensure that the lease on the event is still current. + * May not be <code>null</code>. + * @return the new <code>EventType</code> object + * @throws IOException if listener cannot be serialized + */ + public synchronized EventType newEventType(SendMonitor monitor) + throws IOException + { + EventType rslt = new EventType( + this, monitor, nextEvID, null, null); + // Do this afterward so we don't use up an ID if + // we can't create the new object + nextEvID++; + return rslt; + } + + /** + * Create a new <code>EventType</code> object, specifying the + * event id it should have. 
+ * + * @param eventID the event ID of this type + * @param monitor Object to callback when an event sending + * attempt fails with a definite exception and to + * ensure that the lease on the event is still current. + * May not be <code>null</code>. + * @return the new <code>EventType</code> object + * @throws IOException if listener cannot be serialized + */ + public EventType newEventType(SendMonitor monitor, long eventID) + throws IOException + { + return new EventType(this, monitor, eventID, null, null); + } + + /** + * Called by event types during transient state recovery to ensure + * the generator knows about their event ID. + * <p> + * Note: this method is synchronized. + * @param evID event ID of recovered <code>EventType</code> object + */ + synchronized void recoverEventID(long evID) { + if (evID >= nextEvID) + nextEvID = evID + 1; + } + + /** + * Return the task manager that <code>EventType</code> objects created + * by this generator should use to send their events. + */ + ExecutorService getTaskManager() { + return taskManager; + } + + /** + * Return the wakeup manager that <code>EventType</code> objects created + * by this generator should use to send their events. + */ + WakeupManager getWakeupManager() { + return wakeupManager; + } + + /** + * Terminate any independent threads started by event types + * associated with this generator. + */ + public void terminate() { + taskManager.shutdown(); + wakeupManager.stop(); + wakeupManager.cancelAll(); + } + + /** + * Override <code>readObject</code> to create a <code>TaskManager</code> + * and a <code>WakeupManager</code>. 
+ * @see ObjectInputStream#defaultReadObject + */ + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException + { + // fill in the object from the stream + in.defaultReadObject(); + + taskManager = new ThreadPoolExecutor( + 10, + 10, /* Ignored */ + 15, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */ + new NamedThreadFactory("EventTypeGenerator", false) + ); + wakeupManager = + new WakeupManager(new WakeupManager.ThreadDesc(null, false)); + } +}
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/ExpirationOpQueue.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/ExpirationOpQueue.java?rev=1593493&r1=1593492&r2=1593493&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/ExpirationOpQueue.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/ExpirationOpQueue.java Fri May 9 07:03:18 2014 @@ -1,102 +1,100 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.sun.jini.outrigger; - -import java.util.LinkedList; -import java.util.logging.Level; -import java.util.logging.Logger; - -import net.jini.id.Uuid; - -/** - * Logs expiration of leases and asynchronously persists them to disk. 
- */ -class ExpirationOpQueue extends Thread { - /** <code>true</code> if we should stop */ - private volatile boolean dead; - - /** The queue of expirations to log */ - private final LinkedList queue = new LinkedList(); - - /** The server we are working for */ - private final OutriggerServerImpl server; - - /** Logger for logging exceptions */ - private static final Logger logger = - Logger.getLogger(OutriggerServerImpl.leaseLoggerName); - - /** - * Create a new <code>ExpirationOpQueue</code> that - * will handle lease expiration logging for the - * specified server. - * @param server the <code>OutriggerServerImpl</code> to - * log for. - */ - ExpirationOpQueue(OutriggerServerImpl server) { - super("Expiration Op Queue"); - this.server = server; - } - - /** - * Enqueue the logging of the expiration of the specified lease. - * @param cookie The cookie of the lease that has expired. - */ - synchronized void enqueue(Uuid cookie) { - queue.add(cookie); - notifyAll(); - } - - /** - * Stop the queue - */ - synchronized void terminate() { - dead = true; - notifyAll(); - } - - public void run() { - while (!dead) { // ok not to lock since it starts false - try { - final Uuid cookie; - synchronized (this) { - while (!dead && queue.isEmpty()) { - wait(); - } - - if (dead) - return; - - cookie = (Uuid)queue.removeFirst(); - } - - server.cancelOp(cookie, true); - } catch (Throwable t) { - try { - logger.log(Level.INFO, - "ExpirationOpQueue.run encountered " + - t.getClass().getName() + ", continuing", - t); - } catch (Throwable tt) { - // don't let a problem in logging kill the thread - } - } - } - } -} - - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.sun.jini.outrigger; + +import java.util.LinkedList; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.jini.id.Uuid; + +/** + * Logs expiration of leases and asynchronously persists them to disk. + */ +class ExpirationOpQueue extends Thread { + /** The queue of expirations to log */ + private final LinkedList queue = new LinkedList(); + + /** The server we are working for */ + private final OutriggerServerImpl server; + + /** Logger for logging exceptions */ + private static final Logger logger = + Logger.getLogger(OutriggerServerImpl.leaseLoggerName); + + /** + * Create a new <code>ExpirationOpQueue</code> that + * will handle lease expiration logging for the + * specified server. + * @param server the <code>OutriggerServerImpl</code> to + * log for. + */ + ExpirationOpQueue(OutriggerServerImpl server) { + super("Expiration Op Queue"); + this.server = server; + } + + /** + * Enqueue the logging of the expiration of the specified lease. + * @param cookie The cookie of the lease that has expired. 
+ */ + synchronized void enqueue(Uuid cookie) { + queue.add(cookie); + notifyAll(); + } + + /** + * Stop the queue + */ + void terminate() { + interrupt(); + synchronized (this){ + notifyAll(); + } + } + + public void run() { + while (!Thread.currentThread().isInterrupted()) { // no lock needed to read the interrupt status + try { + final Uuid cookie; + synchronized (this) { + while (queue.isEmpty()) { + wait(); + } + cookie = (Uuid)queue.removeFirst(); + } + + server.cancelOp(cookie, true); + } catch (InterruptedException e){ + Thread.currentThread().interrupt(); // restore + return; + } catch (Throwable t) { + try { + logger.log(Level.INFO, + "ExpirationOpQueue.run encountered " + + t.getClass().getName() + ", continuing", + t); + } catch (Throwable tt) { + // don't let a problem in logging kill the thread + } + } + } + } +} + + Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java?rev=1593493&r1=1593492&r2=1593493&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java Fri May 9 07:03:18 2014 @@ -939,7 +939,7 @@ public class OutriggerServerImpl // Use this (trivially) in log recovery h.operationJournal = new OperationJournal(h.templates); - h.operationJournal.setDaemon(true); + h.operationJournal.setDaemon(false); if (persistent){ @@ -947,7 +947,7 @@ public class OutriggerServerImpl * preparers for recovered proxies. 
*/ h.expirationOpQueue = new ExpirationOpQueue(this); - h.expirationOpQueue.setDaemon(true); + h.expirationOpQueue.setDaemon(false); h.recoveredTransactionManagerPreparer = (ProxyPreparer)Config.getNonNullEntry(config, COMPONENT_NAME, "recoveredTransactionManagerPreparer", @@ -1006,13 +1006,13 @@ public class OutriggerServerImpl h.templateReaperThread = new TemplateReaper(reapingInterval); h.templateReaperThread.setPriority(reapingPriority); - h.templateReaperThread.setDaemon(true); + h.templateReaperThread.setDaemon(false); h.entryReaperThread = new EntryReaper(reapingInterval); h.entryReaperThread.setPriority(reapingPriority); - h.entryReaperThread.setDaemon(true); + h.entryReaperThread.setDaemon(false); h.contentsQueryReaperThread = new ContentsQueryReaper(reapingInterval); h.contentsQueryReaperThread.setPriority(reapingPriority); - h.contentsQueryReaperThread.setDaemon(true); + h.contentsQueryReaperThread.setDaemon(false); return h; } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java?rev=1593493&r1=1593492&r2=1593493&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java Fri May 9 07:03:18 2014 @@ -96,9 +96,6 @@ class TxnMonitor implements Runnable { */ private final Thread ourThread; - /** Set when we are told to stop */ - private volatile boolean die = false; - private volatile boolean started = false; /** Logger for logging transaction related information */ @@ -129,7 +126,7 @@ class TxnMonitor implements Runnable { ); ourThread = new Thread(this, "TxnMonitor"); - ourThread.setDaemon(true); + ourThread.setDaemon(false); } public void start(){ @@ -142,16 +139,15 @@ class TxnMonitor implements Runnable { 
public void destroy() { taskManager.shutdown(); wakeupMgr.stop(); - + ourThread.interrupt(); synchronized (this) { - die = true; notifyAll(); } try { if (started) ourThread.join(); } catch(InterruptedException ie) { - // ignore + Thread.currentThread().interrupt(); // restore } } @@ -202,16 +198,12 @@ class TxnMonitor implements Runnable { public void run() { try { ToMonitor tm; - for (;;) { + while (!Thread.currentThread().isInterrupted()) { synchronized (this) { - // Sleep if nothing is pending. - while (pending.isEmpty() && !die) { + while (pending.isEmpty()) { wait(); } - - if (die) return; - tm = pending.removeFirst(); } @@ -226,7 +218,7 @@ class TxnMonitor implements Runnable { } } } catch (InterruptedException e) { - return; + Thread.currentThread().interrupt();// restore } }
