Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java Sun Jul 5 11:41:39 2020 @@ -1,537 +1,540 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.river.outrigger; - -import org.apache.river.constants.TxnConstants; -import org.apache.river.logging.Levels; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.rmi.RemoteException; -import java.util.List; -import java.util.Iterator; -import java.util.Collections; -import java.util.logging.Level; -import java.util.logging.Logger; - -import net.jini.core.transaction.CannotJoinException; -import net.jini.core.transaction.server.ServerTransaction; -import net.jini.core.transaction.server.TransactionConstants; -import net.jini.core.transaction.server.TransactionManager; - -import net.jini.space.InternalSpaceException; -import net.jini.security.ProxyPreparer; - -/** - * This class represents a space's state in a single transaction. - * - * Object of this class represent Jini transactions within outrigger. - * These transactions hold "Transactables" -- things that represent - * the actions that have been taken under this transaction. For example, - * if this transaction were to be cancelled, the Transactables are - * examined and serve as the list of things to roll back in order to - * restore the state of the Space status quo ante. - * - * This is achieved by having the transactables notified of state changes - * to this transaction such as preparing, commit, etc. The - * transactables themselves are responsible for "doing the right thing." - * - * NB: some--but not all--of the actions one can take with a transaction - * are managed internally by these objects. That is, all of the important - * methods objects of these types are synchronized. Therefore, two - * simultaneous calls to abort() and commit() are arbitrated properly. - * - * However, care must be taken when add()ing a transactable. 
Even - * though the add() method is synchronized if you check the state of - * the transaction to ensure that is active and then call add() the - * transaction could have left the active state between the check and - * the add() call unless the call obtains the appropriate locks. This - * is even more likely if other work needs to be done in addition to - * calling add() (e.g. persisting state, obtaining locks, etc.). The - * caller of add() should lock the associated transaction object and - * ensure that the transaction is still considered ACTIVE, do whatever - * work is necessary to complete while the transaction is in the ACTIVE - * state (including calling call add()) and then release the lock. - * This can be done by : - * <ul> - * <li> holding the lock on this object while checking the - * state and carrying out the operation (including calling add()), or - * <li> calling ensureActive() to check the state - * and obtain a non-exclusive lock, carrying out the operation - * (including calling add()), and then calling allowStateChange() to - * release the lock. - * </ul> - * The pair of ensureActive() and allowStateChange() allows for more - * concurrency if the operation is expected to take a long time, in - * that it will allow for other operations to be performed under the - * same transaction and let aborts prevent other operations from - * being started. - * - * @author Sun Microsystems, Inc. - */ -class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> { - - /** The internal id Outrigger as assigned to the transaction */ - final private long id; - - /** What state we think the transaction is in */ - private volatile int state; - - /** - * The transaction manager associated with the transaction - * this object is fronting for. - */ - private StorableReference trm; - - /** - * Cached <code>ServerTransaction</code> object for - * the transaction this object is fronting for. 
- */ - private ServerTransaction tr; - - /** The id the transaction manager assigned to this transaction */ - private volatile long trId; - - /** - * The list of <code>Transactable</code> participating in - * this transaction. - */ - final private List<Transactable> txnables = new java.util.LinkedList<Transactable>(); - - /** - * The task responsible for monitoring to see if this - * transaction has been aborted with us being told, or - * null if no such task as been allocated. - */ - private volatile TxnMonitorTask monitorTask; - - /** Count of number of threads holding a read lock on state */ - private int stateReaders = 0; - - /** - * <code>true</code> if there is a blocked state change. Used - * to give writers priority. - */ - private boolean stateChangeWaiting = false; - - /** Logger for logging transaction related information */ - private static final Logger logger = - Logger.getLogger(OutriggerServerImpl.txnLoggerName); - - /** - * Create a new <code>Txn</code> that represents our state in the - * given <code>ServerTransaction</code>. - */ - Txn(ServerTransaction tr, long id) { - this(id); - trId = tr.id; - this.tr = tr; - this.trm = new StorableReference(tr.mgr); - state = ACTIVE; - - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "creating txn for transaction mgr:" + - "{0}, id:{1}, state:{2}", - new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); - } - } - - /** Used in recovery */ - Txn(long id) { - this.id = id; // the txn id is not persisted - } - - /** - * Get the id for this txn. Note that this id is NOT the same as - * the ID of the transaction. Since that ID is not unique (it must - * be qualified with the <code>ServerTransaction</code> object) we create - * our own internal id to make txns unique. This is needed since we - * may not have the <code>Transaction</code> unmarshalled. 
- */ - Long getId() { - return Long.valueOf(id); - } - - /** - * We keep the transaction ID around because we may need it - * to identify a broken transaction after recovery. - */ - long getTransactionId() { - return trId; - } - - /** - * Return our local view of the current state. Need to be holding - * the lock on this object or have called <code>ensureActive</code> - * to get the current value. - */ - int getState() { - return state; - } - - /** - * Atomically checks that this transaction is in the active - * state and locks the transaction in the active state. - * The lock can be released by calling <code>allowStateChange</code>. - * Each call to this method should be paired with a call to - * <code>allowStateChange</code> in a finally block. - * @throws CannotJoinException if the transaction - * is not active or a state change is pending. - */ - synchronized void ensureActive() throws CannotJoinException { - if (state != ACTIVE || stateChangeWaiting) { - final String msg = "transaction mgr:" + tr + ", id:" + trId + - " not active, in state " + TxnConstants.getName(state); - final CannotJoinException e = new CannotJoinException(msg); - logger.log(Levels.FAILED, msg, e); - throw e; - } - assert stateReaders >= 0; - stateReaders++; - } - - /** - * Release the read lock created by an <code>ensureActive</code> - * call. Does nothing if the transaction is not active or there is - * a state change pending and thus is safe to call even if the - * corresponding <code>ensureActive</code> call threw - * <code>CannotJoinException</code>. - */ - synchronized void allowStateChange() { - if (state != ACTIVE || stateChangeWaiting) - return; - stateReaders--; - assert stateReaders >= 0; - notifyAll(); - } - - /** - * Prevents new operations from being started under this - * transaction and blocks until in process operations are - * completed. 
- */ - synchronized void makeInactive() { - stateChangeWaiting = true; - assert stateReaders >= 0; - while (stateReaders != 0) { - try { - wait(); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - assert stateReaders >= 0; - } - } - - /** - * Prepare for transaction commit. <code>makeInactive</code> must have - * been called on this transaction first. - */ - synchronized int prepare(OutriggerServerImpl space) { - assert stateChangeWaiting : "prepare called before makeInactive"; - assert stateReaders == 0 : "prepare called before makeInactive completed"; - - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " + - "state:{2}", - new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); - } - - switch (state) { - case ABORTED: // previously aborted - return ABORTED; - - case COMMITTED: // previously committed - throw new IllegalStateException(); // "cannot happen" - - case NOTCHANGED: // previously voted NOTCHANGED - case PREPARED: // previously voted PREPARED - return state; // they are idempotent, and - // and we have nothing to do - // so return - - case ACTIVE: // currently active - boolean changed = false; // did this txn change - // anything? 
- - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "prepare:preparing transaction mgr:" + - "{0}, id:{1}, state:{2}", - new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); - } - - // loop through Transactable members of this Txn - final Iterator<Transactable> i = txnables.iterator(); - int c=0; // Counter for debugging message - while (i.hasNext()) { - // get this member's vote - final Transactable transactable = i.next(); - final int prepState = transactable.prepare(this, space); - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "prepare:prepared " + - "transactable {0} for transaction mgr:{1}, id:{2}," + - " transactable now in state {3}", - new Object[]{transactable, tr, Long.valueOf(trId), - TxnConstants.getName(prepState)}); - } - - switch (prepState) { - case PREPARED: // has prepared state - changed = true; // this means a change - continue; - - case ABORTED: // has to abort - abort(space); // abort this txn (does cleanup) - state = ABORTED; - return state; // vote aborted - - case NOTCHANGED: // no change - i.remove(); // Won't need to call again - continue; - - default: // huh? - throw new - InternalSpaceException("prepare said " + prepState); - } - } - - if (changed) { - state = PREPARED; - // have to watch this since it's now holding permanent - // resources - space.monitor(Collections.nCopies(1, this)); - } else { - state = NOTCHANGED; - } - break; - - default: - throw new IllegalStateException("unknown Txn state: " + state); - } - - return state; - } - - /** - * Abort the transaction. This must be callable from - * <code>prepare</code> because if a <code>Transactable</code> - * votes <code>ABORT</code>, this method is called to make that - * happen. <code>makeInactive</code> must have been called on this - * transaction first. 
- */ - synchronized void abort(OutriggerServerImpl space) { - assert stateChangeWaiting : "abort called before makeInactive"; - assert stateReaders == 0 : "abort called before makeInactive completed"; - - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "abort: transaction mgr:{0}, id:{1}, " + - "state:{2}", - new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); - } - - switch (state) { - case ABORTED: // already aborted - case NOTCHANGED: // nothing to abort - break; - - case COMMITTED: // "cannot happen" - throw new IllegalStateException("aborting a committed txn"); - - case ACTIVE: - case PREPARED: - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "abort:aborting transaction mgr:" + - "{0}, id:{1}, state:{2}", - new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); - } - - final Iterator<Transactable> i = txnables.iterator(); - while (i.hasNext()) { - i.next().abort(this, space); - } - state = ABORTED; - cleanup(); - break; - - default: - throw new IllegalStateException("unknown Txn state: " + state); - } - } - - /** - * Having prepared, roll the changes - * forward. <code>makeInactive</code> must have been called on - * this transaction first. - */ - synchronized void commit(OutriggerServerImpl space) { - assert stateChangeWaiting : "commit called before makeInactive"; - assert stateReaders == 0 : "commit called before makeInactive completed"; - - //!! 
Need to involve mgr here - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "commit: transaction mgr:{0}, id:{1}, " + - "state:{2}", - new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); - } - - switch (state) { - case ABORTED: // "cannot happen" stuff - case ACTIVE: - case NOTCHANGED: - throw new IllegalStateException("committing " - + TxnConstants.getName(state) + " txn"); - - case COMMITTED: // previous committed, that's okay - return; - - case PREPARED: // voted PREPARED, time to finish up - final Iterator<Transactable> i = txnables.iterator(); - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "commit:committing transaction mgr:" + - "{0}, id:{1}, state:{2}", - new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); - } - - while (i.hasNext()) { - i.next().commit(this, space); - } - state = COMMITTED; - cleanup(); - return; - - default: - throw new IllegalStateException("unknown Txn state: " + state); - } - } - - /** - * Caution: see locking discussion at the class level. - */ - public synchronized Transactable add(Transactable t) { - txnables.add(t); - return t; - } - - // inherit doc comment - public ServerTransaction getTransaction(ProxyPreparer preparer) - throws IOException, ClassNotFoundException - { - synchronized (this){ - if (tr == null) { - final TransactionManager mgr = - (TransactionManager)trm.get(preparer); - tr = new ServerTransaction(mgr, trId); - } - return tr; - } - } - - /** - * Return the manager associated with this transaction. - * @return the manager associated with this transaction. - * @throws IllegalStateException if this <code>Txn</code> - * is still broken. - */ - synchronized TransactionManager getManager() { - if (tr == null) - throw new IllegalStateException("Txn is still broken"); - return tr.mgr; - } - - /** - * Return the monitor task for this object. 
Note, this - * method is unsynchronized because it (and - * <code>monitorTask(TxnMonitorTask)</code> are both called - * from the same thread. - */ - TxnMonitorTask monitorTask() { - return monitorTask; - } - - /** - * Set the monitor task for this object. Note, this method is - * unsynchronized because it (and <code>monitorTask()</code> are - * both called from the same thread. - */ - void monitorTask(TxnMonitorTask task) { - monitorTask = task; - } - - /** - * Clean up any state when the transaction is finished. - */ - private void cleanup() { - if (monitorTask != null) - monitorTask.cancel(false); } - - // ----------------------------------- - // Methods required by StorableObject - // ----------------------------------- - - // inherit doc comment - public void store(ObjectOutputStream out) throws IOException { - /* There is a bunch of stuff we don't need to write. The - * Txn id not stored since it is handed back during - * recovery. The content is rebuilt txnables by the various - * recoverWrite and recoverTake calls. state is not written - * because it is always ACTIVE when we write, and always - * needs to be PREPARED when we read it back. 
- */ - synchronized (this){ - out.writeObject(trm); - out.writeLong(trId); - } - } - - // inherit doc comment - public Txn restore(ObjectInputStream in) - throws IOException, ClassNotFoundException - { - /* Only transactions that got prepared and not committed or - * aborted get recovered - */ - synchronized (this){ - state = PREPARED; - trm = (StorableReference)in.readObject(); - trId = in.readLong(); - } - return this; - } - - @Override - public int compareTo(Txn o) { - if (o == null) return -1; - if (o.id < id) return -1; - if (o.id > id) return 1; - return 0; - } - - @Override - public int hashCode() { - int hash = 7; - hash = 31 * hash + (int) (this.id ^ (this.id >>> 32)); - return hash; - } - - public boolean equals(Object o){ - if (!(o instanceof Txn)) return false; - Txn txn = (Txn) o; - return id == txn.id; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.river.outrigger; + +import org.apache.river.constants.TxnConstants; +import org.apache.river.logging.Levels; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.rmi.RemoteException; +import java.util.List; +import java.util.Iterator; +import java.util.Collections; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.jini.core.transaction.CannotJoinException; +import net.jini.core.transaction.server.ServerTransaction; +import net.jini.core.transaction.server.TransactionConstants; +import net.jini.core.transaction.server.TransactionManager; + +import net.jini.space.InternalSpaceException; +import net.jini.security.ProxyPreparer; +import org.apache.river.outrigger.proxy.StorableObject; + + + +/** + * This class represents a space's state in a single transaction. + * + * Object of this class represent Jini transactions within outrigger. + * These transactions hold "Transactables" -- things that represent + * the actions that have been taken under this transaction. For example, + * if this transaction were to be cancelled, the Transactables are + * examined and serve as the list of things to roll back in order to + * restore the state of the Space status quo ante. + * + * This is achieved by having the transactables notified of state changes + * to this transaction such as preparing, commit, etc. The + * transactables themselves are responsible for "doing the right thing." + * + * NB: some--but not all--of the actions one can take with a transaction + * are managed internally by these objects. That is, all of the important + * methods objects of these types are synchronized. Therefore, two + * simultaneous calls to abort() and commit() are arbitrated properly. + * + * However, care must be taken when add()ing a transactable. 
Even + * though the add() method is synchronized if you check the state of + * the transaction to ensure that is active and then call add() the + * transaction could have left the active state between the check and + * the add() call unless the call obtains the appropriate locks. This + * is even more likely if other work needs to be done in addition to + * calling add() (e.g. persisting state, obtaining locks, etc.). The + * caller of add() should lock the associated transaction object and + * ensure that the transaction is still considered ACTIVE, do whatever + * work is necessary to complete while the transaction is in the ACTIVE + * state (including calling call add()) and then release the lock. + * This can be done by : + * <ul> + * <li> holding the lock on this object while checking the + * state and carrying out the operation (including calling add()), or + * <li> calling ensureActive() to check the state + * and obtain a non-exclusive lock, carrying out the operation + * (including calling add()), and then calling allowStateChange() to + * release the lock. + * </ul> + * The pair of ensureActive() and allowStateChange() allows for more + * concurrency if the operation is expected to take a long time, in + * that it will allow for other operations to be performed under the + * same transaction and let aborts prevent other operations from + * being started. + * + * @author Sun Microsystems, Inc. + */ +class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> { + + /** The internal id Outrigger as assigned to the transaction */ + final private long id; + + /** What state we think the transaction is in */ + private volatile int state; + + /** + * The transaction manager associated with the transaction + * this object is fronting for. + */ + private StorableReference trm; + + /** + * Cached <code>ServerTransaction</code> object for + * the transaction this object is fronting for. 
+ */ + private ServerTransaction tr; + + /** The id the transaction manager assigned to this transaction */ + private volatile long trId; + + /** + * The list of <code>Transactable</code> participating in + * this transaction. + */ + final private List<Transactable> txnables = new java.util.LinkedList<Transactable>(); + + /** + * The task responsible for monitoring to see if this + * transaction has been aborted with us being told, or + * null if no such task as been allocated. + */ + private volatile TxnMonitorTask monitorTask; + + /** Count of number of threads holding a read lock on state */ + private int stateReaders = 0; + + /** + * <code>true</code> if there is a blocked state change. Used + * to give writers priority. + */ + private boolean stateChangeWaiting = false; + + /** Logger for logging transaction related information */ + private static final Logger logger = + Logger.getLogger(OutriggerServerImpl.txnLoggerName); + + /** + * Create a new <code>Txn</code> that represents our state in the + * given <code>ServerTransaction</code>. + */ + Txn(ServerTransaction tr, long id) { + this(id); + trId = tr.id; + this.tr = tr; + this.trm = new StorableReference(tr.mgr); + state = ACTIVE; + + if (logger.isLoggable(Level.FINER)) { + logger.log(Level.FINER, "creating txn for transaction mgr:" + + "{0}, id:{1}, state:{2}", + new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); + } + } + + /** Used in recovery */ + Txn(long id) { + this.id = id; // the txn id is not persisted + } + + /** + * Get the id for this txn. Note that this id is NOT the same as + * the ID of the transaction. Since that ID is not unique (it must + * be qualified with the <code>ServerTransaction</code> object) we create + * our own internal id to make txns unique. This is needed since we + * may not have the <code>Transaction</code> unmarshalled. 
+ */ + Long getId() { + return Long.valueOf(id); + } + + /** + * We keep the transaction ID around because we may need it + * to identify a broken transaction after recovery. + */ + long getTransactionId() { + return trId; + } + + /** + * Return our local view of the current state. Need to be holding + * the lock on this object or have called <code>ensureActive</code> + * to get the current value. + */ + int getState() { + return state; + } + + /** + * Atomically checks that this transaction is in the active + * state and locks the transaction in the active state. + * The lock can be released by calling <code>allowStateChange</code>. + * Each call to this method should be paired with a call to + * <code>allowStateChange</code> in a finally block. + * @throws CannotJoinException if the transaction + * is not active or a state change is pending. + */ + synchronized void ensureActive() throws CannotJoinException { + if (state != ACTIVE || stateChangeWaiting) { + final String msg = "transaction mgr:" + tr + ", id:" + trId + + " not active, in state " + TxnConstants.getName(state); + final CannotJoinException e = new CannotJoinException(msg); + logger.log(Levels.FAILED, msg, e); + throw e; + } + assert stateReaders >= 0; + stateReaders++; + } + + /** + * Release the read lock created by an <code>ensureActive</code> + * call. Does nothing if the transaction is not active or there is + * a state change pending and thus is safe to call even if the + * corresponding <code>ensureActive</code> call threw + * <code>CannotJoinException</code>. + */ + synchronized void allowStateChange() { + if (state != ACTIVE || stateChangeWaiting) + return; + stateReaders--; + assert stateReaders >= 0; + notifyAll(); + } + + /** + * Prevents new operations from being started under this + * transaction and blocks until in process operations are + * completed. 
+ */ + synchronized void makeInactive() { + stateChangeWaiting = true; + assert stateReaders >= 0; + while (stateReaders != 0) { + try { + wait(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + assert stateReaders >= 0; + } + } + + /** + * Prepare for transaction commit. <code>makeInactive</code> must have + * been called on this transaction first. + */ + synchronized int prepare(OutriggerServerImpl space) { + assert stateChangeWaiting : "prepare called before makeInactive"; + assert stateReaders == 0 : "prepare called before makeInactive completed"; + + if (logger.isLoggable(Level.FINER)) { + logger.log(Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " + + "state:{2}", + new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); + } + + switch (state) { + case ABORTED: // previously aborted + return ABORTED; + + case COMMITTED: // previously committed + throw new IllegalStateException(); // "cannot happen" + + case NOTCHANGED: // previously voted NOTCHANGED + case PREPARED: // previously voted PREPARED + return state; // they are idempotent, and + // and we have nothing to do + // so return + + case ACTIVE: // currently active + boolean changed = false; // did this txn change + // anything? 
+ + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "prepare:preparing transaction mgr:" + + "{0}, id:{1}, state:{2}", + new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); + } + + // loop through Transactable members of this Txn + final Iterator<Transactable> i = txnables.iterator(); + int c=0; // Counter for debugging message + while (i.hasNext()) { + // get this member's vote + final Transactable transactable = i.next(); + final int prepState = transactable.prepare(this, space); + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "prepare:prepared " + + "transactable {0} for transaction mgr:{1}, id:{2}," + + " transactable now in state {3}", + new Object[]{transactable, tr, Long.valueOf(trId), + TxnConstants.getName(prepState)}); + } + + switch (prepState) { + case PREPARED: // has prepared state + changed = true; // this means a change + continue; + + case ABORTED: // has to abort + abort(space); // abort this txn (does cleanup) + state = ABORTED; + return state; // vote aborted + + case NOTCHANGED: // no change + i.remove(); // Won't need to call again + continue; + + default: // huh? + throw new + InternalSpaceException("prepare said " + prepState); + } + } + + if (changed) { + state = PREPARED; + // have to watch this since it's now holding permanent + // resources + space.monitor(Collections.nCopies(1, this)); + } else { + state = NOTCHANGED; + } + break; + + default: + throw new IllegalStateException("unknown Txn state: " + state); + } + + return state; + } + + /** + * Abort the transaction. This must be callable from + * <code>prepare</code> because if a <code>Transactable</code> + * votes <code>ABORT</code>, this method is called to make that + * happen. <code>makeInactive</code> must have been called on this + * transaction first. 
+ */ + synchronized void abort(OutriggerServerImpl space) { + assert stateChangeWaiting : "abort called before makeInactive"; + assert stateReaders == 0 : "abort called before makeInactive completed"; + + if (logger.isLoggable(Level.FINER)) { + logger.log(Level.FINER, "abort: transaction mgr:{0}, id:{1}, " + + "state:{2}", + new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); + } + + switch (state) { + case ABORTED: // already aborted + case NOTCHANGED: // nothing to abort + break; + + case COMMITTED: // "cannot happen" + throw new IllegalStateException("aborting a committed txn"); + + case ACTIVE: + case PREPARED: + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "abort:aborting transaction mgr:" + + "{0}, id:{1}, state:{2}", + new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); + } + + final Iterator<Transactable> i = txnables.iterator(); + while (i.hasNext()) { + i.next().abort(this, space); + } + state = ABORTED; + cleanup(); + break; + + default: + throw new IllegalStateException("unknown Txn state: " + state); + } + } + + /** + * Having prepared, roll the changes + * forward: every <code>Transactable</code> held by this transaction + * is committed, the state becomes <code>COMMITTED</code>, and the + * monitor task (if any) is cancelled. A transaction that is already + * <code>COMMITTED</code> is left untouched. <code>makeInactive</code> + * must have been called on this transaction first. + * + * @param space the space handed to each <code>Transactable.commit</code> call + * @throws IllegalStateException if the state is <code>ABORTED</code>, + * <code>ACTIVE</code>, <code>NOTCHANGED</code> or not a known state + */ + synchronized void commit(OutriggerServerImpl space) { + assert stateChangeWaiting : "commit called before makeInactive"; + assert stateReaders == 0 : "commit called before makeInactive completed"; + + //!!
Need to involve mgr here + if (logger.isLoggable(Level.FINER)) { + logger.log(Level.FINER, "commit: transaction mgr:{0}, id:{1}, " + + "state:{2}", + new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); + } + + switch (state) { + case ABORTED: // "cannot happen" stuff + case ACTIVE: + case NOTCHANGED: + throw new IllegalStateException("committing " + + TxnConstants.getName(state) + " txn"); + + case COMMITTED: // previously committed, that's okay + return; + + case PREPARED: // voted PREPARED, time to finish up + final Iterator<Transactable> i = txnables.iterator(); + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "commit:committing transaction mgr:" + + "{0}, id:{1}, state:{2}", + new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)}); + } + + while (i.hasNext()) { + i.next().commit(this, space); + } + state = COMMITTED; + cleanup(); + return; + + default: + throw new IllegalStateException("unknown Txn state: " + state); + } + } + + /** + * Add <code>t</code> to the transactables held by this transaction + * and return it. Caution: see locking discussion at the class level. + * + * @param t the transactable to record + * @return <code>t</code> + */ + public synchronized Transactable add(Transactable t) { + txnables.add(t); + return t; + } + + // inherit doc comment + public ServerTransaction getTransaction(ProxyPreparer preparer) + throws IOException, ClassNotFoundException + { + synchronized (this){ + if (tr == null) { + final TransactionManager mgr = + (TransactionManager)trm.get(preparer); + tr = new ServerTransaction(mgr, trId); + } + return tr; + } + } + + /** + * Return the manager associated with this transaction. + * @return the manager associated with this transaction. + * @throws IllegalStateException if this <code>Txn</code> + * is still broken, i.e. its <code>ServerTransaction</code> + * has not been set yet. + */ + synchronized TransactionManager getManager() { + if (tr == null) + throw new IllegalStateException("Txn is still broken"); + return tr.mgr; + } + + /** + * Return the monitor task for this object.
Note, this + * method is unsynchronized because it (and + * <code>monitorTask(TxnMonitorTask)</code> are both called + * from the same thread. + */ + TxnMonitorTask monitorTask() { + return monitorTask; + } + + /** + * Set the monitor task for this object. Note, this method is + * unsynchronized because it and <code>monitorTask()</code> are + * both called from the same thread. + * + * @param task the monitor task to associate with this transaction + */ + void monitorTask(TxnMonitorTask task) { + monitorTask = task; + } + + /** + * Clean up any state when the transaction is finished. Currently + * this cancels the monitor task, if one has been set. + */ + private void cleanup() { + if (monitorTask != null) + monitorTask.cancel(false); } + + // ----------------------------------- + // Methods required by StorableObject + // ----------------------------------- + + // inherit doc comment + public void store(ObjectOutputStream out) throws IOException { + /* There is a bunch of stuff we don't need to write. The + * Txn id is not stored since it is handed back during + * recovery. The content of txnables is rebuilt by the various + * recoverWrite and recoverTake calls. state is not written + * because it is always ACTIVE when we write, and always + * needs to be PREPARED when we read it back. + */ + synchronized (this){ + out.writeObject(trm); + out.writeLong(trId); + } + } + + // inherit doc comment + public Txn restore(ObjectInputStream in) + throws IOException, ClassNotFoundException + { + /* Only transactions that got prepared and not committed or + * aborted get recovered, so the state is forced to PREPARED. + */ + synchronized (this){ + state = PREPARED; + trm = (StorableReference)in.readObject(); + trId = in.readLong(); + } + return this; + } + + /** + * Order transactions by <code>id</code>. Note the direction: a + * transaction with a larger <code>id</code> sorts before one with a + * smaller <code>id</code>. A <code>null</code> argument yields + * <code>-1</code> rather than an exception. + */ + @Override + public int compareTo(Txn o) { + if (o == null) return -1; + if (o.id < id) return -1; + if (o.id > id) return 1; + return 0; + } + + /** + * Hash derived from <code>id</code> only (its two 32-bit halves + * XORed together), matching the field used by <code>equals</code>. + */ + @Override + public int hashCode() { + int hash = 7; + hash = 31 * hash + (int) (this.id ^ (this.id >>> 32)); + return hash; + } + + /** + * Two <code>Txn</code> objects are equal when their <code>id</code> + * values are equal; any non-<code>Txn</code> argument, including + * <code>null</code>, is unequal. + */ + public boolean equals(Object o){ + if (!(o instanceof Txn)) return false; + Txn txn = (Txn) o; + return id == txn.id; + } +}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java Sun Jul 5 11:41:39 2020 @@ -18,7 +18,7 @@ package org.apache.river.outrigger; import org.apache.river.config.Config; -import org.apache.river.thread.WakeupManager; +import org.apache.river.thread.wakeup.WakeupManager; import net.jini.config.Configuration; import net.jini.config.ConfigurationException; Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java Sun Jul 5 11:41:39 2020 @@ -1,584 +1,584 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger; - -import org.apache.river.constants.TxnConstants; -import org.apache.river.constants.ThrowableConstants; -import org.apache.river.logging.Levels; -import org.apache.river.thread.RetryTask; -import org.apache.river.thread.WakeupManager; - -import java.io.IOException; -import java.rmi.RemoteException; -import java.rmi.UnmarshalException; -import java.rmi.NoSuchObjectException; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.Iterator; -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import net.jini.core.transaction.TransactionException; -import net.jini.core.transaction.UnknownTransactionException; -import net.jini.core.transaction.server.ServerTransaction; -import net.jini.core.transaction.server.TransactionConstants; - -/** - * A task that will try to validate the state of a transaction. This - * uses weak references a good deal to let the other parts of the system - * be GC'ed as necessary. - * <p> - * The retry mechanism is subtle, so bear with me. 
The purpose is - * to ensure that if any activity is being blocked by a given - * transaction, that transaction will be tested at some point in - * the future (if necessary, i.e., if it still is thought to be - * active). We assume it to be rare that a transactions that the - * space thinks is active is, in fact, aborted, so the algorithm is - * designed to guarantee the detection without a lot of overhead, - * specifically without a lot of RMI calls. - * <p> - * Each task has three values: a <code>nextQuery</code> time, a - * <code>mustQuery</code> boolean that force the next query to be - * made, and <code>deltaT</code>, the time at which the following - * query will be scheduled. When the task is awakened at its - * <code>nextQuery</code> time, it checks to see if it must make an - * actual query to the transaction manager, which it will do if either - * <code>mustQuery</code> is <code>true</code>, or if we know about - * any in progress queries on the space that are blocked on the - * transaction. Whether or not an actual query is made, - * <code>deltaT</code> is added to <code>nextQuery</code> to get the - * <code>nextQuery</code> time, <code>deltaT</code> is doubled, and - * <code>mustQuery</code> boolean is set to <code>false</code>. - * <p> - * There are two kinds of requests that a with which transaction - * can cause a conflict -- those with long timeouts (such as - * blocking reads and takes) and those that are under short timeouts - * (such as reads and takes with zero-length timeouts). We will - * treat them separately at several points of the algorithm. A - * short timeout is any query whose expiration time is sooner than - * the <code>nextQuery</code> time. Any other timeout is long - * If a short query arrives, <code>mustQuery</code> is set to - * <code>true</code>. 
- * <p> - * The result is that any time a transaction causes a conflict, if - * the query on the space has not ended by the time of the - * <code>nextQuery</code> we will attempt to poll the transaction manager. - * There will also poll the transaction manager if any conflict occurred - * on a query on the space with a short timeout. - * <p> - * The first time a transaction causes a conflict, we schedule a - * time in the future at which we will poll its status. We do not - * poll right away because often a transaction will complete on - * its own before we get to that time, making the check - * unnecessary. An instant poll is, therefore, unnecessarily - * aggressive, since giving an initial grace time will usually mean - * no poll is made at all. So if the first conflict occurs at - * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be - * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean - * will be <code>true</code> to force that poll to happen, and - * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>. - * - * @author Sun Microsystems, Inc. - * - * @see TxnMonitor - */ -class TxnMonitorTask extends RetryTask - implements TransactionConstants, org.apache.river.constants.TimeConstants -{ - /** transaction being monitored */ - private final Txn txn; - - /** the monitor we were made by */ - private final TxnMonitor monitor; - - /** - * All the queries on the space (not queries to the transaction - * manager) waiting for <code>txn</code> to be resolved. - * <code>null</code> until we have at least one. Represented by - * <code>QueryWatcher</code> objects. - */ - private Map<QueryWatcher,Collection<Txn>> queries; //Sync on this. - - /** count of RemoteExceptions */ - private final AtomicInteger failCnt; - - /** - * The next time we need to poll the transaction manager - * to get <code>txn</code>'s actual state. 
- */ - private final AtomicLong nextQuery; - - /** - * When we're given an opportunity to poll the transaction manager - * for the <code>txn</code>'s state, do so. - */ - private volatile boolean mustQuery; - - /** next value added to <code>nextQuery</code> */ - private volatile long deltaT; - - /** - * The initial grace period before the first query. - */ - private static final long INITIAL_GRACE = 15 * SECONDS; - - /** - * The retry time when we have an encountered an exception - */ - private static final long BETWEEN_EXCEPTIONS = 15 * SECONDS; - - /** - * The largest value that <code>deltaT</code> will reach. - */ - private static final long MAX_DELTA_T = 1 * HOURS; - - /** - * The maximum number of failures allowed in a row before we simply - * give up on the transaction and consider it aborted. - */ - private static final int MAX_FAILURES = 3; - - /** Logger for logging transaction related information */ - private static final Logger logger = - Logger.getLogger(OutriggerServerImpl.txnLoggerName); - - /** - * Create a new TxnMonitorTask. - */ - TxnMonitorTask(Txn txn, TxnMonitor monitor, - ExecutorService manager, WakeupManager wakeupMgr) { - super(manager, wakeupMgr); - this.txn = txn; - this.monitor = monitor; - nextQuery = new AtomicLong(startTime()); // retryTime will add INITIAL_GRACE - deltaT = INITIAL_GRACE; - mustQuery = true; - failCnt = new AtomicInteger(); - } - - /** - * Return the time of the next query, bumping <code>deltaT</code> as - * necessary for the next iteration. If the transaction has voted - * <code>PREPARED</code> or the manager has been giving us a - * <code>RemoteException</code>, we should retry on short times; - * otherwise we back off quickly. 
- */ - public long retryTime() { - boolean noFailures = false; - synchronized (txn){ - noFailures = (failCnt.get() == 0 && txn.getState() != PREPARED); - } - if (noFailures) { // no failures - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "{0} retryTime adds {1}", - new Object[]{this, Long.valueOf(deltaT)}); - } - - nextQuery.addAndGet(deltaT); - synchronized (this){ - if (deltaT < MAX_DELTA_T) - deltaT = Math.min(deltaT * 2, MAX_DELTA_T); - } - } else { - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "{0} retryTime adds {1} (for {2})", - new Object[]{this, Long.valueOf(BETWEEN_EXCEPTIONS), - (failCnt.get() != 0 ? "failure" : "PREPARED")}); - } - nextQuery.addAndGet(BETWEEN_EXCEPTIONS); - } - - return nextQuery.get(); - } - - /** - * Add a ``sibling'' transaction, one that is now blocking progress - * on one of the same entries. For example, if a client is blocked - * on a <code>read</code>, another transaction can read the same - * entry, thereby also blocking that same client. This means that - * the transaction for the second <code>read</code> must be - * watched, too. The list of queries for the second transaction - * might be less that the list of those in this transaction, but - * the process of figuring out the subset is too expensive, since - * we have tried to make the checking process itself cheap, - * anyway. So we add all queries this task is currently monitoring - * to the task monitoring the second transaction. If there are - * no queries, then the blocking occurred because of a short query - * or all the queries have expired, in which case the second transaction - * isn't blocking the way of anything currently, so this method does - * nothing. - * <p> - * Of course, in order to avoid blocking the thread that is calling - * this (which is trying to perform a <code>read</code>, after - * all), we simply add each lease in this task to the monitor's - * queue. - * - */ - // @see TxnEntryHandle#monitor - //!! 
Would it be worth the overhead to make TxnEntryHandle.monitor - //!! search for the transaction with the smallest set of leases? -arnold - synchronized void addSibling(Txn txn) { - if (queries == null || queries.size() == 0) - return; - Collection<Txn> sibling = Collections.nCopies(1, txn); - Iterator<QueryWatcher> it = queries.keySet().iterator(); - while (it.hasNext()) { - QueryWatcher query = it.next(); - if (query != null) // from a weak map, so might be null - monitor.add(query, sibling); - } - } - - /** - * Try to see if this transaction should be aborted. This returns - * <code>true</code> (don't repeat the task) if it knows that the - * transaction is no longer interesting to anyone. - */ - public boolean tryOnce() { - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "{0} attempt {1} mustQuery:{2}", - new Object[]{this, Integer.valueOf(attempt()), - Boolean.valueOf(mustQuery) }); - } - - /* - * The first time we do nothing, since RetryTask invokes run first, - * but we want to wait a bit before testing the transaction. - */ - if (attempt() == 0) - return false; - int txnState; - synchronized (txn){ - txnState = txn.getState(); - } - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "{0} txn.getState() = {1}", - new Object[]{this, Integer.valueOf(txnState)}); - } - - // not active or prepared == no longer blocking - - if (txnState != ACTIVE && txnState != PREPARED) - return true; - - // if we're prepared, test every time -- this shouldn't take long - mustQuery |= (txnState == PREPARED); - - /* - * Go through the resources to see if we can find one still active - * that cares. Must be synchronized since we test, then clear -- - * another thread that set the flag between the test and clear - * would have its requirements lost. 
- */ - synchronized (this) { - if (!mustQuery) { // then try resources - if (queries == null) // no resources, so nobody wants it - return false; // try again next time - - Iterator<QueryWatcher> it = queries.keySet().iterator(); - boolean foundNeed = false; - - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "{0} nextQuery {1}", - new Object[]{this, nextQuery}); - } - - while (it.hasNext()) { - QueryWatcher query = it.next(); - if (query == null) // gone -- the map will reap it - continue; - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, - "{0} query.getExpiration() {1}", - new Object[]{this, - Long.valueOf(query.getExpiration())}); - } - - if (query.getExpiration() < nextQuery.get() || query.isResolved()) { - it.remove(); - } // expired, so we don't care about it - else { - foundNeed = true; - break; - } - } - - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "{0} foundNeed = {1}", - new Object[]{this, Boolean.valueOf(foundNeed)}); - } - - if (!foundNeed) // nobody wants it - return false; // try again next time - } - mustQuery = false; // clear it for next time - } - - /* - * Now we know (a) the transaction itself is alive, and (b) some - * lease still cares. Make sure it's still active as far as the - * it knows, and if it is, then ask the manager about it. - */ - ServerTransaction tr; - try { - /* This may fix a broken Txn, if it does it won't get moved - * from the broken to the unbroken list. 
It will get - * moved eventually, but it does seem unfortunate it does - * not happen immediately - */ - tr = txn.getTransaction( - monitor.space().getRecoveredTransactionManagerPreparer()); - } catch (RemoteException e) { - final int cat = ThrowableConstants.retryable(e); - - if (cat == ThrowableConstants.BAD_INVOCATION || - cat == ThrowableConstants.BAD_OBJECT) - { - // Not likely to get better, give up - logUnpackingFailure("definite exception", Level.INFO, - true, e); - return true; - } else if (cat == ThrowableConstants.INDEFINITE) { - // try, try, again - logUnpackingFailure("indefinite exception", Levels.FAILED, - false, e); - mustQuery = true; - return false; - } else if (cat == ThrowableConstants.UNCATEGORIZED) { - // Same as above but log differently. - mustQuery = true; - logUnpackingFailure("uncategorized exception", Level.INFO, - false, e); - return false; - } else { - logger.log(Level.WARNING, "ThrowableConstants.retryable " + - "returned out of range value, " + cat, - new AssertionError(e)); - return false; - } - } catch (IOException e) { - // Not likely to get better - logUnpackingFailure("IOException", Level.INFO, true, e); - return true; - } catch (RuntimeException e) { - // Not likely to get better - logUnpackingFailure("RuntimeException", Level.INFO, true, e); - return true; - } catch (ClassNotFoundException e) { - // codebase probably down, keep trying - logUnpackingFailure("ClassNotFoundException", Levels.FAILED, - false, e); - mustQuery = true; - return false; - } - - if (logger.isLoggable(Level.FINEST)) - logger.log(Level.FINEST, "{0} tr = {1}", new Object[]{this, tr}); - - int trState; - try { - trState = tr.getState(); - } catch (TransactionException e) { - if (logger.isLoggable(Level.INFO)) - logger.log(Level.INFO, "Got TransactionException when " + - "calling getState on " + tr + ", dropping transaction " + - tr.id, e); - trState = ABORTED; - } catch (NoSuchObjectException e) { - /* It would be epsilon better to to give up immediately - 
* if we get a NoSuchObjectException and we are in the - * active state, however, the code to do this would - * be very complicated since we need to hold a lock to - * while reading and acting on the state. - */ - if (failCnt.incrementAndGet() >= MAX_FAILURES) { - if (logger.isLoggable(Level.INFO)) { - logger.log(Level.INFO, "Got NoSuchObjectException when " + - "calling getState on " + tr + ", this was the " + - failCnt + " RemoteException, dropping transaction" + - tr.id, e); - } - trState = ABORTED; - } else { - if (logger.isLoggable(Levels.FAILED)) { - logger.log(Levels.FAILED, "Got NoSuchObjectException " + - "when calling getState on " + tr + ", failCount = " + - failCnt + ", will retry", e); - } - mustQuery = true; // keep on trying - return false; // try again next time - } - } catch (RemoteException e) { - if (failCnt.incrementAndGet() >= MAX_FAILURES) { - /* abort if we are not prepared and not already - * aborted. If prepared retry, otherwise - * we're done. Check state and make any abort() call - * atomically so we can't accidently abort a prepared - * transaction. - */ - synchronized (txn) { - switch (txn.getState()) { - case ACTIVE: - // Safe to abort, give up - if (logger.isLoggable(Level.INFO)) { - logger.log(Level.INFO, "Got RemoteException " + - "when calling getState on " + tr + ", this " + - "was " + failCnt + " RemoteException, " + - "dropping active transaction " + tr.id, e); - } - - try { - monitor.space().abort(tr.mgr, tr.id); - return true; - } catch (UnknownTransactionException ute) { - throw new AssertionError(ute); - } catch (UnmarshalException ume) { - throw new AssertionError(ume); - } - case PREPARED: - final Level l = (failCnt.get()%MAX_FAILURES == 0)? 
- Level.INFO:Levels.FAILED; - if (logger.isLoggable(l)) { - logger.log(l, "Got RemoteException when calling " + - "getState on " + tr + ", this was " + - failCnt + " RemoteException, will keep " + - "prepared transaction " + tr.id, e); - } - - // Can't give up, keep on trying to find real state - mustQuery = true; - return false; - case ABORTED: - case COMMITTED: - // done - return true; - default: - throw new AssertionError("Txn in unreachable state"); - } - } - } else { - // Don't know, but not ready to give up - if (logger.isLoggable(Levels.FAILED)) { - logger.log(Levels.FAILED, "Got RemoteException when " + - "calling getState on " + tr + ", failCount = " + - failCnt + ", will retry", e); - } - - mustQuery = true; // keep on trying - return false; // try again next time - } - } - - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "{0} trState = {1}", - new Object[]{this, Integer.valueOf(trState)}); - } - - failCnt.set(0); // reset failures -- we got a response - - /* - * If the two states aren't the same, the state changed and we - * need to account for that locally here by calling the method - * that would make the change (the one we should have gotten. - * (We use the external forms of abort, commit, etc., because - * they are what the manager would call, and therefore these - * calls will always do exactly what the incoming manager - * calls would have done. I don't want this to be fragile by - * bypassing those calls and going straight to the Txn - * object's calls, which might skip something important in the - * OutriggerServerImpl calls). 
- */ - - if (trState != txnState) { - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, - "{0} mgr state[{1}] != local state [{2}]", - new Object[]{this, - TxnConstants.getName(trState), - TxnConstants.getName(txnState)}); - } - - try { - switch (trState) { - case ABORTED: - logger.log(Level.FINER, "{0} moving to abort", this); - - monitor.space().abort(tr.mgr, tr.id); - return true; - - case COMMITTED: - logger.log(Level.FINER, "{0} moving to commit", this); - - monitor.space().commit(tr.mgr, tr.id); - return true; - } - } catch (UnknownTransactionException e) { - // we must somehow have already gotten the abort() or - // commit(), and have therefore forgotten about the - // transaction, while this code was executing - return true; - } catch (UnmarshalException ume) { - throw new AssertionError(ume); - } - - // we can't fake anything else -- the manager will have to call - // us - } - - logger.log(Level.FINEST, "{0} return false", this); - - return false; // now we know so nothing more to do - } - - /** - * Add in a resource. The lease may already be in, in which case it is - * ignored, or it may be null, in which case it was a non-leased probe - * that was blocked and we simply set <code>mustQuery</code> to - * <code>true</code>. 
- */ - synchronized void add(QueryWatcher query) { - if (query == null || query.getExpiration() <= nextQuery.get()) { - if (logger.isLoggable(Level.FINEST)) - logger.log(Level.FINEST, "adding resource to task -- SHORT"); - mustQuery = true; - } else { - if (logger.isLoggable(Level.FINEST)) - logger.log(Level.FINEST, "adding resource to task -- LONG"); - if (queries == null) - queries = new WeakHashMap<QueryWatcher,Collection<Txn>>();// we use it like a WeakHashSet - queries.put(query, null); - } - } - - /** Log failed unpacking attempt attempt */ - private void logUnpackingFailure(String exceptionDescription, Level level, - boolean terminal, Throwable t) - { - if (logger.isLoggable(level)) { - logger.log(level, "Encountered " + exceptionDescription + - "while unpacking exception to check state, " + - (terminal?"dropping":"keeping") + " monitoring task", t); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.river.outrigger; + +import org.apache.river.constants.TxnConstants; +import org.apache.river.constants.ThrowableConstants; +import org.apache.river.logging.Levels; +import org.apache.river.thread.wakeup.RetryTask; +import org.apache.river.thread.wakeup.WakeupManager; + +import java.io.IOException; +import java.rmi.RemoteException; +import java.rmi.UnmarshalException; +import java.rmi.NoSuchObjectException; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.Iterator; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.jini.core.transaction.TransactionException; +import net.jini.core.transaction.UnknownTransactionException; +import net.jini.core.transaction.server.ServerTransaction; +import net.jini.core.transaction.server.TransactionConstants; + +/** + * A task that will try to validate the state of a transaction. This + * uses weak references a good deal to let the other parts of the system + * be GC'ed as necessary. + * <p> + * The retry mechanism is subtle, so bear with me. The purpose is + * to ensure that if any activity is being blocked by a given + * transaction, that transaction will be tested at some point in + * the future (if necessary, i.e., if it still is thought to be + * active). We assume it to be rare that a transactions that the + * space thinks is active is, in fact, aborted, so the algorithm is + * designed to guarantee the detection without a lot of overhead, + * specifically without a lot of RMI calls. + * <p> + * Each task has three values: a <code>nextQuery</code> time, a + * <code>mustQuery</code> boolean that force the next query to be + * made, and <code>deltaT</code>, the time at which the following + * query will be scheduled. 
When the task is awakened at its + * <code>nextQuery</code> time, it checks to see if it must make an + * actual query to the transaction manager, which it will do if either + * <code>mustQuery</code> is <code>true</code>, or if we know about + * any in-progress queries on the space that are blocked on the + * transaction. Whether or not an actual query is made, + * <code>deltaT</code> is added to <code>nextQuery</code> to get the + * next <code>nextQuery</code> time, <code>deltaT</code> is doubled, and + * the <code>mustQuery</code> boolean is set to <code>false</code>. + * <p> + * There are two kinds of requests with which a transaction + * can cause a conflict -- those with long timeouts (such as + * blocking reads and takes) and those that are under short timeouts + * (such as reads and takes with zero-length timeouts). We will + * treat them separately at several points of the algorithm. A + * short timeout is any query whose expiration time is sooner than + * the <code>nextQuery</code> time. Any other timeout is long. + * If a short query arrives, <code>mustQuery</code> is set to + * <code>true</code>. + * <p> + * The result is that any time a transaction causes a conflict, if + * the query on the space has not ended by the time of the + * <code>nextQuery</code> we will attempt to poll the transaction manager. + * We will also poll the transaction manager if any conflict occurred + * on a query on the space with a short timeout. + * <p> + * The first time a transaction causes a conflict, we schedule a + * time in the future at which we will poll its status. We do not + * poll right away because often a transaction will complete on + * its own before we get to that time, making the check + * unnecessary. An instant poll is, therefore, unnecessarily + * aggressive, since giving an initial grace time will usually mean + * no poll is made at all.
So if the first conflict occurs at
 * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be
 * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean
 * will be <code>true</code> to force that poll to happen, and
 * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>.
 *
 * @author Sun Microsystems, Inc.
 *
 * @see TxnMonitor
 */
class TxnMonitorTask extends RetryTask
    implements TransactionConstants, org.apache.river.constants.TimeConstants
{
    /** transaction being monitored */
    private final Txn txn;

    /** the monitor we were made by */
    private final TxnMonitor monitor;

    /**
     * All the queries on the space (not queries to the transaction
     * manager) waiting for <code>txn</code> to be resolved.
     * <code>null</code> until we have at least one. Represented by
     * <code>QueryWatcher</code> objects.
     */
    private Map<QueryWatcher,Collection<Txn>> queries; //Sync on this.

    /** count of RemoteExceptions */
    private final AtomicInteger failCnt;

    /**
     * The next time we need to poll the transaction manager
     * to get <code>txn</code>'s actual state.
     */
    private final AtomicLong nextQuery;

    /**
     * When we're given an opportunity to poll the transaction manager
     * for the <code>txn</code>'s state, do so.
     */
    private volatile boolean mustQuery;

    /** next value added to <code>nextQuery</code> */
    private volatile long deltaT;

    /**
     * The initial grace period before the first query.
     */
    private static final long INITIAL_GRACE = 15 * SECONDS;

    /**
     * The retry time when we have encountered an exception
     */
    private static final long BETWEEN_EXCEPTIONS = 15 * SECONDS;

    /**
     * The largest value that <code>deltaT</code> will reach.
     */
    private static final long MAX_DELTA_T = 1 * HOURS;

    /**
     * The maximum number of failures allowed in a row before we simply
     * give up on the transaction and consider it aborted.
     */
    private static final int MAX_FAILURES = 3;

    /** Logger for logging transaction related information */
    private static final Logger logger =
        Logger.getLogger(OutriggerServerImpl.txnLoggerName);

    /**
     * Create a new TxnMonitorTask.
     *
     * @param txn       the transaction this task monitors
     * @param monitor   the monitor that created this task
     * @param manager   executor handed to the <code>RetryTask</code>
     *                  superclass constructor
     * @param wakeupMgr wakeup manager handed to the <code>RetryTask</code>
     *                  superclass constructor
     */
    TxnMonitorTask(Txn txn, TxnMonitor monitor,
                   ExecutorService manager, WakeupManager wakeupMgr) {
        super(manager, wakeupMgr);
        this.txn = txn;
        this.monitor = monitor;
        nextQuery = new AtomicLong(startTime()); // retryTime will add INITIAL_GRACE
        deltaT = INITIAL_GRACE;
        mustQuery = true;
        failCnt = new AtomicInteger();
    }

    /**
     * Return the time of the next query, bumping <code>deltaT</code> as
     * necessary for the next iteration. If the transaction has voted
     * <code>PREPARED</code> or the manager has been giving us a
     * <code>RemoteException</code>, we should retry on short times;
     * otherwise we back off quickly.
     *
     * @return the updated value of <code>nextQuery</code>
     */
    public long retryTime() {
        final boolean noFailures;
        synchronized (txn) {
            noFailures = (failCnt.get() == 0 && txn.getState() != PREPARED);
        }

        final long step;
        if (noFailures) {
            // Read and double deltaT under one lock so the back-off
            // interval is never updated from a stale read.
            synchronized (this) {
                step = deltaT;
                if (deltaT < MAX_DELTA_T)
                    deltaT = Math.min(deltaT * 2, MAX_DELTA_T);
            }
            if (logger.isLoggable(Level.FINEST)) {
                logger.log(Level.FINEST, "{0} retryTime adds {1}",
                    new Object[]{this, Long.valueOf(step)});
            }
        } else {
            step = BETWEEN_EXCEPTIONS;
            if (logger.isLoggable(Level.FINEST)) {
                logger.log(Level.FINEST, "{0} retryTime adds {1} (for {2})",
                    new Object[]{this, Long.valueOf(BETWEEN_EXCEPTIONS),
                        (failCnt.get() != 0 ? "failure" : "PREPARED")});
            }
        }

        // Return the value produced by our own update rather than
        // re-reading the counter afterwards.
        return nextQuery.addAndGet(step);
    }

    /**
     * Add a ``sibling'' transaction, one that is now blocking progress
     * on one of the same entries. For example, if a client is blocked
     * on a <code>read</code>, another transaction can read the same
     * entry, thereby also blocking that same client. This means that
     * the transaction for the second <code>read</code> must be
     * watched, too. The list of queries for the second transaction
     * might be smaller than the list of those in this transaction, but
     * the process of figuring out the subset is too expensive, since
     * we have tried to make the checking process itself cheap,
     * anyway. So we add all queries this task is currently monitoring
     * to the task monitoring the second transaction. If there are
     * no queries, then the blocking occurred because of a short query
     * or all the queries have expired, in which case the second transaction
     * isn't blocking the way of anything currently, so this method does
     * nothing.
     * <p>
     * Of course, in order to avoid blocking the thread that is calling
     * this (which is trying to perform a <code>read</code>, after
     * all), we simply add each lease in this task to the monitor's
     * queue.
     *
     * @param txn the sibling transaction to be watched
     */
    // @see TxnEntryHandle#monitor
    //!! Would it be worth the overhead to make TxnEntryHandle.monitor
    //!! search for the transaction with the smallest set of leases? -arnold
    synchronized void addSibling(Txn txn) {
        if (queries == null || queries.isEmpty())
            return;
        Collection<Txn> sibling = Collections.singletonList(txn);
        Iterator<QueryWatcher> it = queries.keySet().iterator();
        while (it.hasNext()) {
            QueryWatcher query = it.next();
            if (query != null)	// from a weak map, so might be null
                monitor.add(query, sibling);
        }
    }

    /**
     * Try to see if this transaction should be aborted. This returns
     * <code>true</code> (don't repeat the task) if it knows that the
     * transaction is no longer interesting to anyone.
     *
     * @return <code>true</code> when monitoring is finished,
     *         <code>false</code> to be tried again later
     */
    public boolean tryOnce() {
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "{0} attempt {1} mustQuery:{2}",
                new Object[]{this, Integer.valueOf(attempt()),
                    Boolean.valueOf(mustQuery) });
        }

        /*
         * The first time we do nothing, since RetryTask invokes run first,
         * but we want to wait a bit before testing the transaction.
         */
        if (attempt() == 0)
            return false;

        int txnState;
        synchronized (txn) {
            txnState = txn.getState();
        }
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "{0} txn.getState() = {1}",
                new Object[]{this, Integer.valueOf(txnState)});
        }

        // not active or prepared == no longer blocking

        if (txnState != ACTIVE && txnState != PREPARED)
            return true;

        // if we're prepared, test every time -- this shouldn't take long.
        // Only ever write true here: a compound "|=" on the volatile flag
        // could overwrite a concurrent add() that set it with a stale false.
        if (txnState == PREPARED)
            mustQuery = true;

        /*
         * Go through the resources to see if we can find one still active
         * that cares. Must be synchronized since we test, then clear --
         * another thread that set the flag between the test and clear
         * would have its requirements lost.
         */
        synchronized (this) {
            if (!mustQuery) {		// then try resources
                if (queries == null)	// no resources, so nobody wants it
                    return false;	// try again next time

                Iterator<QueryWatcher> it = queries.keySet().iterator();
                boolean foundNeed = false;

                if (logger.isLoggable(Level.FINEST)) {
                    logger.log(Level.FINEST, "{0} nextQuery {1}",
                        new Object[]{this, nextQuery});
                }

                while (it.hasNext()) {
                    QueryWatcher query = it.next();
                    if (query == null)	// gone -- the map will reap it
                        continue;
                    if (logger.isLoggable(Level.FINEST)) {
                        logger.log(Level.FINEST,
                            "{0} query.getExpiration() {1}",
                            new Object[]{this,
                                Long.valueOf(query.getExpiration())});
                    }

                    if (query.getExpiration() < nextQuery.get() || query.isResolved()) {
                        it.remove();	// expired, so we don't care about it
                    } else {
                        foundNeed = true;
                        break;
                    }
                }

                if (logger.isLoggable(Level.FINEST)) {
                    logger.log(Level.FINEST, "{0} foundNeed = {1}",
                        new Object[]{this, Boolean.valueOf(foundNeed)});
                }

                if (!foundNeed)		// nobody wants it
                    return false;	// try again next time
            }
            mustQuery = false;		// clear it for next time
        }

        /*
         * Now we know (a) the transaction itself is alive, and (b) some
         * lease still cares. Make sure it's still active as far as
         * it knows, and if it is, then ask the manager about it.
         */
        ServerTransaction tr;
        try {
            /* This may fix a broken Txn, if it does it won't get moved
             * from the broken to the unbroken list. It will get
             * moved eventually, but it does seem unfortunate it does
             * not happen immediately
             */
            tr = txn.getTransaction(
                monitor.space().getRecoveredTransactionManagerPreparer());
        } catch (RemoteException e) {
            final int cat = ThrowableConstants.retryable(e);

            if (cat == ThrowableConstants.BAD_INVOCATION ||
                cat == ThrowableConstants.BAD_OBJECT)
            {
                // Not likely to get better, give up
                logUnpackingFailure("definite exception", Level.INFO,
                                    true, e);
                return true;
            } else if (cat == ThrowableConstants.INDEFINITE) {
                // try, try, again
                logUnpackingFailure("indefinite exception", Levels.FAILED,
                                    false, e);
                mustQuery = true;
                return false;
            } else if (cat == ThrowableConstants.UNCATEGORIZED) {
                // Same as above but log differently.
                mustQuery = true;
                logUnpackingFailure("uncategorized exception", Level.INFO,
                                    false, e);
                return false;
            } else {
                logger.log(Level.WARNING, "ThrowableConstants.retryable " +
                           "returned out of range value, " + cat,
                           new AssertionError(e));
                return false;
            }
        } catch (IOException e) {
            // Not likely to get better
            logUnpackingFailure("IOException", Level.INFO, true, e);
            return true;
        } catch (RuntimeException e) {
            // Not likely to get better
            logUnpackingFailure("RuntimeException", Level.INFO, true, e);
            return true;
        } catch (ClassNotFoundException e) {
            // codebase probably down, keep trying
            logUnpackingFailure("ClassNotFoundException", Levels.FAILED,
                                false, e);
            mustQuery = true;
            return false;
        }

        if (logger.isLoggable(Level.FINEST))
            logger.log(Level.FINEST, "{0} tr = {1}", new Object[]{this, tr});

        int trState;
        try {
            trState = tr.getState();
        } catch (TransactionException e) {
            if (logger.isLoggable(Level.INFO))
                logger.log(Level.INFO, "Got TransactionException when " +
                    "calling getState on " + tr + ", dropping transaction " +
                    tr.id, e);
            trState = ABORTED;
        } catch (NoSuchObjectException e) {
            /* It would be epsilon better to give up immediately
             * if we get a NoSuchObjectException and we are in the
             * active state, however, the code to do this would
             * be very complicated since we need to hold a lock
             * while reading and acting on the state.
             */
            if (failCnt.incrementAndGet() >= MAX_FAILURES) {
                if (logger.isLoggable(Level.INFO)) {
                    logger.log(Level.INFO, "Got NoSuchObjectException when " +
                        "calling getState on " + tr + ", this was the " +
                        failCnt + " RemoteException, dropping transaction " +
                        tr.id, e);
                }
                trState = ABORTED;
            } else {
                if (logger.isLoggable(Levels.FAILED)) {
                    logger.log(Levels.FAILED, "Got NoSuchObjectException " +
                        "when calling getState on " + tr + ", failCount = " +
                        failCnt + ", will retry", e);
                }
                mustQuery = true;	// keep on trying
                return false;		// try again next time
            }
        } catch (RemoteException e) {
            if (failCnt.incrementAndGet() >= MAX_FAILURES) {
                /* abort if we are not prepared and not already
                 * aborted. If prepared retry, otherwise
                 * we're done. Check state and make any abort() call
                 * atomically so we can't accidentally abort a prepared
                 * transaction.
                 */
                synchronized (txn) {
                    switch (txn.getState()) {
                      case ACTIVE:
                        // Safe to abort, give up
                        if (logger.isLoggable(Level.INFO)) {
                            logger.log(Level.INFO, "Got RemoteException " +
                                "when calling getState on " + tr + ", this " +
                                "was " + failCnt + " RemoteException, " +
                                "dropping active transaction " + tr.id, e);
                        }

                        try {
                            monitor.space().abort(tr.mgr, tr.id);
                            return true;
                        } catch (UnknownTransactionException ute) {
                            throw new AssertionError(ute);
                        } catch (UnmarshalException ume) {
                            throw new AssertionError(ume);
                        }
                      case PREPARED:
                        final Level lvl = (failCnt.get()%MAX_FAILURES == 0)?
                            Level.INFO:Levels.FAILED;
                        if (logger.isLoggable(lvl)) {
                            logger.log(lvl, "Got RemoteException when calling " +
                                "getState on " + tr + ", this was " +
                                failCnt + " RemoteException, will keep " +
                                "prepared transaction " + tr.id, e);
                        }

                        // Can't give up, keep on trying to find real state
                        mustQuery = true;
                        return false;
                      case ABORTED:
                      case COMMITTED:
                        // done
                        return true;
                      default:
                        throw new AssertionError("Txn in unreachable state");
                    }
                }
            } else {
                // Don't know, but not ready to give up
                if (logger.isLoggable(Levels.FAILED)) {
                    logger.log(Levels.FAILED, "Got RemoteException when " +
                        "calling getState on " + tr + ", failCount = " +
                        failCnt + ", will retry", e);
                }

                mustQuery = true;	// keep on trying
                return false;		// try again next time
            }
        }

        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "{0} trState = {1}",
                       new Object[]{this, Integer.valueOf(trState)});
        }

        failCnt.set(0);			// reset failures -- we got a response

        /*
         * If the two states aren't the same, the state changed and we
         * need to account for that locally here by calling the method
         * that would make the change (the one we should have gotten).
         * (We use the external forms of abort, commit, etc., because
         * they are what the manager would call, and therefore these
         * calls will always do exactly what the incoming manager
         * calls would have done. I don't want this to be fragile by
         * bypassing those calls and going straight to the Txn
         * object's calls, which might skip something important in the
         * OutriggerServerImpl calls).
         */

        if (trState != txnState) {
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER,
                    "{0} mgr state[{1}] != local state [{2}]",
                    new Object[]{this,
                        TxnConstants.getName(trState),
                        TxnConstants.getName(txnState)});
            }

            try {
                switch (trState) {
                  case ABORTED:
                    logger.log(Level.FINER, "{0} moving to abort", this);

                    monitor.space().abort(tr.mgr, tr.id);
                    return true;

                  case COMMITTED:
                    logger.log(Level.FINER, "{0} moving to commit", this);

                    monitor.space().commit(tr.mgr, tr.id);
                    return true;
                }
            } catch (UnknownTransactionException e) {
                // we must somehow have already gotten the abort() or
                // commit(), and have therefore forgotten about the
                // transaction, while this code was executing
                return true;
            } catch (UnmarshalException ume) {
                throw new AssertionError(ume);
            }

            // we can't fake anything else -- the manager will have to call
            // us
        }

        logger.log(Level.FINEST, "{0} return false", this);

        return false;			// now we know so nothing more to do
    }

    /**
     * Add in a resource. The lease may already be in, in which case it is
     * ignored, or it may be null, in which case it was a non-leased probe
     * that was blocked and we simply set <code>mustQuery</code> to
     * <code>true</code>.
     *
     * @param query the blocked query to track, or <code>null</code>
     */
    synchronized void add(QueryWatcher query) {
        if (query == null || query.getExpiration() <= nextQuery.get()) {
            if (logger.isLoggable(Level.FINEST))
                logger.log(Level.FINEST, "adding resource to task -- SHORT");
            mustQuery = true;
        } else {
            if (logger.isLoggable(Level.FINEST))
                logger.log(Level.FINEST, "adding resource to task -- LONG");
            if (queries == null)
                queries = new WeakHashMap<QueryWatcher,Collection<Txn>>();// we use it like a WeakHashSet
            queries.put(query, null);
        }
    }

    /**
     * Log a failed unpacking attempt.
     *
     * @param exceptionDescription short description of the failure, placed
     *                             after "Encountered " in the message
     * @param level                level to log at
     * @param terminal             <code>true</code> if the monitoring task is
     *                             being dropped, <code>false</code> if kept
     * @param t                    the throwable to attach to the log record
     */
    private void logUnpackingFailure(String exceptionDescription, Level level,
                                     boolean terminal, Throwable t)
    {
        if (logger.isLoggable(level)) {
            logger.log(level, "Encountered " + exceptionDescription +
                " while unpacking exception to check state, " +
                (terminal?"dropping":"keeping") + " monitoring task", t);
        }
    }

}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java Sun Jul 5 11:41:39 2020 @@ -26,6 +26,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; +import org.apache.river.outrigger.proxy.EntryRep; + + /** * A type tree for entries. It maintains, for each class, a list of
