Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java Sun Jul  5 11:41:39 2020
@@ -1,537 +1,540 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import org.apache.river.constants.TxnConstants;
-import org.apache.river.logging.Levels;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.rmi.RemoteException;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Collections;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import net.jini.core.transaction.CannotJoinException;
-import net.jini.core.transaction.server.ServerTransaction;
-import net.jini.core.transaction.server.TransactionConstants;
-import net.jini.core.transaction.server.TransactionManager;
-
-import net.jini.space.InternalSpaceException;
-import net.jini.security.ProxyPreparer;
-
-/**
- * This class represents a space's state in a single transaction.
- *
- * Object of this class represent Jini transactions within outrigger.
- * These transactions hold "Transactables" -- things that represent
- * the actions that have been taken under this transaction. For example, 
- * if this transaction were to be cancelled, the Transactables are
- * examined and serve as the list of things to roll back in order to 
- * restore the state of the Space status quo ante.
- *
- * This is achieved by having the transactables notified of state changes 
- * to this transaction such as preparing, commit, etc. The 
- * transactables themselves are responsible for "doing the right thing."
- *
- * NB: some--but not all--of the actions one can take with a transaction
- * are managed internally by these objects. That is, all of the important
- * methods objects of these types are synchronized. Therefore, two
- * simultaneous calls to abort() and commit() are arbitrated properly. 
- *
- * However, care must be taken when add()ing a transactable. Even
- * though the add() method is synchronized if you check the state of
- * the transaction to ensure that is active and then call add() the
- * transaction could have left the active state between the check and
- * the add() call unless the call obtains the appropriate locks. This 
- * is even more likely if other work needs to be done in addition to
- * calling add() (e.g. persisting state, obtaining locks, etc.). The
- * caller of add() should lock the associated transaction object and
- * ensure that the transaction is still considered ACTIVE, do whatever
- * work is necessary to complete while the transaction is in the ACTIVE
- * state (including calling call add()) and then release the lock.
- * This can be done by :
- * <ul>
- * <li> holding the lock on this object while checking the
- *      state and carrying out the operation (including calling add()), or
- * <li> calling ensureActive() to check the state
- *      and obtain a non-exclusive lock, carrying out the operation
- *      (including calling add()), and then calling allowStateChange() to
- *      release the lock.
- * </ul>
- * The pair of ensureActive() and allowStateChange() allows for more
- * concurrency if the operation is expected to take a long time, in
- * that it will allow for other operations to be performed under the
- * same transaction and let aborts prevent other operations from
- * being started.
- *
- * @author Sun Microsystems, Inc.  
- */
-class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> {
-
-    /** The internal id Outrigger as assigned to the transaction */
-    final private long id;
-
-    /** What state we think the transaction is in */
-    private volatile int state;
-
-    /** 
-     * The transaction manager associated with the transaction 
-     * this object is fronting for.
-     */
-    private StorableReference trm;
-
-    /**
-     * Cached <code>ServerTransaction</code> object for
-     * the transaction this object is fronting for.
-     */
-    private ServerTransaction tr;
-    
-    /** The id the transaction manager assigned to this transaction */
-    private volatile long  trId;
-
-    /** 
-     * The list of <code>Transactable</code> participating in 
-     * this transaction.
-     */
-    final private List<Transactable> txnables = new java.util.LinkedList<Transactable>();
-
-    /**
-     * The task responsible for monitoring to see if this
-     * transaction has been aborted with us being told, or
-     * null if no such task as been allocated.
-     */
-    private volatile TxnMonitorTask    monitorTask;
-
-    /** Count of number of threads holding a read lock on state */
-    private int stateReaders = 0;
-
-    /** 
-     * <code>true</code> if there is a blocked state change. Used
-     * to give writers priority.
-     */
-    private boolean stateChangeWaiting = false;
-
-    /** Logger for logging transaction related information */
-    private static final Logger logger = 
-       Logger.getLogger(OutriggerServerImpl.txnLoggerName);
-
-    /**
-     * Create a new <code>Txn</code> that represents our state in the
-     * given <code>ServerTransaction</code>.
-     */
-    Txn(ServerTransaction tr, long id) {
-       this(id);
-       trId = tr.id;
-       this.tr = tr;
-       this.trm = new StorableReference(tr.mgr);
-       state = ACTIVE;
-
-       if (logger.isLoggable(Level.FINER)) {
-           logger.log(Level.FINER, "creating txn for transaction mgr:" +
-                   "{0}, id:{1}, state:{2}", 
-               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-       }
-    }
-
-    /** Used in recovery */
-    Txn(long id) {
-       this.id = id;           // the txn id is not persisted
-    }
-
-    /**
-     * Get the id for this txn. Note that this id is NOT the same as
-     * the ID of the transaction. Since that ID is not unique (it must
-     * be qualified with the <code>ServerTransaction</code> object) we create
-     * our own internal id to make txns unique. This is needed since we
-     * may not have the <code>Transaction</code> unmarshalled.
-     */
-    Long getId() {
-       return Long.valueOf(id);
-    }
-
-    /**
-     * We keep the transaction ID around because we may need it
-     * to identify a broken transaction after recovery.
-     */
-    long getTransactionId() {
-       return trId;
-    }
-
-    /**
-     * Return our local view of the current state. Need to be holding
-     * the lock on this object or have called <code>ensureActive</code>
-     * to get the current value.
-     */
-    int getState() {
-       return state;
-    }
-
-    /**
-     * Atomically checks that this transaction is in the active
-     * state and locks the transaction in the active state.
-     * The lock can be released by calling <code>allowStateChange</code>.
-     * Each call to this method should be paired with a call to
-     * <code>allowStateChange</code> in a finally block.
-     * @throws CannotJoinException if the transaction
-     * is not active or a state change is pending.  
-     */
-    synchronized void ensureActive() throws CannotJoinException {
-       if (state != ACTIVE || stateChangeWaiting) {
-           final String msg = "transaction mgr:" + tr + ", id:" + trId +
-               " not active, in state " + TxnConstants.getName(state);
-           final CannotJoinException e = new CannotJoinException(msg);
-           logger.log(Levels.FAILED, msg, e);
-           throw e; 
-       }
-       assert stateReaders >= 0;
-       stateReaders++;
-    }
-
-    /**
-     * Release the read lock created by an <code>ensureActive</code>
-     * call. Does nothing if the transaction is not active or there is
-     * a state change pending and thus is safe to call even if the
-     * corresponding <code>ensureActive</code> call threw
-     * <code>CannotJoinException</code>.  
-     */
-    synchronized void allowStateChange() {
-       if (state != ACTIVE || stateChangeWaiting)
-           return;
-       stateReaders--;
-       assert stateReaders >= 0;
-       notifyAll();
-    }
-
-    /**
-     * Prevents new operations from being started under this
-     * transaction and blocks until in process operations are
-     * completed.
-     */
-    synchronized void makeInactive() {
-       stateChangeWaiting = true;
-       assert stateReaders >= 0;
-       while (stateReaders != 0) {
-           try {
-               wait();
-           } catch (InterruptedException e) {
-               throw new AssertionError(e);
-           }
-           assert stateReaders >= 0;
-       }
-    }
-
-    /**
-     * Prepare for transaction commit. <code>makeInactive</code> must have 
-     * been called on this transaction first.
-     */
-    synchronized int prepare(OutriggerServerImpl space) {
-       assert stateChangeWaiting : "prepare called before makeInactive";
-       assert stateReaders == 0 : "prepare called before makeInactive completed";
-
-       if (logger.isLoggable(Level.FINER)) {
-           logger.log(Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " +
-               "state:{2}", 
-               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-       }
-
-       switch (state) {
-         case ABORTED:                        // previously aborted
-           return ABORTED;
-
-         case COMMITTED:                      // previously committed
-           throw new IllegalStateException(); // "cannot happen"
-
-         case NOTCHANGED:                     // previously voted NOTCHANGED
-         case PREPARED:                       // previously voted PREPARED
-           return state;                      // they are idempotent, and
-                                              // and we have nothing to do
-                                              // so return
-
-         case ACTIVE:                         // currently active
-           boolean changed = false;           // did this txn change
-                                              // anything?
-
-           if (logger.isLoggable(Level.FINEST)) {
-               logger.log(Level.FINEST, "prepare:preparing transaction mgr:" +
-                   "{0}, id:{1}, state:{2}", 
-               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-           }
-
-           // loop through Transactable members of this Txn
-           final Iterator<Transactable> i = txnables.iterator();
-           int c=0; // Counter for debugging message
-           while (i.hasNext()) {
-               // get this member's vote
-               final Transactable transactable = i.next();
-               final int prepState =  transactable.prepare(this, space);
-               if (logger.isLoggable(Level.FINEST)) {
-                   logger.log(Level.FINEST, "prepare:prepared " +
-                       "transactable {0} for transaction mgr:{1}, id:{2}," +
-                       " transactable now in state {3}", 
-                       new Object[]{transactable, tr, Long.valueOf(trId), 
-                                    TxnConstants.getName(prepState)});
-               }
-
-               switch (prepState) {
-                 case PREPARED:             // has prepared state
-                   changed = true;          // this means a change
-                   continue;
-
-                 case ABORTED:              // has to abort
-                   abort(space);            // abort this txn (does cleanup)
-                   state = ABORTED;
-                   return state;            // vote aborted
-
-                 case NOTCHANGED:           // no change
-                   i.remove();              // Won't need to call again
-                   continue;
-
-                 default:                   // huh?
-                   throw new
-                       InternalSpaceException("prepare said " + prepState);
-               }
-           }
-
-           if (changed) {
-               state = PREPARED;
-               // have to watch this since it's now holding permanent 
-               // resources
-               space.monitor(Collections.nCopies(1, this));
-           } else {
-               state = NOTCHANGED;
-           }
-           break;
-
-         default:
-           throw new IllegalStateException("unknown Txn state: " + state);
-       }
-
-       return state;
-    }
-
-    /**
-     * Abort the transaction.  This must be callable from
-     * <code>prepare</code> because if a <code>Transactable</code>
-     * votes <code>ABORT</code>, this method is called to make that
-     * happen. <code>makeInactive</code> must have been called on this
-     * transaction first.
-     */
-    synchronized void abort(OutriggerServerImpl space) {
-       assert stateChangeWaiting : "abort called before makeInactive";
-       assert stateReaders == 0 : "abort called before makeInactive completed";
-
-       if (logger.isLoggable(Level.FINER)) {
-           logger.log(Level.FINER, "abort: transaction mgr:{0}, id:{1}, " +
-               "state:{2}", 
-               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-       }
-
-       switch (state) {
-         case ABORTED:         // already aborted
-         case NOTCHANGED:      // nothing to abort
-           break;
-
-         case COMMITTED:       // "cannot happen"
-           throw new IllegalStateException("aborting a committed txn");
-
-         case ACTIVE:
-         case PREPARED:
-           if (logger.isLoggable(Level.FINEST)) {
-               logger.log(Level.FINEST, "abort:aborting transaction mgr:" +
-                   "{0}, id:{1}, state:{2}", 
-               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-           }
-
-           final Iterator<Transactable> i = txnables.iterator();
-           while (i.hasNext()) {
-                i.next().abort(this, space);
-            }
-           state = ABORTED;
-           cleanup();
-           break;
-
-         default:
-           throw new IllegalStateException("unknown Txn state: " + state);
-       }
-    }
-
-    /**
-     * Having prepared, roll the changes
-     * forward. <code>makeInactive</code> must have been called on
-     * this transaction first.
-     */
-    synchronized void commit(OutriggerServerImpl space) {
-       assert stateChangeWaiting : "commit called before makeInactive";
-       assert stateReaders == 0 : "commit called before makeInactive completed";
-
-       //!! Need to involve mgr here
-       if (logger.isLoggable(Level.FINER)) {
-           logger.log(Level.FINER, "commit: transaction mgr:{0}, id:{1}, " +
-               "state:{2}", 
-               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-       }
-
-       switch (state) {
-         case ABORTED:         // "cannot happen" stuff
-         case ACTIVE:
-         case NOTCHANGED:
-           throw new IllegalStateException("committing "
-               + TxnConstants.getName(state) + " txn");
-
-         case COMMITTED:       // previous committed, that's okay
-           return;
-
-         case PREPARED:        // voted PREPARED, time to finish up
-           final Iterator<Transactable> i = txnables.iterator();
-           if (logger.isLoggable(Level.FINEST)) {
-               logger.log(Level.FINEST, "commit:committing transaction mgr:" +
-                   "{0}, id:{1}, state:{2}", 
-               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-           }
-
-           while (i.hasNext()) {
-                i.next().commit(this, space);
-            }
-           state = COMMITTED;
-           cleanup();
-           return;
-
-         default:
-           throw new IllegalStateException("unknown Txn state: " + state);
-       }
-    }
-
-    /**
-     * Caution: see locking discussion at the class level.
-     */
-    public synchronized Transactable add(Transactable t) {
-       txnables.add(t);
-       return t;
-    }
-
-    // inherit doc comment
-    public ServerTransaction getTransaction(ProxyPreparer preparer)
-       throws IOException, ClassNotFoundException
-    {
-        synchronized (this){
-            if (tr == null) {
-                final TransactionManager mgr = 
-                    (TransactionManager)trm.get(preparer);
-                tr = new ServerTransaction(mgr, trId);
-            }
-            return tr;
-        }
-    }
-
-    /**
-     * Return the manager associated with this transaction.
-     * @return the manager associated with this transaction.
-     * @throws IllegalStateException if this <code>Txn</code>
-     *         is still broken.
-     */
-    synchronized TransactionManager getManager() {
-       if (tr == null)
-           throw new IllegalStateException("Txn is still broken");
-       return tr.mgr;
-    }
-
-    /**
-     * Return the monitor task for this object. Note, this
-     * method is unsynchronized because it (and 
-     * <code>monitorTask(TxnMonitorTask)</code> are both called
-     * from the same thread.
-     */
-    TxnMonitorTask monitorTask() {
-       return monitorTask;
-    }
-
-    /**
-     * Set the monitor task for this object. Note, this method is
-     * unsynchronized because it (and <code>monitorTask()</code> are
-     * both called from the same thread.
-     */
-    void monitorTask(TxnMonitorTask task) {
-       monitorTask = task;
-    }
-
-    /**
-     * Clean up any state when the transaction is finished.
-     */
-    private void cleanup() {
-       if (monitorTask != null)
-           monitorTask.cancel(false);      }
-
-    // -----------------------------------
-    //  Methods required by StorableObject
-    // -----------------------------------
-
-    // inherit doc comment
-    public void store(ObjectOutputStream out) throws IOException {
-       /* There is a bunch of stuff we don't need to write. The
-        * Txn id not stored since it is handed back during
-        * recovery. The content is rebuilt txnables by the various
-        * recoverWrite and recoverTake calls. state is not written
-        * because it is always ACTIVE when we write, and always
-        * needs to be PREPARED when we read it back.
-        */
-        synchronized (this){
-            out.writeObject(trm);
-            out.writeLong(trId);
-        }
-    }
-
-    // inherit doc comment
-    public Txn restore(ObjectInputStream in) 
-       throws IOException, ClassNotFoundException 
-    {
-       /* Only transactions that got prepared and not committed or
-        * aborted get recovered
-        */
-        synchronized (this){
-            state    = PREPARED;
-            trm      = (StorableReference)in.readObject();
-            trId     = in.readLong();
-        }
-        return this;
-    }
-
-    @Override
-    public int compareTo(Txn o) {
-        if (o == null) return -1;
-        if (o.id < id) return -1;
-        if (o.id > id) return 1;
-        return 0;
-    }
-
-    @Override
-    public int hashCode() {
-        int hash = 7;
-        hash = 31 * hash + (int) (this.id ^ (this.id >>> 32));
-        return hash;
-    }
-    
-    public boolean equals(Object o){
-        if (!(o instanceof Txn)) return false;
-        Txn txn = (Txn) o;
-        return id == txn.id;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import org.apache.river.constants.TxnConstants;
+import org.apache.river.logging.Levels;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.rmi.RemoteException;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.core.transaction.CannotJoinException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+import net.jini.core.transaction.server.TransactionManager;
+
+import net.jini.space.InternalSpaceException;
+import net.jini.security.ProxyPreparer;
+import org.apache.river.outrigger.proxy.StorableObject;
+
+
+
+/**
+ * This class represents a space's state in a single transaction.
+ *
+ * Object of this class represent Jini transactions within outrigger.
+ * These transactions hold "Transactables" -- things that represent
+ * the actions that have been taken under this transaction. For example, 
+ * if this transaction were to be cancelled, the Transactables are
+ * examined and serve as the list of things to roll back in order to 
+ * restore the state of the Space status quo ante.
+ *
+ * This is achieved by having the transactables notified of state changes 
+ * to this transaction such as preparing, commit, etc. The 
+ * transactables themselves are responsible for "doing the right thing."
+ *
+ * NB: some--but not all--of the actions one can take with a transaction
+ * are managed internally by these objects. That is, all of the important
+ * methods objects of these types are synchronized. Therefore, two
+ * simultaneous calls to abort() and commit() are arbitrated properly. 
+ *
+ * However, care must be taken when add()ing a transactable. Even
+ * though the add() method is synchronized if you check the state of
+ * the transaction to ensure that is active and then call add() the
+ * transaction could have left the active state between the check and
+ * the add() call unless the call obtains the appropriate locks. This 
+ * is even more likely if other work needs to be done in addition to
+ * calling add() (e.g. persisting state, obtaining locks, etc.). The
+ * caller of add() should lock the associated transaction object and
+ * ensure that the transaction is still considered ACTIVE, do whatever
+ * work is necessary to complete while the transaction is in the ACTIVE
+ * state (including calling call add()) and then release the lock.
+ * This can be done by :
+ * <ul>
+ * <li> holding the lock on this object while checking the
+ *      state and carrying out the operation (including calling add()), or
+ * <li> calling ensureActive() to check the state
+ *      and obtain a non-exclusive lock, carrying out the operation
+ *      (including calling add()), and then calling allowStateChange() to
+ *      release the lock.
+ * </ul>
+ * The pair of ensureActive() and allowStateChange() allows for more
+ * concurrency if the operation is expected to take a long time, in
+ * that it will allow for other operations to be performed under the
+ * same transaction and let aborts prevent other operations from
+ * being started.
+ *
+ * @author Sun Microsystems, Inc.  
+ */
+class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> {
+
+    /** The internal id Outrigger as assigned to the transaction */
+    final private long id;
+
+    /** What state we think the transaction is in */
+    private volatile int state;
+
+    /** 
+     * The transaction manager associated with the transaction 
+     * this object is fronting for.
+     */
+    private StorableReference trm;
+
+    /**
+     * Cached <code>ServerTransaction</code> object for
+     * the transaction this object is fronting for.
+     */
+    private ServerTransaction tr;
+    
+    /** The id the transaction manager assigned to this transaction */
+    private volatile long  trId;
+
+    /** 
+     * The list of <code>Transactable</code> participating in 
+     * this transaction.
+     */
+    final private List<Transactable> txnables = new java.util.LinkedList<Transactable>();
+
+    /**
+     * The task responsible for monitoring to see if this
+     * transaction has been aborted with us being told, or
+     * null if no such task as been allocated.
+     */
+    private volatile TxnMonitorTask    monitorTask;
+
+    /** Count of number of threads holding a read lock on state */
+    private int stateReaders = 0;
+
+    /** 
+     * <code>true</code> if there is a blocked state change. Used
+     * to give writers priority.
+     */
+    private boolean stateChangeWaiting = false;
+
+    /** Logger for logging transaction related information */
+    private static final Logger logger = 
+       Logger.getLogger(OutriggerServerImpl.txnLoggerName);
+
+    /**
+     * Create a new <code>Txn</code> that represents our state in the
+     * given <code>ServerTransaction</code>.
+     */
+    Txn(ServerTransaction tr, long id) {
+       this(id);
+       trId = tr.id;
+       this.tr = tr;
+       this.trm = new StorableReference(tr.mgr);
+       state = ACTIVE;
+
+       if (logger.isLoggable(Level.FINER)) {
+           logger.log(Level.FINER, "creating txn for transaction mgr:" +
+                   "{0}, id:{1}, state:{2}", 
+               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+       }
+    }
+
+    /** Used in recovery */
+    Txn(long id) {
+       this.id = id;           // the txn id is not persisted
+    }
+
+    /**
+     * Get the id for this txn. Note that this id is NOT the same as
+     * the ID of the transaction. Since that ID is not unique (it must
+     * be qualified with the <code>ServerTransaction</code> object) we create
+     * our own internal id to make txns unique. This is needed since we
+     * may not have the <code>Transaction</code> unmarshalled.
+     */
+    Long getId() {
+       return Long.valueOf(id);
+    }
+
+    /**
+     * We keep the transaction ID around because we may need it
+     * to identify a broken transaction after recovery.
+     */
+    long getTransactionId() {
+       return trId;
+    }
+
+    /**
+     * Return our local view of the current state. Need to be holding
+     * the lock on this object or have called <code>ensureActive</code>
+     * to get the current value.
+     */
+    int getState() {
+       return state;
+    }
+
+    /**
+     * Atomically checks that this transaction is in the active
+     * state and locks the transaction in the active state.
+     * The lock can be released by calling <code>allowStateChange</code>.
+     * Each call to this method should be paired with a call to
+     * <code>allowStateChange</code> in a finally block.
+     * @throws CannotJoinException if the transaction
+     * is not active or a state change is pending.  
+     */
+    synchronized void ensureActive() throws CannotJoinException {
+       if (state != ACTIVE || stateChangeWaiting) {
+           final String msg = "transaction mgr:" + tr + ", id:" + trId +
+               " not active, in state " + TxnConstants.getName(state);
+           final CannotJoinException e = new CannotJoinException(msg);
+           logger.log(Levels.FAILED, msg, e);
+           throw e; 
+       }
+       assert stateReaders >= 0;
+       stateReaders++;
+    }
+
+    /**
+     * Release the read lock created by an <code>ensureActive</code>
+     * call. Does nothing if the transaction is not active or there is
+     * a state change pending and thus is safe to call even if the
+     * corresponding <code>ensureActive</code> call threw
+     * <code>CannotJoinException</code>.  
+     */
+    synchronized void allowStateChange() {
+       if (state != ACTIVE || stateChangeWaiting)
+           return;
+       stateReaders--;
+       assert stateReaders >= 0;
+       notifyAll();
+    }
+
+    /**
+     * Prevents new operations from being started under this
+     * transaction and blocks until in process operations are
+     * completed.
+     */
+    synchronized void makeInactive() {
+       stateChangeWaiting = true;
+       assert stateReaders >= 0;
+       while (stateReaders != 0) {
+           try {
+               wait();
+           } catch (InterruptedException e) {
+               throw new AssertionError(e);
+           }
+           assert stateReaders >= 0;
+       }
+    }
+
+    /**
+     * Prepare for transaction commit. <code>makeInactive</code> must have 
+     * been called on this transaction first.
+     */
+    synchronized int prepare(OutriggerServerImpl space) {
+       assert stateChangeWaiting : "prepare called before makeInactive";
+       assert stateReaders == 0 : "prepare called before makeInactive completed";
+
+       if (logger.isLoggable(Level.FINER)) {
+           logger.log(Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " +
+               "state:{2}", 
+               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+       }
+
+       switch (state) {
+         case ABORTED:                        // previously aborted
+           return ABORTED;
+
+         case COMMITTED:                      // previously committed
+           throw new IllegalStateException(); // "cannot happen"
+
+         case NOTCHANGED:                     // previously voted NOTCHANGED
+         case PREPARED:                       // previously voted PREPARED
+           return state;                      // they are idempotent, and
+                                              // and we have nothing to do
+                                              // so return
+
+         case ACTIVE:                         // currently active
+           boolean changed = false;           // did this txn change
+                                              // anything?
+
+           if (logger.isLoggable(Level.FINEST)) {
+               logger.log(Level.FINEST, "prepare:preparing transaction mgr:" +
+                   "{0}, id:{1}, state:{2}", 
+               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+           }
+
+           // loop through Transactable members of this Txn
+           final Iterator<Transactable> i = txnables.iterator();
+           int c=0; // Counter for debugging message
+           while (i.hasNext()) {
+               // get this member's vote
+               final Transactable transactable = i.next();
+               final int prepState =  transactable.prepare(this, space);
+               if (logger.isLoggable(Level.FINEST)) {
+                   logger.log(Level.FINEST, "prepare:prepared " +
+                       "transactable {0} for transaction mgr:{1}, id:{2}," +
+                       " transactable now in state {3}", 
+                       new Object[]{transactable, tr, Long.valueOf(trId), 
+                                    TxnConstants.getName(prepState)});
+               }
+
+               switch (prepState) {
+                 case PREPARED:             // has prepared state
+                   changed = true;          // this means a change
+                   continue;
+
+                 case ABORTED:              // has to abort
+                   abort(space);            // abort this txn (does cleanup)
+                   state = ABORTED;
+                   return state;            // vote aborted
+
+                 case NOTCHANGED:           // no change
+                   i.remove();              // Won't need to call again
+                   continue;
+
+                 default:                   // huh?
+                   throw new
+                       InternalSpaceException("prepare said " + prepState);
+               }
+           }
+
+           if (changed) {
+               state = PREPARED;
+               // have to watch this since it's now holding permanent 
+               // resources
+               space.monitor(Collections.nCopies(1, this));
+           } else {
+               state = NOTCHANGED;
+           }
+           break;
+
+         default:
+           throw new IllegalStateException("unknown Txn state: " + state);
+       }
+
+       return state;
+    }
+
+    /**
+     * Abort the transaction.  This must be callable from
+     * <code>prepare</code> because if a <code>Transactable</code>
+     * votes <code>ABORT</code>, this method is called to make that
+     * happen. <code>makeInactive</code> must have been called on this
+     * transaction first.
+     */
+    synchronized void abort(OutriggerServerImpl space) {
+       assert stateChangeWaiting : "abort called before makeInactive";
+       assert stateReaders == 0 : "abort called before makeInactive completed";
+
+       if (logger.isLoggable(Level.FINER)) {
+           logger.log(Level.FINER, "abort: transaction mgr:{0}, id:{1}, " +
+               "state:{2}", 
+               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+       }
+
+       switch (state) {
+         case ABORTED:         // already aborted
+         case NOTCHANGED:      // nothing to abort
+           break;
+
+         case COMMITTED:       // "cannot happen"
+           throw new IllegalStateException("aborting a committed txn");
+
+         case ACTIVE:
+         case PREPARED:
+           if (logger.isLoggable(Level.FINEST)) {
+               logger.log(Level.FINEST, "abort:aborting transaction mgr:" +
+                   "{0}, id:{1}, state:{2}", 
+               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+           }
+
+           final Iterator<Transactable> i = txnables.iterator();
+           while (i.hasNext()) {
+                i.next().abort(this, space);
+            }
+           state = ABORTED;
+           cleanup();
+           break;
+
+         default:
+           throw new IllegalStateException("unknown Txn state: " + state);
+       }
+    }
+
+    /**
+     * Having prepared, roll the changes
+     * forward. <code>makeInactive</code> must have been called on
+     * this transaction first.
+     */
+    synchronized void commit(OutriggerServerImpl space) {
+       assert stateChangeWaiting : "commit called before makeInactive";
+       assert stateReaders == 0 : "commit called before makeInactive completed";
+
+       //!! Need to involve mgr here
+       if (logger.isLoggable(Level.FINER)) {
+           logger.log(Level.FINER, "commit: transaction mgr:{0}, id:{1}, " +
+               "state:{2}", 
+               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+       }
+
+       switch (state) {
+         case ABORTED:         // "cannot happen" stuff
+         case ACTIVE:
+         case NOTCHANGED:
+           throw new IllegalStateException("committing "
+               + TxnConstants.getName(state) + " txn");
+
+         case COMMITTED:       // previous committed, that's okay
+           return;
+
+         case PREPARED:        // voted PREPARED, time to finish up
+           final Iterator<Transactable> i = txnables.iterator();
+           if (logger.isLoggable(Level.FINEST)) {
+               logger.log(Level.FINEST, "commit:committing transaction mgr:" +
+                   "{0}, id:{1}, state:{2}", 
+               new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+           }
+
+           while (i.hasNext()) {
+                i.next().commit(this, space);
+            }
+           state = COMMITTED;
+           cleanup();
+           return;
+
+         default:
+           throw new IllegalStateException("unknown Txn state: " + state);
+       }
+    }
+
+    /**
+     * Caution: see locking discussion at the class level.
+     */
+    public synchronized Transactable add(Transactable t) {
+       txnables.add(t);
+       return t;
+    }
+
+    // inherit doc comment
+    public ServerTransaction getTransaction(ProxyPreparer preparer)
+       throws IOException, ClassNotFoundException
+    {
+        synchronized (this){
+            if (tr == null) {
+                final TransactionManager mgr = 
+                    (TransactionManager)trm.get(preparer);
+                tr = new ServerTransaction(mgr, trId);
+            }
+            return tr;
+        }
+    }
+
+    /**
+     * Return the manager associated with this transaction.
+     * @return the manager associated with this transaction.
+     * @throws IllegalStateException if this <code>Txn</code>
+     *         is still broken.
+     */
+    synchronized TransactionManager getManager() {
+       if (tr == null)
+           throw new IllegalStateException("Txn is still broken");
+       return tr.mgr;
+    }
+
+    /**
+     * Return the monitor task for this object. Note, this
+     * method is unsynchronized because it (and 
+     * <code>monitorTask(TxnMonitorTask)</code>) are both called
+     * from the same thread.
+     */
+    TxnMonitorTask monitorTask() {
+       return monitorTask;
+    }
+
+    /**
+     * Set the monitor task for this object. Note, this method is
+     * unsynchronized because it (and <code>monitorTask()</code>) are
+     * both called from the same thread.
+     */
+    void monitorTask(TxnMonitorTask task) {
+       monitorTask = task;
+    }
+
+    /**
+     * Clean up any state when the transaction is finished.
+     */
+    private void cleanup() {
+       if (monitorTask != null)
+           monitorTask.cancel(false);      }
+
+    // -----------------------------------
+    //  Methods required by StorableObject
+    // -----------------------------------
+
+    // inherit doc comment
+    public void store(ObjectOutputStream out) throws IOException {
+       /* There is a bunch of stuff we don't need to write. The
+        * Txn id is not stored since it is handed back during
+        * recovery. The contents of txnables are rebuilt by the various
+        * recoverWrite and recoverTake calls. state is not written
+        * because it is always ACTIVE when we write, and always
+        * needs to be PREPARED when we read it back.
+        */
+        synchronized (this){
+            out.writeObject(trm);
+            out.writeLong(trId);
+        }
+    }
+
+    // inherit doc comment
+    public Txn restore(ObjectInputStream in) 
+       throws IOException, ClassNotFoundException 
+    {
+       /* Only transactions that got prepared and not committed or
+        * aborted get recovered
+        */
+        synchronized (this){
+            state    = PREPARED;
+            trm      = (StorableReference)in.readObject();
+            trId     = in.readLong();
+        }
+        return this;
+    }
+
+    @Override
+    public int compareTo(Txn o) {
+        if (o == null) return -1;
+        if (o.id < id) return -1;
+        if (o.id > id) return 1;
+        return 0;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 31 * hash + (int) (this.id ^ (this.id >>> 32));
+        return hash;
+    }
+    
+    public boolean equals(Object o){
+        if (!(o instanceof Txn)) return false;
+        Txn txn = (Txn) o;
+        return id == txn.id;
+    }
+}

Modified: 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java
 (original)
+++ 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java
 Sun Jul  5 11:41:39 2020
@@ -18,7 +18,7 @@
 package org.apache.river.outrigger;
 
 import org.apache.river.config.Config;
-import org.apache.river.thread.WakeupManager;
+import org.apache.river.thread.wakeup.WakeupManager;
 
 import net.jini.config.Configuration;
 import net.jini.config.ConfigurationException;

Modified: 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java
 (original)
+++ 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java
 Sun Jul  5 11:41:39 2020
@@ -1,584 +1,584 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import org.apache.river.constants.TxnConstants;
-import org.apache.river.constants.ThrowableConstants;
-import org.apache.river.logging.Levels;
-import org.apache.river.thread.RetryTask;
-import org.apache.river.thread.WakeupManager;
-
-import java.io.IOException;
-import java.rmi.RemoteException;
-import java.rmi.UnmarshalException;
-import java.rmi.NoSuchObjectException;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.Iterator;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import net.jini.core.transaction.TransactionException;
-import net.jini.core.transaction.UnknownTransactionException;
-import net.jini.core.transaction.server.ServerTransaction;
-import net.jini.core.transaction.server.TransactionConstants;
-
-/**
- * A task that will try to validate the state of a transaction.  This
- * uses weak references a good deal to let the other parts of the system
- * be GC'ed as necessary.
- * <p>
- * The retry mechanism is subtle, so bear with me.  The purpose is
- * to ensure that if any activity is being blocked by a given
- * transaction, that transaction will be tested at some point in
- * the future (if necessary, i.e., if it still is thought to be
- * active).  We assume it to be rare that a transactions that the
- * space thinks is active is, in fact, aborted, so the algorithm is
- * designed to guarantee the detection without a lot of overhead,
- * specifically without a lot of RMI calls.
- * <p>
- * Each task has three values: a <code>nextQuery</code> time, a
- * <code>mustQuery</code> boolean that force the next query to be
- * made, and <code>deltaT</code>, the time at which the following
- * query will be scheduled.  When the task is awakened at its
- * <code>nextQuery</code> time, it checks to see if it must make an
- * actual query to the transaction manager, which it will do if either
- * <code>mustQuery</code> is <code>true</code>, or if we know about
- * any in progress queries on the space that are blocked on the
- * transaction.  Whether or not an actual query is made,
- * <code>deltaT</code> is added to <code>nextQuery</code> to get the
- * <code>nextQuery</code> time, <code>deltaT</code> is doubled, and
- * <code>mustQuery</code> boolean is set to <code>false</code>.
- * <p>
- * There are two kinds of requests that a with which transaction
- * can cause a conflict -- those with long timeouts (such as
- * blocking reads and takes) and those that are under short timeouts
- * (such as reads and takes with zero-length timeouts).  We will
- * treat them separately at several points of the algorithm.  A
- * short timeout is any query whose expiration time is sooner than
- * the <code>nextQuery</code> time.  Any other timeout is long
- * If a short query arrives, <code>mustQuery</code> is set to 
- * <code>true</code>.
- * <p>
- * The result is that any time a transaction causes a conflict, if
- * the query on the space has not ended by the time of the
- * <code>nextQuery</code> we will attempt to poll the transaction manager.  
- * There will also poll the transaction manager if any conflict occurred
- * on a query on the space with a short timeout.
- * <p>
- * The first time a transaction causes a conflict, we schedule a
- * time in the future at which we will poll its status.  We do not
- * poll right away because often a transaction will complete on
- * its own before we get to that time, making the check
- * unnecessary.  An instant poll is, therefore, unnecessarily
- * aggressive, since giving an initial grace time will usually mean
- * no poll is made at all.  So if the first conflict occurs at
- * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be
- * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean
- * will be <code>true</code> to force that poll to happen, and
- * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>.
- *
- * @author Sun Microsystems, Inc.
- *
- * @see TxnMonitor 
- */
-class TxnMonitorTask extends RetryTask
-    implements TransactionConstants, org.apache.river.constants.TimeConstants
-{
-    /** transaction being monitored */
-    private final Txn  txn;
-
-    /** the monitor we were made by */
-    private final TxnMonitor monitor; 
-
-    /**
-     * All the queries on the space (not queries to the transaction
-     * manager) waiting for <code>txn</code> to be resolved.
-     * <code>null</code> until we have at least one. Represented by
-     * <code>QueryWatcher</code> objects.  
-     */
-    private Map<QueryWatcher,Collection<Txn>>          queries; //Sync on this.
-
-    /** count of RemoteExceptions */
-    private final AtomicInteger                failCnt;
-
-    /** 
-     * The next time we need to poll the transaction manager 
-     * to get <code>txn</code>'s actual state.
-     */
-    private final AtomicLong   nextQuery;
-
-    /**
-     * When we're given an opportunity to poll the transaction manager
-     * for the <code>txn</code>'s state, do so.
-     */
-    private volatile boolean   mustQuery;
-
-    /** next value added to <code>nextQuery</code> */
-    private volatile long      deltaT;
-
-    /**
-     * The initial grace period before the first query.
-     */
-    private static final long  INITIAL_GRACE = 15 * SECONDS;
-
-    /**
-     * The retry time when we have an encountered an exception
-     */
-    private static final long  BETWEEN_EXCEPTIONS = 15 * SECONDS;
-
-    /**
-     * The largest value that <code>deltaT</code> will reach.
-     */
-    private static final long  MAX_DELTA_T = 1 * HOURS;
-
-    /**
-     * The maximum number of failures allowed in a row before we simply
-     * give up on the transaction and consider it aborted.
-     */
-    private static final int   MAX_FAILURES = 3;
-
-    /** Logger for logging transaction related information */
-    private static final Logger logger = 
-       Logger.getLogger(OutriggerServerImpl.txnLoggerName);
-
-    /**
-     * Create a new TxnMonitorTask.
-     */
-    TxnMonitorTask(Txn txn, TxnMonitor monitor,
-                  ExecutorService manager, WakeupManager wakeupMgr) {
-       super(manager, wakeupMgr);
-       this.txn = txn;
-       this.monitor = monitor;
-       nextQuery = new AtomicLong(startTime());        // retryTime will add INITIAL_GRACE
-       deltaT = INITIAL_GRACE;
-       mustQuery = true;
-        failCnt = new AtomicInteger();
-    }
-
-    /**
-     * Return the time of the next query, bumping <code>deltaT</code> as
-     * necessary for the next iteration.  If the transaction has voted
-     * <code>PREPARED</code> or the manager has been giving us a
-     * <code>RemoteException</code>, we should retry on short times;
-     * otherwise we back off quickly.
-     */
-    public long retryTime() {
-        boolean noFailures = false;
-        synchronized (txn){
-            noFailures = (failCnt.get() == 0 && txn.getState() != PREPARED);
-        }
-            if (noFailures) {      // no failures
-                if (logger.isLoggable(Level.FINEST)) {
-                    logger.log(Level.FINEST, "{0} retryTime adds {1}", 
-                               new Object[]{this, Long.valueOf(deltaT)});
-                }
-
-                nextQuery.addAndGet(deltaT);
-                synchronized (this){
-                    if (deltaT < MAX_DELTA_T)
-                        deltaT = Math.min(deltaT * 2, MAX_DELTA_T);
-                }
-            } else {
-                if (logger.isLoggable(Level.FINEST)) {
-                    logger.log(Level.FINEST, "{0} retryTime adds {1} (for {2})", 
-                               new Object[]{this, Long.valueOf(BETWEEN_EXCEPTIONS), 
-                                   (failCnt.get() != 0 ? "failure" : "PREPARED")});
-                }
-                nextQuery.addAndGet(BETWEEN_EXCEPTIONS);
-            }
-        
-       return nextQuery.get();
-    }
-
-    /**
-     * Add a ``sibling'' transaction, one that is now blocking progress
-     * on one of the same entries.  For example, if a client is blocked
-     * on a <code>read</code>, another transaction can read the same
-     * entry, thereby also blocking that same client.  This means that
-     * the transaction for the second <code>read</code> must be
-     * watched, too.  The list of queries for the second transaction
-     * might be less that the list of those in this transaction, but
-     * the process of figuring out the subset is too expensive, since
-     * we have tried to make the checking process itself cheap,
-     * anyway.  So we add all queries this task is currently monitoring
-     * to the task monitoring the second transaction.  If there are
-     * no queries, then the blocking occurred because of a short query
-     * or all the queries have expired, in which case the second transaction
-     * isn't blocking the way of anything currently, so this method does
-     * nothing.
-     * <p>
-     * Of course, in order to avoid blocking the thread that is calling
-     * this (which is trying to perform a <code>read</code>, after
-     * all), we simply add each lease in this task to the monitor's
-     * queue.
-     *
-     */
-    // @see TxnEntryHandle#monitor
-    //!! Would it be worth the overhead to make TxnEntryHandle.monitor
-    //!! search for the transaction with the smallest set of leases?  -arnold
-    synchronized void addSibling(Txn txn) {
-       if (queries == null || queries.size() == 0)
-           return;
-       Collection<Txn> sibling = Collections.nCopies(1, txn);
-       Iterator<QueryWatcher> it = queries.keySet().iterator();
-       while (it.hasNext()) {
-           QueryWatcher query = it.next();
-           if (query != null)  // from a weak map, so might be null
-               monitor.add(query, sibling);
-       }
-    }
-
-    /**
-     * Try to see if this transaction should be aborted.  This returns
-     * <code>true</code> (don't repeat the task) if it knows that the
-     * transaction is no longer interesting to anyone.
-     */
-    public boolean tryOnce() {
-       if (logger.isLoggable(Level.FINEST)) {
-           logger.log(Level.FINEST, "{0} attempt {1} mustQuery:{2}", 
-               new Object[]{this, Integer.valueOf(attempt()), 
-                            Boolean.valueOf(mustQuery) });
-       }
-
-       /*
-        * The first time we do nothing, since RetryTask invokes run first,
-        * but we want to wait a bit before testing the transaction.
-        */
-       if (attempt() == 0)
-           return false;
-        int txnState;
-        synchronized (txn){
-            txnState = txn.getState();
-        }
-       if (logger.isLoggable(Level.FINEST)) {
-           logger.log(Level.FINEST, "{0} txn.getState() = {1}", 
-               new Object[]{this, Integer.valueOf(txnState)});
-       }
-
-       // not active or prepared == no longer blocking
-       
-       if (txnState != ACTIVE && txnState != PREPARED)
-           return true;
-
-       // if we're prepared, test every time -- this shouldn't take long
-       mustQuery |= (txnState == PREPARED);
-
-       /*
-        * Go through the resources to see if we can find one still active
-        * that cares.  Must be synchronized since we test, then clear --
-        * another thread that set the flag between the test and clear
-        * would have its requirements lost.
-        */
-       synchronized (this) {
-           if (!mustQuery) {           // then try resources
-               if (queries == null)    // no resources, so nobody wants it
-                   return false;       // try again next time
-
-               Iterator<QueryWatcher> it = queries.keySet().iterator();
-               boolean foundNeed = false;
-
-               if (logger.isLoggable(Level.FINEST)) {
-                   logger.log(Level.FINEST, "{0} nextQuery {1}", 
-                              new Object[]{this, nextQuery});
-               }
-
-               while (it.hasNext()) {
-                   QueryWatcher query = it.next();
-                   if (query == null)     // gone -- the map will reap it
-                       continue;
-                   if (logger.isLoggable(Level.FINEST)) {
-                       logger.log(Level.FINEST, 
-                                  "{0} query.getExpiration() {1}", 
-                                  new Object[]{this, 
-                                      Long.valueOf(query.getExpiration())});
-                   }
-
-                   if (query.getExpiration() < nextQuery.get() || query.isResolved()) {
-                        it.remove();
-                    }  // expired, so we don't care about it
-                   else {
-                       foundNeed = true;
-                       break;
-                   }
-               }
-
-               if (logger.isLoggable(Level.FINEST)) {
-                   logger.log(Level.FINEST, "{0} foundNeed = {1}", 
-                              new Object[]{this, Boolean.valueOf(foundNeed)});
-               }
-
-               if (!foundNeed)         // nobody wants it
-                   return false;       // try again next time
-           }
-           mustQuery = false;          // clear it for next time
-       }
-
-       /*
-        * Now we know (a) the transaction itself is alive, and (b) some
-        * lease still cares.  Make sure it's still active as far as the
-        * it knows, and if it is, then ask the manager about it.
-        */
-       ServerTransaction tr;
-       try {
-           /* This may fix a broken Txn, if it does it won't get moved
-            * from the broken to the unbroken list. It will get
-            * moved eventually, but it does seem unfortunate it does
-            * not happen immediately
-            */
-           tr = txn.getTransaction(
-               monitor.space().getRecoveredTransactionManagerPreparer());
-       } catch (RemoteException e) {
-           final int cat = ThrowableConstants.retryable(e);
-
-           if (cat == ThrowableConstants.BAD_INVOCATION ||
-               cat == ThrowableConstants.BAD_OBJECT)
-           {
-               // Not likely to get better, give up
-               logUnpackingFailure("definite exception", Level.INFO,
-                                   true, e);                               
-               return true;
-           } else if (cat == ThrowableConstants.INDEFINITE) {
-               // try, try, again
-               logUnpackingFailure("indefinite exception", Levels.FAILED,
-                                   false, e);                              
-               mustQuery = true;
-               return false;
-           } else if (cat == ThrowableConstants.UNCATEGORIZED) {
-               // Same as above but log differently.
-               mustQuery = true;
-               logUnpackingFailure("uncategorized exception", Level.INFO,
-                                   false, e);                              
-               return false;
-           } else {
-               logger.log(Level.WARNING, "ThrowableConstants.retryable " +
-                          "returned out of range value, " + cat,
-                          new AssertionError(e));
-               return false;
-           }
-       } catch (IOException e) {
-           // Not likely to get better
-           logUnpackingFailure("IOException", Level.INFO, true, e);
-           return true;
-       } catch (RuntimeException e) {
-           // Not likely to get better
-           logUnpackingFailure("RuntimeException", Level.INFO, true, e);
-           return true;
-       } catch (ClassNotFoundException e) {
-           // codebase probably down, keep trying
-           logUnpackingFailure("ClassNotFoundException", Levels.FAILED, 
-                               false, e);
-           mustQuery = true;
-           return false;
-       }
-
-       if (logger.isLoggable(Level.FINEST))
-           logger.log(Level.FINEST, "{0} tr = {1}", new Object[]{this, tr});
-
-       int trState;
-       try {
-           trState = tr.getState();
-       } catch (TransactionException e) {
-           if (logger.isLoggable(Level.INFO))
-               logger.log(Level.INFO, "Got TransactionException when " +
-                   "calling getState on " + tr + ", dropping transaction " +
-                   tr.id, e);
-           trState = ABORTED;
-       } catch (NoSuchObjectException e) {
-           /* It would be epsilon better to to give up immediately
-            * if we get a NoSuchObjectException and we are in the
-            * active state, however, the code to do this would
-            * be very complicated since we need to hold a lock to
-            * while reading and acting on the state.
-            */
-           if (failCnt.incrementAndGet() >= MAX_FAILURES) {
-               if (logger.isLoggable(Level.INFO)) {
-                   logger.log(Level.INFO, "Got NoSuchObjectException when " +
-                       "calling getState on " + tr + ", this was the " +
-                       failCnt + " RemoteException, dropping transaction" +
-                       tr.id, e);
-               }
-               trState = ABORTED;
-           } else {
-               if (logger.isLoggable(Levels.FAILED)) {
-                   logger.log(Levels.FAILED, "Got NoSuchObjectException " +
-                       "when calling getState on " + tr + ", failCount = " +
-                       failCnt + ", will retry", e);
-               }
-               mustQuery = true;      // keep on trying
-               return false;          // try again next time
-           }
-       } catch (RemoteException e) {
-           if (failCnt.incrementAndGet() >= MAX_FAILURES) {
-               /* abort if we are not prepared and not already 
-                * aborted. If prepared retry, otherwise
-                * we're done. Check state and make any abort() call
-                * atomically so we can't accidently abort a prepared
-                * transaction.
-                */
-               synchronized (txn) {
-                   switch (txn.getState()) {
-                     case ACTIVE:
-                       // Safe to abort, give up
-                       if (logger.isLoggable(Level.INFO)) {
-                           logger.log(Level.INFO, "Got RemoteException " +
-                               "when calling getState on " + tr + ", this " +
-                                "was " + failCnt + " RemoteException, " +
-                               "dropping active transaction " + tr.id, e);
-                       }
-
-                       try {
-                           monitor.space().abort(tr.mgr, tr.id);
-                           return true;
-                       } catch (UnknownTransactionException ute) {
-                           throw new AssertionError(ute);
-                       } catch (UnmarshalException ume) {
-                           throw new AssertionError(ume);
-                       }
-                     case PREPARED:
-                       final Level l = (failCnt.get()%MAX_FAILURES == 0)?
-                           Level.INFO:Levels.FAILED;
-                       if (logger.isLoggable(l)) {
-                           logger.log(l, "Got RemoteException when calling " +
-                               "getState on " + tr + ", this was " + 
-                               failCnt + " RemoteException, will keep " +
-                               "prepared transaction " + tr.id, e);
-                       }
-
-                       // Can't give up, keep on trying to find real state
-                       mustQuery = true;
-                       return false;
-                     case ABORTED:
-                     case COMMITTED:
-                       // done
-                       return true;
-                     default:
-                       throw new AssertionError("Txn in unreachable state");
-                   }
-               }
-           } else {
-               // Don't know, but not ready to give up
-               if (logger.isLoggable(Levels.FAILED)) {
-                   logger.log(Levels.FAILED, "Got RemoteException when " +
-                       "calling getState on " + tr + ", failCount = " +
-                       failCnt + ", will retry", e);
-               }
-
-               mustQuery = true;      // keep on trying
-               return false;          // try again next time
-           }
-       }
-    
-       if (logger.isLoggable(Level.FINER)) {
-           logger.log(Level.FINER, "{0} trState = {1}", 
-                      new Object[]{this, Integer.valueOf(trState)});
-       }
-
-       failCnt.set(0);                // reset failures -- we got a response
-
-       /*
-        * If the two states aren't the same, the state changed and we
-        * need to account for that locally here by calling the method
-        * that would make the change (the one we should have gotten.
-        * (We use the external forms of abort, commit, etc., because
-        * they are what the manager would call, and therefore these
-        * calls will always do exactly what the incoming manager
-        * calls would have done.  I don't want this to be fragile by
-        * bypassing those calls and going straight to the Txn
-        * object's calls, which might skip something important in the
-        * OutriggerServerImpl calls).
-        */
-
-       if (trState != txnState) {
-           if (logger.isLoggable(Level.FINER)) {
-               logger.log(Level.FINER, 
-                   "{0} mgr state[{1}] != local state [{2}]", 
-                   new Object[]{this,
-                                TxnConstants.getName(trState),
-                                TxnConstants.getName(txnState)});
-           }
-
-           try {
-               switch (trState) {
-                 case ABORTED:
-                   logger.log(Level.FINER, "{0} moving to abort", this);
-                         
-                   monitor.space().abort(tr.mgr, tr.id);
-                   return true;
-
-                 case COMMITTED:
-                   logger.log(Level.FINER, "{0} moving to commit", this);
-
-                   monitor.space().commit(tr.mgr, tr.id);
-                   return true;
-               }
-           } catch (UnknownTransactionException e) {
-               // we must somehow have already gotten the abort() or
-               // commit(), and have therefore forgotten about the
-               // transaction, while this code was executing
-               return true;
-           } catch (UnmarshalException ume) {
-               throw new AssertionError(ume);
-           }
-
-           // we can't fake anything else -- the manager will have to call
-           // us
-       }
-
-       logger.log(Level.FINEST, "{0} return false", this);
-
-       return false;                   // now we know so nothing more to do
-    }
-
-    /**
-     * Add in a resource.  The lease may already be in, in which case it is
-     * ignored, or it may be null, in which case it was a non-leased probe
-     * that was blocked and we simply set <code>mustQuery</code> to
-     * <code>true</code>.
-     */
-    synchronized void add(QueryWatcher query) {
-       if (query == null || query.getExpiration() <= nextQuery.get()) {
-           if (logger.isLoggable(Level.FINEST))
-               logger.log(Level.FINEST, "adding resource to task -- SHORT");
-           mustQuery = true;
-       } else {
-           if (logger.isLoggable(Level.FINEST))
-               logger.log(Level.FINEST, "adding resource to task -- LONG");
-           if (queries == null)
-               queries = new WeakHashMap<QueryWatcher,Collection<Txn>>();// we use it like a WeakHashSet
-           queries.put(query, null);
-       }
-    }
-
-    /** Log failed unpacking attempt attempt */
-    private void logUnpackingFailure(String exceptionDescription, Level level,
-                                    boolean terminal, Throwable t) 
-    {
-       if (logger.isLoggable(level)) {
-           logger.log(level, "Encountered " + exceptionDescription +
-               "while unpacking exception to check state, " +
-               (terminal?"dropping":"keeping") +  " monitoring task", t);
-       }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import org.apache.river.constants.TxnConstants;
+import org.apache.river.constants.ThrowableConstants;
+import org.apache.river.logging.Levels;
+import org.apache.river.thread.wakeup.RetryTask;
+import org.apache.river.thread.wakeup.WakeupManager;
+
+import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.UnmarshalException;
+import java.rmi.NoSuchObjectException;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.core.transaction.TransactionException;
+import net.jini.core.transaction.UnknownTransactionException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+
+/**
+ * A task that will try to validate the state of a transaction.  This
+ * uses weak references a good deal to let the other parts of the system
+ * be GC'ed as necessary.
+ * <p>
+ * The retry mechanism is subtle, so bear with me.  The purpose is
+ * to ensure that if any activity is being blocked by a given
+ * transaction, that transaction will be tested at some point in
+ * the future (if necessary, i.e., if it still is thought to be
+ * active).  We assume it to be rare that a transactions that the
+ * space thinks is active is, in fact, aborted, so the algorithm is
+ * designed to guarantee the detection without a lot of overhead,
+ * specifically without a lot of RMI calls.
+ * <p>
+ * Each task has three values: a <code>nextQuery</code> time, a
+ * <code>mustQuery</code> boolean that force the next query to be
+ * made, and <code>deltaT</code>, the time at which the following
+ * query will be scheduled.  When the task is awakened at its
+ * <code>nextQuery</code> time, it checks to see if it must make an
+ * actual query to the transaction manager, which it will do if either
+ * <code>mustQuery</code> is <code>true</code>, or if we know about
+ * any in progress queries on the space that are blocked on the
+ * transaction.  Whether or not an actual query is made,
+ * <code>deltaT</code> is added to <code>nextQuery</code> to get the
+ * <code>nextQuery</code> time, <code>deltaT</code> is doubled, and
+ * <code>mustQuery</code> boolean is set to <code>false</code>.
+ * <p>
+ * There are two kinds of requests that a with which transaction
+ * can cause a conflict -- those with long timeouts (such as
+ * blocking reads and takes) and those that are under short timeouts
+ * (such as reads and takes with zero-length timeouts).  We will
+ * treat them separately at several points of the algorithm.  A
+ * short timeout is any query whose expiration time is sooner than
+ * the <code>nextQuery</code> time.  Any other timeout is long
+ * If a short query arrives, <code>mustQuery</code> is set to 
+ * <code>true</code>.
+ * <p>
+ * The result is that any time a transaction causes a conflict, if
+ * the query on the space has not ended by the time of the
+ * <code>nextQuery</code> we will attempt to poll the transaction manager.  
+ * There will also poll the transaction manager if any conflict occurred
+ * on a query on the space with a short timeout.
+ * <p>
+ * The first time a transaction causes a conflict, we schedule a
+ * time in the future at which we will poll its status.  We do not
+ * poll right away because often a transaction will complete on
+ * its own before we get to that time, making the check
+ * unnecessary.  An instant poll is, therefore, unnecessarily
+ * aggressive, since giving an initial grace time will usually mean
+ * no poll is made at all.  So if the first conflict occurs at
+ * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be
+ * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean
+ * will be <code>true</code> to force that poll to happen, and
+ * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @see TxnMonitor 
+ */
+class TxnMonitorTask extends RetryTask
+    implements TransactionConstants, org.apache.river.constants.TimeConstants
+{
+    /** transaction being monitored */
+    private final Txn  txn;
+
+    /** the monitor we were made by */
+    private final TxnMonitor monitor; 
+
+    /**
+     * All the queries on the space (not queries to the transaction
+     * manager) waiting for <code>txn</code> to be resolved.
+     * <code>null</code> until we have at least one. Represented by
+     * <code>QueryWatcher</code> objects.  
+     */
+    private Map<QueryWatcher,Collection<Txn>>          queries; //Sync on this.
+
+    /** count of RemoteExceptions */
+    private final AtomicInteger                failCnt;
+
+    /** 
+     * The next time we need to poll the transaction manager 
+     * to get <code>txn</code>'s actual state.
+     */
+    private final AtomicLong   nextQuery;
+
+    /**
+     * When we're given an opportunity to poll the transaction manager
+     * for the <code>txn</code>'s state, do so.
+     */
+    private volatile boolean   mustQuery;
+
+    /** next value added to <code>nextQuery</code> */
+    private volatile long      deltaT;
+
+    /**
+     * The initial grace period before the first query.
+     */
+    private static final long  INITIAL_GRACE = 15 * SECONDS;
+
+    /**
+     * The retry time when we have an encountered an exception
+     */
+    private static final long  BETWEEN_EXCEPTIONS = 15 * SECONDS;
+
+    /**
+     * The largest value that <code>deltaT</code> will reach.
+     */
+    private static final long  MAX_DELTA_T = 1 * HOURS;
+
+    /**
+     * The maximum number of failures allowed in a row before we simply
+     * give up on the transaction and consider it aborted.
+     */
+    private static final int   MAX_FAILURES = 3;
+
+    /** Logger for logging transaction related information */
+    private static final Logger logger = 
+       Logger.getLogger(OutriggerServerImpl.txnLoggerName);
+
+    /**
+     * Create a new TxnMonitorTask.
+     */
+    TxnMonitorTask(Txn txn, TxnMonitor monitor,
+                  ExecutorService manager, WakeupManager wakeupMgr) {
+       super(manager, wakeupMgr);
+       this.txn = txn;
+       this.monitor = monitor;
+       nextQuery = new AtomicLong(startTime());        // retryTime will add INITIAL_GRACE
+       deltaT = INITIAL_GRACE;
+       mustQuery = true;
+        failCnt = new AtomicInteger();
+    }
+
+    /**
+     * Return the time of the next query, bumping <code>deltaT</code> as
+     * necessary for the next iteration.  If the transaction has voted
+     * <code>PREPARED</code> or the manager has been giving us a
+     * <code>RemoteException</code>, we should retry on short times;
+     * otherwise we back off quickly.
+     */
+    public long retryTime() {
+        boolean noFailures = false;
+        synchronized (txn){
+            noFailures = (failCnt.get() == 0 && txn.getState() != PREPARED);
+        }
+            if (noFailures) {      // no failures
+                if (logger.isLoggable(Level.FINEST)) {
+                    logger.log(Level.FINEST, "{0} retryTime adds {1}", 
+                               new Object[]{this, Long.valueOf(deltaT)});
+                }
+
+                nextQuery.addAndGet(deltaT);
+                synchronized (this){
+                    if (deltaT < MAX_DELTA_T)
+                        deltaT = Math.min(deltaT * 2, MAX_DELTA_T);
+                }
+            } else {
+                if (logger.isLoggable(Level.FINEST)) {
+                    logger.log(Level.FINEST, "{0} retryTime adds {1} (for {2})",
+                               new Object[]{this, Long.valueOf(BETWEEN_EXCEPTIONS),
+                                   (failCnt.get() != 0 ? "failure" : "PREPARED")});
+                }
+                nextQuery.addAndGet(BETWEEN_EXCEPTIONS);
+            }
+        
+       return nextQuery.get();
+    }
+
+    /**
+     * Add a ``sibling'' transaction, one that is now blocking progress
+     * on one of the same entries.  For example, if a client is blocked
+     * on a <code>read</code>, another transaction can read the same
+     * entry, thereby also blocking that same client.  This means that
+     * the transaction for the second <code>read</code> must be
+     * watched, too.  The list of queries for the second transaction
+     * might be less that the list of those in this transaction, but
+     * the process of figuring out the subset is too expensive, since
+     * we have tried to make the checking process itself cheap,
+     * anyway.  So we add all queries this task is currently monitoring
+     * to the task monitoring the second transaction.  If there are
+     * no queries, then the blocking occurred because of a short query
+     * or all the queries have expired, in which case the second transaction
+     * isn't blocking the way of anything currently, so this method does
+     * nothing.
+     * <p>
+     * Of course, in order to avoid blocking the thread that is calling
+     * this (which is trying to perform a <code>read</code>, after
+     * all), we simply add each lease in this task to the monitor's
+     * queue.
+     *
+     */
+    // @see TxnEntryHandle#monitor
+    //!! Would it be worth the overhead to make TxnEntryHandle.monitor
+    //!! search for the transaction with the smallest set of leases?  -arnold
+    synchronized void addSibling(Txn txn) {
+       if (queries == null || queries.size() == 0)
+           return;
+       Collection<Txn> sibling = Collections.nCopies(1, txn);
+       Iterator<QueryWatcher> it = queries.keySet().iterator();
+       while (it.hasNext()) {
+           QueryWatcher query = it.next();
+           if (query != null)  // from a weak map, so might be null
+               monitor.add(query, sibling);
+       }
+    }
+
+    /**
+     * Try to see if this transaction should be aborted.  This returns
+     * <code>true</code> (don't repeat the task) if it knows that the
+     * transaction is no longer interesting to anyone.
+     */
+    public boolean tryOnce() {
+       if (logger.isLoggable(Level.FINEST)) {
+           logger.log(Level.FINEST, "{0} attempt {1} mustQuery:{2}", 
+               new Object[]{this, Integer.valueOf(attempt()), 
+                            Boolean.valueOf(mustQuery) });
+       }
+
+       /*
+        * The first time we do nothing, since RetryTask invokes run first,
+        * but we want to wait a bit before testing the transaction.
+        */
+       if (attempt() == 0)
+           return false;
+        int txnState;
+        synchronized (txn){
+            txnState = txn.getState();
+        }
+       if (logger.isLoggable(Level.FINEST)) {
+           logger.log(Level.FINEST, "{0} txn.getState() = {1}", 
+               new Object[]{this, Integer.valueOf(txnState)});
+       }
+
+       // not active or prepared == no longer blocking
+       
+       if (txnState != ACTIVE && txnState != PREPARED)
+           return true;
+
+       // if we're prepared, test every time -- this shouldn't take long
+       mustQuery |= (txnState == PREPARED);
+
+       /*
+        * Go through the resources to see if we can find one still active
+        * that cares.  Must be synchronized since we test, then clear --
+        * another thread that set the flag between the test and clear
+        * would have its requirements lost.
+        */
+       synchronized (this) {
+           if (!mustQuery) {           // then try resources
+               if (queries == null)    // no resources, so nobody wants it
+                   return false;       // try again next time
+
+               Iterator<QueryWatcher> it = queries.keySet().iterator();
+               boolean foundNeed = false;
+
+               if (logger.isLoggable(Level.FINEST)) {
+                   logger.log(Level.FINEST, "{0} nextQuery {1}", 
+                              new Object[]{this, nextQuery});
+               }
+
+               while (it.hasNext()) {
+                   QueryWatcher query = it.next();
+                   if (query == null)     // gone -- the map will reap it
+                       continue;
+                   if (logger.isLoggable(Level.FINEST)) {
+                       logger.log(Level.FINEST, 
+                                  "{0} query.getExpiration() {1}", 
+                                  new Object[]{this, 
+                                      Long.valueOf(query.getExpiration())});
+                   }
+
+                   if (query.getExpiration() < nextQuery.get() || query.isResolved()) {
+                        it.remove();
+                    }  // expired, so we don't care about it
+                   else {
+                       foundNeed = true;
+                       break;
+                   }
+               }
+
+               if (logger.isLoggable(Level.FINEST)) {
+                   logger.log(Level.FINEST, "{0} foundNeed = {1}", 
+                              new Object[]{this, Boolean.valueOf(foundNeed)});
+               }
+
+               if (!foundNeed)         // nobody wants it
+                   return false;       // try again next time
+           }
+           mustQuery = false;          // clear it for next time
+       }
+
+       /*
+        * Now we know (a) the transaction itself is alive, and (b) some
+        * lease still cares.  Make sure it's still active as far as the
+        * it knows, and if it is, then ask the manager about it.
+        */
+       ServerTransaction tr;
+       try {
+           /* This may fix a broken Txn, if it does it won't get moved
+            * from the broken to the unbroken list. It will get
+            * moved eventually, but it does seem unfortunate it does
+            * not happen immediately
+            */
+           tr = txn.getTransaction(
+               monitor.space().getRecoveredTransactionManagerPreparer());
+       } catch (RemoteException e) {
+           final int cat = ThrowableConstants.retryable(e);
+
+           if (cat == ThrowableConstants.BAD_INVOCATION ||
+               cat == ThrowableConstants.BAD_OBJECT)
+           {
+               // Not likely to get better, give up
+               logUnpackingFailure("definite exception", Level.INFO,
+                                   true, e);                               
+               return true;
+           } else if (cat == ThrowableConstants.INDEFINITE) {
+               // try, try, again
+               logUnpackingFailure("indefinite exception", Levels.FAILED,
+                                   false, e);                              
+               mustQuery = true;
+               return false;
+           } else if (cat == ThrowableConstants.UNCATEGORIZED) {
+               // Same as above but log differently.
+               mustQuery = true;
+               logUnpackingFailure("uncategorized exception", Level.INFO,
+                                   false, e);                              
+               return false;
+           } else {
+               logger.log(Level.WARNING, "ThrowableConstants.retryable " +
+                          "returned out of range value, " + cat,
+                          new AssertionError(e));
+               return false;
+           }
+       } catch (IOException e) {
+           // Not likely to get better
+           logUnpackingFailure("IOException", Level.INFO, true, e);
+           return true;
+       } catch (RuntimeException e) {
+           // Not likely to get better
+           logUnpackingFailure("RuntimeException", Level.INFO, true, e);
+           return true;
+       } catch (ClassNotFoundException e) {
+           // codebase probably down, keep trying
+           logUnpackingFailure("ClassNotFoundException", Levels.FAILED, 
+                               false, e);
+           mustQuery = true;
+           return false;
+       }
+
+       if (logger.isLoggable(Level.FINEST))
+           logger.log(Level.FINEST, "{0} tr = {1}", new Object[]{this, tr});
+
+       int trState;
+       try {
+           trState = tr.getState();
+       } catch (TransactionException e) {
+           if (logger.isLoggable(Level.INFO))
+               logger.log(Level.INFO, "Got TransactionException when " +
+                   "calling getState on " + tr + ", dropping transaction " +
+                   tr.id, e);
+           trState = ABORTED;
+       } catch (NoSuchObjectException e) {
+           /* It would be epsilon better to to give up immediately
+            * if we get a NoSuchObjectException and we are in the
+            * active state, however, the code to do this would
+            * be very complicated since we need to hold a lock to
+            * while reading and acting on the state.
+            */
+           if (failCnt.incrementAndGet() >= MAX_FAILURES) {
+               if (logger.isLoggable(Level.INFO)) {
+                   logger.log(Level.INFO, "Got NoSuchObjectException when " +
+                       "calling getState on " + tr + ", this was the " +
+                       failCnt + " RemoteException, dropping transaction" +
+                       tr.id, e);
+               }
+               trState = ABORTED;
+           } else {
+               if (logger.isLoggable(Levels.FAILED)) {
+                   logger.log(Levels.FAILED, "Got NoSuchObjectException " +
+                       "when calling getState on " + tr + ", failCount = " +
+                       failCnt + ", will retry", e);
+               }
+               mustQuery = true;      // keep on trying
+               return false;          // try again next time
+           }
+       } catch (RemoteException e) {
+           if (failCnt.incrementAndGet() >= MAX_FAILURES) {
+               /* abort if we are not prepared and not already 
+                * aborted. If prepared retry, otherwise
+                * we're done. Check state and make any abort() call
+                * atomically so we can't accidently abort a prepared
+                * transaction.
+                */
+               synchronized (txn) {
+                   switch (txn.getState()) {
+                     case ACTIVE:
+                       // Safe to abort, give up
+                       if (logger.isLoggable(Level.INFO)) {
+                           logger.log(Level.INFO, "Got RemoteException " +
+                               "when calling getState on " + tr + ", this " +
+                                "was " + failCnt + " RemoteException, " +
+                               "dropping active transaction " + tr.id, e);
+                       }
+
+                       try {
+                           monitor.space().abort(tr.mgr, tr.id);
+                           return true;
+                       } catch (UnknownTransactionException ute) {
+                           throw new AssertionError(ute);
+                       } catch (UnmarshalException ume) {
+                           throw new AssertionError(ume);
+                       }
+                     case PREPARED:
+                       final Level l = (failCnt.get()%MAX_FAILURES == 0)?
+                           Level.INFO:Levels.FAILED;
+                       if (logger.isLoggable(l)) {
+                           logger.log(l, "Got RemoteException when calling " +
+                               "getState on " + tr + ", this was " + 
+                               failCnt + " RemoteException, will keep " +
+                               "prepared transaction " + tr.id, e);
+                       }
+
+                       // Can't give up, keep on trying to find real state
+                       mustQuery = true;
+                       return false;
+                     case ABORTED:
+                     case COMMITTED:
+                       // done
+                       return true;
+                     default:
+                       throw new AssertionError("Txn in unreachable state");
+                   }
+               }
+           } else {
+               // Don't know, but not ready to give up
+               if (logger.isLoggable(Levels.FAILED)) {
+                   logger.log(Levels.FAILED, "Got RemoteException when " +
+                       "calling getState on " + tr + ", failCount = " +
+                       failCnt + ", will retry", e);
+               }
+
+               mustQuery = true;      // keep on trying
+               return false;          // try again next time
+           }
+       }
+    
+       if (logger.isLoggable(Level.FINER)) {
+           logger.log(Level.FINER, "{0} trState = {1}", 
+                      new Object[]{this, Integer.valueOf(trState)});
+       }
+
+       failCnt.set(0);                // reset failures -- we got a response
+
+       /*
+        * If the two states aren't the same, the state changed and we
+        * need to account for that locally here by calling the method
+        * that would make the change (the one we should have gotten.
+        * (We use the external forms of abort, commit, etc., because
+        * they are what the manager would call, and therefore these
+        * calls will always do exactly what the incoming manager
+        * calls would have done.  I don't want this to be fragile by
+        * bypassing those calls and going straight to the Txn
+        * object's calls, which might skip something important in the
+        * OutriggerServerImpl calls).
+        */
+
+       if (trState != txnState) {
+           if (logger.isLoggable(Level.FINER)) {
+               logger.log(Level.FINER, 
+                   "{0} mgr state[{1}] != local state [{2}]", 
+                   new Object[]{this,
+                                TxnConstants.getName(trState),
+                                TxnConstants.getName(txnState)});
+           }
+
+           try {
+               switch (trState) {
+                 case ABORTED:
+                   logger.log(Level.FINER, "{0} moving to abort", this);
+                         
+                   monitor.space().abort(tr.mgr, tr.id);
+                   return true;
+
+                 case COMMITTED:
+                   logger.log(Level.FINER, "{0} moving to commit", this);
+
+                   monitor.space().commit(tr.mgr, tr.id);
+                   return true;
+               }
+           } catch (UnknownTransactionException e) {
+               // we must somehow have already gotten the abort() or
+               // commit(), and have therefore forgotten about the
+               // transaction, while this code was executing
+               return true;
+           } catch (UnmarshalException ume) {
+               throw new AssertionError(ume);
+           }
+
+           // we can't fake anything else -- the manager will have to call
+           // us
+       }
+
+       logger.log(Level.FINEST, "{0} return false", this);
+
+       return false;                   // now we know so nothing more to do
+    }
+
+    /**
+     * Add in a resource.  The lease may already be in, in which case it is
+     * ignored, or it may be null, in which case it was a non-leased probe
+     * that was blocked and we simply set <code>mustQuery</code> to
+     * <code>true</code>.
+     */
+    synchronized void add(QueryWatcher query) {
+       if (query == null || query.getExpiration() <= nextQuery.get()) {
+           if (logger.isLoggable(Level.FINEST))
+               logger.log(Level.FINEST, "adding resource to task -- SHORT");
+           mustQuery = true;
+       } else {
+           if (logger.isLoggable(Level.FINEST))
+               logger.log(Level.FINEST, "adding resource to task -- LONG");
+           if (queries == null)
+               queries = new WeakHashMap<QueryWatcher,Collection<Txn>>();// we use it like a WeakHashSet
+           queries.put(query, null);
+       }
+    }
+
+    /** Log failed unpacking attempt attempt */
+    private void logUnpackingFailure(String exceptionDescription, Level level,
+                                    boolean terminal, Throwable t) 
+    {
+       if (logger.isLoggable(level)) {
+           logger.log(level, "Encountered " + exceptionDescription +
+               "while unpacking exception to check state, " +
+               (terminal?"dropping":"keeping") +  " monitoring task", t);
+       }
+    }
+
+}

Modified: 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java
 (original)
+++ 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java
 Sun Jul  5 11:41:39 2020
@@ -26,6 +26,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
 
 /**
  * A type tree for entries.  It maintains, for each class, a list of


Reply via email to