Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LRMEventListener.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LRMEventListener.java?rev=1593493&r1=1593492&r2=1593493&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LRMEventListener.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LRMEventListener.java Fri May 9 07:03:18 2014 @@ -1,168 +1,160 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.sun.jini.norm; - -import com.sun.jini.thread.InterruptedStatusThread; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import net.jini.core.lease.Lease; -import net.jini.core.lease.LeaseDeniedException; -import net.jini.lease.DesiredExpirationListener; -import net.jini.lease.LeaseRenewalEvent; - -/** - * Object that transfers events from the Lease Renewal Manager to the - * rest of Norm Server. Isolates the renewal manager from having to - * block on the snapshot locks. - * - * @author Sun Microsystems, Inc. - */ -class LRMEventListener extends InterruptedStatusThread - implements DesiredExpirationListener -{ - /** Logger for logging messages for this class */ - private static final Logger logger = Logger.getLogger("com.sun.jini.norm"); - - /** Ref to the main server object which has all the top level methods */ - volatile private NormServerBaseImpl server; - - /** - * Queue we use to decouple the reception of events from the lease - * renewal manager from the scheduling of the sending of remote - * events and modifying our internal tables (which both require - * obtaining serious locks). - */ - final private Queue queue = new Queue(); - - /** Any events that hold this object are ignored */ - final static LeaseDeniedException EXPIRED_SET_EXCEPTION = - new LeaseDeniedException("Set Expired"); - - /** - * Simple constructor - * - * @param server Object that will make the actual internal updates and - * schedule the sending of remote events - */ - LRMEventListener(NormServerBaseImpl server) { - super("LRM Event Listener"); - setDaemon(true); - this.server = server; - } - - LRMEventListener() { - super("LRM Event Listener"); - setDaemon(true); - } - - /** - * Set only once after construction. - * @param server - */ - void setServer(NormServerBaseImpl server){ - synchronized (this){ - if (this.server == null) this.server = server; - } - } - - ////////////////////////////////////////////////// - // Methods required by the LeaseListener interface - - // Inherit java doc from super type - public void notify(LeaseRenewalEvent e) { - // Drop if the exception field is == to EXPIRED_SET_EXCEPTION, this - // implies that lease could not be renewed because the wrapper - // has determined that the set has expired. - if (e.getException() == EXPIRED_SET_EXCEPTION) - return; - - // Paranoia, check to make sure that lease is one of wrapped - // client lease...if it's not, ignore the event - final Lease l = e.getLease(); - if (l instanceof ClientLeaseWrapper) { - final ClientLeaseWrapper clw = (ClientLeaseWrapper) l; - queue.enqueue(new Discriminator(clw, true)); - } - } - - ////////////////////////////////////////////////////////////// - // Methods required by the DesiredExpirationListener interface - - // Inherit java doc from super type - public void expirationReached(LeaseRenewalEvent e) { - // Paranoia, check to make sure that lease is one of wrapped - // client lease...if it's not, ignore the event - final Lease l = e.getLease(); - if (l instanceof ClientLeaseWrapper) { - final ClientLeaseWrapper clw = (ClientLeaseWrapper) l; - queue.enqueue(new Discriminator(clw, false)) ; - } - } - - public void run() { - // Loop taking items off the queue and pass them to the server - while (!hasBeenInterrupted()) { - try { - final Discriminator d = (Discriminator) queue.dequeue(); - - if (d.isFailure) { - server.renewalFailure(d.clw); - } else { - server.desiredExpirationReached(d.clw); - } - - } catch (InterruptedException e) { - // Someone wants this thread dead -- just return - return; - } catch (RuntimeException e) { - logger.log(Level.INFO, - "Exception in LRMEventListener Notifier while " + - "processing an event from the LRM -- " + - "attempting to continue", - e); - - } catch (Error e) { - logger.log(Level.INFO, - "Exception in LRMEventListener Notifier while " + - "processing an event from the LRM -- " + - "attempting to continue", - e); - } - } - } - - /** - * Trivial container class to tell us if we are processing the given - * wrapper because of a failure event or a desired expiration - * reached event. - */ - static private class Discriminator { - /** true if this wrapper is associated with a renewal failure event */ - final private boolean isFailure; - - /** The wrapped leases associated with the event */ - final private ClientLeaseWrapper clw; - - private Discriminator(ClientLeaseWrapper clw, boolean isFailure) { - this.isFailure = isFailure; - this.clw = clw; - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.sun.jini.norm; + +import com.sun.jini.thread.InterruptedStatusThread; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.jini.core.lease.Lease; +import net.jini.core.lease.LeaseDeniedException; +import net.jini.lease.DesiredExpirationListener; +import net.jini.lease.LeaseRenewalEvent; + +/** + * Object that transfers events from the Lease Renewal Manager to the + * rest of Norm Server. Isolates the renewal manager from having to + * block on the snapshot locks. + * + * @author Sun Microsystems, Inc. + */ +class LRMEventListener extends InterruptedStatusThread + implements DesiredExpirationListener +{ + /** Logger for logging messages for this class */ + private static final Logger logger = Logger.getLogger("com.sun.jini.norm"); + + /** Ref to the main server object which has all the top level methods */ + volatile private NormServerBaseImpl server; + + /** + * Queue we use to decouple the reception of events from the lease + * renewal manager from the scheduling of the sending of remote + * events and modifying our internal tables (which both require + * obtaining serious locks). + */ + final private Queue queue = new Queue(); + + /** Any events that hold this object are ignored */ + final static LeaseDeniedException EXPIRED_SET_EXCEPTION = + new LeaseDeniedException("Set Expired"); + + /** + * Simple constructor + */ + LRMEventListener() { + super("LRM Event Listener"); + setDaemon(false); + } + + /** + * Set only once after construction. + * @param server Object that will make the actual internal updates and + * schedule the sending of remote events + */ + void setServer(NormServerBaseImpl server){ + synchronized (this){ + if (this.server == null) this.server = server; + } + } + + ////////////////////////////////////////////////// + // Methods required by the LeaseListener interface + + // Inherit java doc from super type + public void notify(LeaseRenewalEvent e) { + // Drop if the exception field is == to EXPIRED_SET_EXCEPTION, this + // implies that lease could not be renewed because the wrapper + // has determined that the set has expired. + if (e.getException() == EXPIRED_SET_EXCEPTION) + return; + + // Paranoia, check to make sure that lease is one of wrapped + // client lease...if it's not, ignore the event + final Lease l = e.getLease(); + if (l instanceof ClientLeaseWrapper) { + final ClientLeaseWrapper clw = (ClientLeaseWrapper) l; + queue.enqueue(new Discriminator(clw, true)); + } + } + + ////////////////////////////////////////////////////////////// + // Methods required by the DesiredExpirationListener interface + + // Inherit java doc from super type + public void expirationReached(LeaseRenewalEvent e) { + // Paranoia, check to make sure that lease is one of wrapped + // client lease...if it's not, ignore the event + final Lease l = e.getLease(); + if (l instanceof ClientLeaseWrapper) { + final ClientLeaseWrapper clw = (ClientLeaseWrapper) l; + queue.enqueue(new Discriminator(clw, false)) ; + } + } + + public void run() { + // Loop taking items off the queue and pass them to the server + while (!hasBeenInterrupted()) { + try { + final Discriminator d = (Discriminator) queue.dequeue(); + + if (d.isFailure) { + server.renewalFailure(d.clw); + } else { + server.desiredExpirationReached(d.clw); + } + + } catch (InterruptedException e) { + // Someone wants this thread dead -- just return + return; + } catch (RuntimeException e) { + logger.log(Level.INFO, + "Exception in LRMEventListener Notifier while " + + "processing an event from the LRM -- " + + "attempting to continue", + e); + + } catch (Error e) { + logger.log(Level.INFO, + "Exception in LRMEventListener Notifier while " + + "processing an event from the LRM -- " + + "attempting to continue", + e); + } + } + } + + /** + * Trivial container class to tell us if we are processing the given + * wrapper because of a failure event or a desired expiration + * reached event. + */ + static private class Discriminator { + /** true if this wrapper is associated with a renewal failure event */ + final private boolean isFailure; + + /** The wrapped leases associated with the event */ + final private ClientLeaseWrapper clw; + + private Discriminator(ClientLeaseWrapper clw, boolean isFailure) { + this.isFailure = isFailure; + this.clw = clw; + } + } +}
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LeaseExpirationMgr.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LeaseExpirationMgr.java?rev=1593493&r1=1593492&r2=1593493&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LeaseExpirationMgr.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LeaseExpirationMgr.java Fri May 9 07:03:18 2014 @@ -1,369 +1,369 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.sun.jini.norm; - -import java.lang.ref.WeakReference; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.sun.jini.collection.WeakTable; -import com.sun.jini.landlord.LeasedResource; -import com.sun.jini.thread.InterruptedStatusThread; -import com.sun.jini.thread.WakeupManager; - -/** - * Lease manager that aggressively expires leases as their expiration times - * occur. Also schedules and manages expiration warning events. - * <p> - * Note, unlike Mahalo's <code>LeaseExpirationManager</code> (which this - * was seeded from), we make no attempt to make it generic because of - * the need to schedule expiration warning events. - * - * @author Sun Microsystems, Inc. - */ -class LeaseExpirationMgr implements WeakTable.KeyGCHandler { - /** Logger for logging messages for this class */ - static final Logger logger = Logger.getLogger("com.sun.jini.norm"); - - /** - * Map of sets to task tickets. - * <p> - * A Note on Synchronization - * <p> - * Whenever we operate on the <code>ticketMap</code> we hold - * the lock on the key being used. This is necessary because - * expiration and warning sender tasks need to remove tickets from - * the map but at the same time a renewal may be updating the map - * to associate the set with a new ticket. If we don't synchronize - * there is a small window where a task could remove the ticket - * for its replacement. - */ - private final WeakTable ticketMap; - - /** Ref to the main server object has all the top level methods */ - private volatile NormServerBaseImpl server; - - /** Queue of tasks, ordered by time */ - private final WakeupManager runQueue = new WakeupManager(); - - /** Queue of tasks to expire sets */ - private final List expireQueue = new LinkedList(); - - /** Thread to expire sets */ - private final Thread expireThread = new ExpirationThread(); - - /** - * Create a <code>LeaseExpirationMgr</code> to aggressively expire - * the leases of the passed <code>NormServerBaseImpl</code> - */ - LeaseExpirationMgr(NormServerBaseImpl server) { - this.server = server; - ticketMap = new WeakTable(this); //this escape is safe. - } - - LeaseExpirationMgr(){ - ticketMap = new WeakTable(this); //this escape is safe. - } - - /** - * Can be set once only after construction. - * @param server - */ - void setServer(NormServerBaseImpl server){ - synchronized (this){ - if (this.server == null) this.server = server; - } - } - - void start(){ - expireThread.start(); - } - - /** - * Terminate the <code>LeaseExpirationMgr</code>, killing any - * threads it has started - */ - void terminate() { - runQueue.stop(); - runQueue.cancelAll(); - expireThread.interrupt(); - } - - /** - * Notifies the manager of a new lease being created. - * - * @param resource the resource associated with the new lease - */ - void register(LeasedResource resource) { - // Need to synchronize because schedule manipulates - // ticketMap. - synchronized (resource) { - schedule(resource); - } - } - - /** - * Notifies the manager of a lease being renewed. <p> - * - * This method assumes the lock on <code>resource</code> is owned by the - * current thread. - * - * @param resource the set for which tasks have to be rescheduled - */ - void reschedule(LeasedResource resource) { - /* - * Remove the old event. This method is only called - * (indirectly) from NormServerBaseImpl.renew() so we know that - * we own the lock on resource. - */ - WakeupManager.Ticket ticket = - (WakeupManager.Ticket) ticketMap.remove(resource); - if (ticket != null) { - runQueue.cancel(ticket); - } - // Schedule the new event - schedule(resource); - } - - /** - * Schedule a leased resource to be reaped in the future. Called - * when a resource gets a lease, a lease is renewed, and during log - * recovery. - * <p> - * This method assumes the lock on <code>resource</code> is owned by - * the current thread. - */ - void schedule(LeasedResource resource) { - WakeupManager.Ticket ticket; - final LeaseSet set = (LeaseSet) resource; - MgrTask task; - - if (set.haveWarningRegistration()) { - task = new SendWarning(set); - ticket = runQueue.schedule(set.getWarningTime(), task); - } else { - task = new QueueExpiration(set); - ticket = runQueue.schedule(set.getExpiration(), task); - } - - /* - * No window here because the tasks only use the ticket after - * they acquire the lock on their set, but we still own the lock - * on the set. - */ - task.setTicket(ticket); - ticketMap.getOrAdd(set, ticket); - } - - // purposefully inherit doc comment from supertype - // Called when LeaseResource we are tracking is garbage collected - public void keyGC(Object value) { - final WakeupManager.Ticket ticket = (WakeupManager.Ticket) value; - runQueue.cancel(ticket); - } - - /** - * Expires sets queued for expiration. Perform the expiration in a - * separate thread because the operation will block if a snapshot is going - * on. It's OK for an expiration to block other expirations, which need - * not be timely, but using the separate thread avoids blocking renewal - * warnings, which should be timely. - */ - private class ExpirationThread extends InterruptedStatusThread { - - ExpirationThread() { - super("expire lease sets thread"); - setDaemon(true); - } - - public void run() { - while (!hasBeenInterrupted()) { - try { - Runnable task; - synchronized (expireQueue) { - if (expireQueue.isEmpty()) { - expireQueue.wait(); - continue; - } - task = (Runnable) expireQueue.remove(0); - } - task.run(); - } catch (InterruptedException e) { - return; - } catch (Throwable t) { - logger.log(Level.INFO, - "Exception in lease set expiration thread -- " + - "attempting to continue", - t); - } - } - } - } - - /** - * Utility base class for our tasks, mainly provides the the proper - * locking for manipulating the ticketMap. - */ - private abstract class MgrTask implements Runnable { - /** Resource this task is to operate on */ - protected final WeakReference resourceRef; - - /** Ticket for this task */ - private WakeupManager.Ticket ticket; - - /** - * Simple constructor. - * - * @param set the set this task is to operate on - */ - protected MgrTask(LeaseSet set) { - resourceRef = new WeakReference(set); - } - - /** Set the ticket associated with this task. */ - private void setTicket(WakeupManager.Ticket ticket) { - this.ticket = ticket; - } - - /** - * Removes this task's ticket from the ticket map iff this - * task's ticket is in the map. Returns the - * <code>LeaseSet</code> this task is to operate on or - * <code>null</code> if this task should stop. - */ - protected LeaseSet removeOurTicket() { - final LeaseSet set = (LeaseSet) resourceRef.get(); - if (set != null) { - synchronized (set) { - final WakeupManager.Ticket currentTicket = - (WakeupManager.Ticket) ticketMap.get(set); - if (ticket.equals(currentTicket)) { - ticketMap.remove(set); - } else { - /* - * Someone removed us after we were committed to - * run -- we should stop. - */ - return null; - } - } - } - - return set; - } - - // purposefully inherit doc comment from supertype - public abstract void run(); - } - - /** Task that queues a task to expire a lease set. */ - private class QueueExpiration extends MgrTask { - QueueExpiration(LeaseSet set) { - super(set); - } - - public void run() { - LeaseSet set = removeOurTicket(); - if (set != null) { - synchronized (expireQueue) { - expireQueue.add(new Expiration(set)); - expireQueue.notifyAll(); - } - } - } - } - - /** - * Objects that do the actual expiration of the set in question, - * stuck in <code>expireQueue</code>. - */ - private class Expiration implements Runnable { - - private final LeaseSet set; - /** - * Create a <code>Expiration</code> task for the passed resource. - * - * @param set the set this task is to operate on - */ - private Expiration(LeaseSet set) { - this.set = set; - } - - // purposefully inherit doc comment from supertype - public void run() { - server.expireIfTime(set); - /* - * Note we don't care if it's actually time or not, if it - * is not the task will be rescheduled by the renewal. - */ - } - } - - /** - * Objects that do the schedule the warning events, also schedules - * an expiration task. - */ - private class SendWarning extends MgrTask { - /** - * Create a <code>SendWarning</code> task for the passed resource. - * - * @param set the set this task is to operate on - */ - private SendWarning(LeaseSet set) { - super(set); - } - - // purposefully inherit doc comment from supertype - public void run() { - final LeaseSet s = (LeaseSet) resourceRef.get(); - if (s == null) { - // set is gone, no work to do - return; - } - - /* - * By holding this lock we prevent other threads from - * scheduling new tasks for this set...if we have been - * replaced we will return before scheduling a new task, if - * we have not been we will schedule the new task and it can - * be cleanly removed by any renew that is happening at the - * same time. - */ - synchronized (s) { - final LeaseSet set = removeOurTicket(); - if (set == null) { - // set is gone, or our task was replaced, no work to do - return; - } - - // Send event - server.sendWarningEvent(set); - - // Schedule expiration task - final MgrTask task = new QueueExpiration(set); - final WakeupManager.Ticket newTicket = - runQueue.schedule(set.getExpiration(), task); - task.setTicket(newTicket); - ticketMap.getOrAdd(set, newTicket); - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.sun.jini.norm; + +import java.lang.ref.WeakReference; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.sun.jini.collection.WeakTable; +import com.sun.jini.landlord.LeasedResource; +import com.sun.jini.thread.InterruptedStatusThread; +import com.sun.jini.thread.WakeupManager; + +/** + * Lease manager that aggressively expires leases as their expiration times + * occur. Also schedules and manages expiration warning events. + * <p> + * Note, unlike Mahalo's <code>LeaseExpirationManager</code> (which this + * was seeded from), we make no attempt to make it generic because of + * the need to schedule expiration warning events. + * + * @author Sun Microsystems, Inc. + */ +class LeaseExpirationMgr implements WeakTable.KeyGCHandler { + /** Logger for logging messages for this class */ + static final Logger logger = Logger.getLogger("com.sun.jini.norm"); + + /** + * Map of sets to task tickets. + * <p> + * A Note on Synchronization + * <p> + * Whenever we operate on the <code>ticketMap</code> we hold + * the lock on the key being used. This is necessary because + * expiration and warning sender tasks need to remove tickets from + * the map but at the same time a renewal may be updating the map + * to associate the set with a new ticket. If we don't synchronize + * there is a small window where a task could remove the ticket + * for its replacement. + */ + private final WeakTable ticketMap; + + /** Ref to the main server object has all the top level methods */ + private volatile NormServerBaseImpl server; + + /** Queue of tasks, ordered by time */ + private final WakeupManager runQueue = new WakeupManager(); + + /** Queue of tasks to expire sets */ + private final List expireQueue = new LinkedList(); + + /** Thread to expire sets */ + private final Thread expireThread = new ExpirationThread(); + + /** + * Create a <code>LeaseExpirationMgr</code> to aggressively expire + * the leases of the passed <code>NormServerBaseImpl</code> + */ + LeaseExpirationMgr(NormServerBaseImpl server) { + this.server = server; + ticketMap = new WeakTable(this); //this escape is safe. + } + + LeaseExpirationMgr(){ + ticketMap = new WeakTable(this); //this escape is safe. + } + + /** + * Can be set once only after construction. + * @param server + */ + void setServer(NormServerBaseImpl server){ + synchronized (this){ + if (this.server == null) this.server = server; + } + } + + void start(){ + expireThread.start(); + } + + /** + * Terminate the <code>LeaseExpirationMgr</code>, killing any + * threads it has started + */ + void terminate() { + runQueue.stop(); + runQueue.cancelAll(); + expireThread.interrupt(); + } + + /** + * Notifies the manager of a new lease being created. + * + * @param resource the resource associated with the new lease + */ + void register(LeasedResource resource) { + // Need to synchronize because schedule manipulates + // ticketMap. + synchronized (resource) { + schedule(resource); + } + } + + /** + * Notifies the manager of a lease being renewed. <p> + * + * This method assumes the lock on <code>resource</code> is owned by the + * current thread. + * + * @param resource the set for which tasks have to be rescheduled + */ + void reschedule(LeasedResource resource) { + /* + * Remove the old event. This method is only called + * (indirectly) from NormServerBaseImpl.renew() so we know that + * we own the lock on resource. + */ + WakeupManager.Ticket ticket = + (WakeupManager.Ticket) ticketMap.remove(resource); + if (ticket != null) { + runQueue.cancel(ticket); + } + // Schedule the new event + schedule(resource); + } + + /** + * Schedule a leased resource to be reaped in the future. Called + * when a resource gets a lease, a lease is renewed, and during log + * recovery. + * <p> + * This method assumes the lock on <code>resource</code> is owned by + * the current thread. + */ + void schedule(LeasedResource resource) { + WakeupManager.Ticket ticket; + final LeaseSet set = (LeaseSet) resource; + MgrTask task; + + if (set.haveWarningRegistration()) { + task = new SendWarning(set); + ticket = runQueue.schedule(set.getWarningTime(), task); + } else { + task = new QueueExpiration(set); + ticket = runQueue.schedule(set.getExpiration(), task); + } + + /* + * No window here because the tasks only use the ticket after + * they acquire the lock on their set, but we still own the lock + * on the set. + */ + task.setTicket(ticket); + ticketMap.getOrAdd(set, ticket); + } + + // purposefully inherit doc comment from supertype + // Called when LeaseResource we are tracking is garbage collected + public void keyGC(Object value) { + final WakeupManager.Ticket ticket = (WakeupManager.Ticket) value; + runQueue.cancel(ticket); + } + + /** + * Expires sets queued for expiration. Perform the expiration in a + * separate thread because the operation will block if a snapshot is going + * on. It's OK for an expiration to block other expirations, which need + * not be timely, but using the separate thread avoids blocking renewal + * warnings, which should be timely. + */ + private class ExpirationThread extends InterruptedStatusThread { + + ExpirationThread() { + super("expire lease sets thread"); + setDaemon(false); + } + + public void run() { + while (!hasBeenInterrupted()) { + try { + Runnable task; + synchronized (expireQueue) { + if (expireQueue.isEmpty()) { + expireQueue.wait(); + continue; + } + task = (Runnable) expireQueue.remove(0); + } + task.run(); + } catch (InterruptedException e) { + return; + } catch (Throwable t) { + logger.log(Level.INFO, + "Exception in lease set expiration thread -- " + + "attempting to continue", + t); + } + } + } + } + + /** + * Utility base class for our tasks, mainly provides the the proper + * locking for manipulating the ticketMap. + */ + private abstract class MgrTask implements Runnable { + /** Resource this task is to operate on */ + protected final WeakReference resourceRef; + + /** Ticket for this task */ + private WakeupManager.Ticket ticket; + + /** + * Simple constructor. + * + * @param set the set this task is to operate on + */ + protected MgrTask(LeaseSet set) { + resourceRef = new WeakReference(set); + } + + /** Set the ticket associated with this task. */ + private void setTicket(WakeupManager.Ticket ticket) { + this.ticket = ticket; + } + + /** + * Removes this task's ticket from the ticket map iff this + * task's ticket is in the map. Returns the + * <code>LeaseSet</code> this task is to operate on or + * <code>null</code> if this task should stop. + */ + protected LeaseSet removeOurTicket() { + final LeaseSet set = (LeaseSet) resourceRef.get(); + if (set != null) { + synchronized (set) { + final WakeupManager.Ticket currentTicket = + (WakeupManager.Ticket) ticketMap.get(set); + if (ticket.equals(currentTicket)) { + ticketMap.remove(set); + } else { + /* + * Someone removed us after we were committed to + * run -- we should stop. + */ + return null; + } + } + } + + return set; + } + + // purposefully inherit doc comment from supertype + public abstract void run(); + } + + /** Task that queues a task to expire a lease set. */ + private class QueueExpiration extends MgrTask { + QueueExpiration(LeaseSet set) { + super(set); + } + + public void run() { + LeaseSet set = removeOurTicket(); + if (set != null) { + synchronized (expireQueue) { + expireQueue.add(new Expiration(set)); + expireQueue.notifyAll(); + } + } + } + } + + /** + * Objects that do the actual expiration of the set in question, + * stuck in <code>expireQueue</code>. + */ + private class Expiration implements Runnable { + + private final LeaseSet set; + /** + * Create a <code>Expiration</code> task for the passed resource. + * + * @param set the set this task is to operate on + */ + private Expiration(LeaseSet set) { + this.set = set; + } + + // purposefully inherit doc comment from supertype + public void run() { + server.expireIfTime(set); + /* + * Note we don't care if it's actually time or not, if it + * is not the task will be rescheduled by the renewal. + */ + } + } + + /** + * Objects that do the schedule the warning events, also schedules + * an expiration task. + */ + private class SendWarning extends MgrTask { + /** + * Create a <code>SendWarning</code> task for the passed resource. + * + * @param set the set this task is to operate on + */ + private SendWarning(LeaseSet set) { + super(set); + } + + // purposefully inherit doc comment from supertype + public void run() { + final LeaseSet s = (LeaseSet) resourceRef.get(); + if (s == null) { + // set is gone, no work to do + return; + } + + /* + * By holding this lock we prevent other threads from + * scheduling new tasks for this set...if we have been + * replaced we will return before scheduling a new task, if + * we have not been we will schedule the new task and it can + * be cleanly removed by any renew that is happening at the + * same time. + */ + synchronized (s) { + final LeaseSet set = removeOurTicket(); + if (set == null) { + // set is gone, or our task was replaced, no work to do + return; + } + + // Send event + server.sendWarningEvent(set); + + // Schedule expiration task + final MgrTask task = new QueueExpiration(set); + final WakeupManager.Ticket newTicket = + runQueue.schedule(set.getExpiration(), task); + task.setTicket(newTicket); + ticketMap.getOrAdd(set, newTicket); + } + } + } +}
