djencks 2004/02/23 12:28:43
Modified: modules/connector/src/java/org/apache/geronimo/connector/work
GeronimoWorkManager.java WorkerContext.java
modules/connector/src/test/org/apache/geronimo/connector/work
PooledWorkManagerTest.java
modules/transaction/src/java/org/apache/geronimo/transaction
TransactionManagerProxy.java TransactionProxy.java
modules/transaction/src/java/org/apache/geronimo/transaction/manager
TransactionImpl.java TransactionManagerImpl.java
XidFactory.java XidImpl.java
Added: modules/transaction/src/java/org/apache/geronimo/transaction
XAWork.java
modules/transaction/src/java/org/apache/geronimo/transaction/manager
XidImporter.java
Log:
add remote tx importing support. no tests yet.
Revision Changes Path
1.2 +17 -29
incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/GeronimoWorkManager.java
Index: GeronimoWorkManager.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/GeronimoWorkManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- GeronimoWorkManager.java 23 Jan 2004 05:56:11 -0000 1.1
+++ GeronimoWorkManager.java 23 Feb 2004 20:28:42 -0000 1.2
@@ -66,11 +66,10 @@
import org.apache.geronimo.connector.work.pool.StartWorkExecutorPool;
import org.apache.geronimo.connector.work.pool.SyncWorkExecutorPool;
import org.apache.geronimo.connector.work.pool.WorkExecutorPool;
-import org.apache.geronimo.gbean.GAttributeInfo;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
import org.apache.geronimo.gbean.GConstructorInfo;
-import org.apache.geronimo.gbean.GOperationInfo;
+import org.apache.geronimo.transaction.XAWork;
/**
* WorkManager implementation which uses under the cover three
WorkExecutorPool
@@ -90,8 +89,6 @@
private final static int DEFAULT_MIN_POOL_SIZE = 0;
private final static int DEFAULT_MAX_POOL_SIZE = 10;
- private static final GBeanInfo GBEAN_INFO;
-
/**
* Pool of threads used by this WorkManager in order to process
* the Work instances submitted via the doWork methods.
@@ -110,21 +107,24 @@
*/
private final WorkExecutorPool scheduledWorkExecutorPool;
+ private final XAWork xaWork;
+
/**
* Create a WorkManager.
*/
public GeronimoWorkManager() {
- this(DEFAULT_MIN_POOL_SIZE, DEFAULT_MAX_POOL_SIZE);
+ this(DEFAULT_MIN_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, null);
}
- public GeronimoWorkManager(int minSize, int maxSize) {
- this(minSize, maxSize, minSize, maxSize, minSize, maxSize);
+ public GeronimoWorkManager(int minSize, int maxSize, XAWork xaWork) {
+ this(minSize, maxSize, minSize, maxSize, minSize, maxSize, xaWork);
}
- public GeronimoWorkManager(int syncMinSize, int syncMaxSize, int
startMinSize, int startMaxSize, int schedMinSize, int schedMaxSize) {
+ public GeronimoWorkManager(int syncMinSize, int syncMaxSize, int
startMinSize, int startMaxSize, int schedMinSize, int schedMaxSize, XAWork
xaWork) {
syncWorkExecutorPool = new SyncWorkExecutorPool(syncMinSize,
syncMaxSize);
startWorkExecutorPool = new StartWorkExecutorPool(startMinSize,
startMaxSize);
scheduledWorkExecutorPool = new
ScheduleWorkExecutorPool(schedMinSize, schedMaxSize);
+ this.xaWork = xaWork;
}
public int getSyncThreadCount() {
@@ -204,7 +204,7 @@
WorkListener workListener)
throws WorkException {
WorkerContext workWrapper =
- new WorkerContext(work, startTimeout, execContext,
workListener);
+ new WorkerContext(work, startTimeout, execContext, xaWork,
workListener);
workWrapper.setThreadPriority(Thread.currentThread().getPriority());
syncWorkExecutorPool.executeWork(workWrapper);
}
@@ -229,7 +229,7 @@
WorkListener workListener)
throws WorkException {
WorkerContext workWrapper =
- new WorkerContext(work, startTimeout, execContext,
workListener);
+ new WorkerContext(work, startTimeout, execContext, xaWork,
workListener);
workWrapper.setThreadPriority(Thread.currentThread().getPriority());
startWorkExecutorPool.executeWork(workWrapper);
return System.currentTimeMillis() - workWrapper.getAcceptedTime();
@@ -254,31 +254,19 @@
WorkListener workListener)
throws WorkException {
WorkerContext workWrapper =
- new WorkerContext(work, startTimeout, execContext,
workListener);
+ new WorkerContext(work, startTimeout, execContext, xaWork,
workListener);
workWrapper.setThreadPriority(Thread.currentThread().getPriority());
scheduledWorkExecutorPool.executeWork(workWrapper);
}
+ public static final GBeanInfo GBEAN_INFO;
+
static {
GBeanInfoFactory infoFactory = new
GBeanInfoFactory(GeronimoWorkManager.class.getName());
- infoFactory.addAttribute(new GAttributeInfo("SyncThreadCount",
true));
- infoFactory.addAttribute(new GAttributeInfo("SyncMinimumPoolSize",
true));
- infoFactory.addAttribute(new GAttributeInfo("SyncMaximumPoolSize",
true));
- infoFactory.addAttribute(new GAttributeInfo("StartThreadCount",
true));
- infoFactory.addAttribute(new GAttributeInfo("StartMinimumPoolSize",
true));
- infoFactory.addAttribute(new GAttributeInfo("StartMaximumPoolSize",
true));
- infoFactory.addAttribute(new GAttributeInfo("ScheduledThreadCount",
true));
- infoFactory.addAttribute(new
GAttributeInfo("ScheduledMinimumPoolSize", true));
- infoFactory.addAttribute(new
GAttributeInfo("ScheduledMaximumPoolSize", true));
- infoFactory.addOperation(new GOperationInfo("doWork", new
String[]{Work.class.getName()}));
- infoFactory.addOperation(new GOperationInfo("doWork", new
String[]{Work.class.getName(), Long.TYPE.getName(),
ExecutionContext.class.getName(), WorkListener.class.getName()}));
- infoFactory.addOperation(new GOperationInfo("startWork", new
String[]{Work.class.getName()}));
- infoFactory.addOperation(new GOperationInfo("startWork", new
String[]{Work.class.getName(), Long.TYPE.getName(),
ExecutionContext.class.getName(), WorkListener.class.getName()}));
- infoFactory.addOperation(new GOperationInfo("scheduleWork", new
String[]{Work.class.getName()}));
- infoFactory.addOperation(new GOperationInfo("scheduleWork", new
String[]{Work.class.getName(), Long.TYPE.getName(),
ExecutionContext.class.getName(), WorkListener.class.getName()}));
+ infoFactory.addInterface(WorkManager.class, new
String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize", "StartMinimumPoolSize",
"StartMaximumPoolSize", "ScheduledMinimumPoolSize", "ScheduledMaximumPoolSize",
"XAWork"});
infoFactory.setConstructor(new GConstructorInfo(
- new String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize",
"StartMinimumPoolSize", "StartMaximumPoolSize", "ScheduledMinimumPoolSize",
"ScheduledMaximumPoolSize"},
- new Class[]{Integer.TYPE, Integer.TYPE, Integer.TYPE,
Integer.TYPE, Integer.TYPE, Integer.TYPE}));
+ new String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize",
"StartMinimumPoolSize", "StartMaximumPoolSize", "ScheduledMinimumPoolSize",
"ScheduledMaximumPoolSize", "XAWork"},
+ new Class[]{Integer.TYPE, Integer.TYPE, Integer.TYPE,
Integer.TYPE, Integer.TYPE, Integer.TYPE, XAWork.class}));
GBEAN_INFO = infoFactory.getBeanInfo();
}
1.2 +22 -5
incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java
Index: WorkerContext.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- WorkerContext.java 23 Jan 2004 05:56:11 -0000 1.1
+++ WorkerContext.java 23 Feb 2004 20:28:42 -0000 1.2
@@ -67,6 +67,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.transaction.XAWork;
import EDU.oswego.cs.dl.util.concurrent.Latch;
@@ -118,7 +119,9 @@
/**
* Execution context of the actual work to be executed.
*/
- private ExecutionContext executionContext;
+ private final ExecutionContext executionContext;
+
+ private final XAWork xaWork;
/**
* Listener to be notified during the life-cycle of the work treatment.
@@ -147,6 +150,8 @@
*/
public WorkerContext(Work aWork) {
adaptee = aWork;
+ executionContext = null;
+ xaWork = null;
}
/**
@@ -162,11 +167,13 @@
* work completed) occur.
*/
public WorkerContext(Work aWork, long aStartTimeout,
- ExecutionContext execContext,
- WorkListener workListener) {
+ ExecutionContext execContext,
+ XAWork xaWork,
+ WorkListener workListener) {
adaptee = aWork;
startTimeOut = aStartTimeout;
executionContext = execContext;
+ this.xaWork = xaWork;
if (null != workListener) {
this.workListener = workListener;
}
@@ -303,7 +310,17 @@
new WorkEvent(this, WorkEvent.WORK_STARTED, adaptee, null));
startLatch.release();
try {
- adaptee.run();
+ if (executionContext == null || executionContext.getXid() ==
null) {
+ adaptee.run();
+ } else {
+ try {
+ xaWork.begin(executionContext.getXid(),
executionContext.getTransactionTimeout());
+ adaptee.run();
+ } finally {
+ xaWork.end(executionContext.getXid());
+ }
+
+ }
workListener.workCompleted(
new WorkEvent(this, WorkEvent.WORK_COMPLETED, adaptee,
null));
} catch (Throwable e) {
1.2 +2 -2
incubator-geronimo/modules/connector/src/test/org/apache/geronimo/connector/work/PooledWorkManagerTest.java
Index: PooledWorkManagerTest.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/connector/src/test/org/apache/geronimo/connector/work/PooledWorkManagerTest.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PooledWorkManagerTest.java 23 Jan 2004 05:56:11 -0000 1.1
+++ PooledWorkManagerTest.java 23 Feb 2004 20:28:43 -0000 1.2
@@ -84,7 +84,7 @@
private static final int m_tempo = 200;
protected void setUp() throws Exception {
- m_workManager = new GeronimoWorkManager(1, 1);
+ m_workManager = new GeronimoWorkManager(1, 1, null);
}
public void testDoWork() throws Exception {
1.3 +98 -15
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionManagerProxy.java
Index: TransactionManagerProxy.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionManagerProxy.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- TransactionManagerProxy.java 30 Jan 2004 01:32:00 -0000 1.2
+++ TransactionManagerProxy.java 23 Feb 2004 20:28:43 -0000 1.3
@@ -55,6 +55,11 @@
*/
package org.apache.geronimo.transaction;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
import javax.resource.spi.XATerminator;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
@@ -74,6 +79,7 @@
import org.apache.geronimo.gbean.GConstructorInfo;
import org.apache.geronimo.gbean.GAttributeInfo;
import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
+import org.apache.geronimo.transaction.manager.XidImporter;
/**
* A wrapper for a TransactionManager that wraps all Transactions in a
TransactionProxy
@@ -83,19 +89,23 @@
*
* @version $Revision$ $Date$
*/
-public class TransactionManagerProxy implements TransactionManager,
XATerminator {
+public class TransactionManagerProxy implements TransactionManager,
XATerminator, XAWork {
public static final GBeanInfo GBEAN_INFO;
private final TransactionManager delegate;
+ private final XidImporter importer;
private final ThreadLocal threadTx = new ThreadLocal();
+ private final Map importedTransactions = new HashMap();
+ private Set activeTransactions = new HashSet();
/**
* Constructor taking the TransactionManager to wrap.
* @param delegate the TransactionManager that should be wrapped
*/
- public TransactionManagerProxy(TransactionManager delegate) {
+ public TransactionManagerProxy(TransactionManager delegate, XidImporter
importer) {
this.delegate = delegate;
+ this.importer = importer;
}
/**
@@ -103,6 +113,7 @@
*/
public TransactionManagerProxy() {
this.delegate = new TransactionManagerImpl();
+ this.importer = null;
}
public void setTransactionTimeout(int timeout) throws SystemException {
@@ -174,22 +185,55 @@
/**
* @see javax.resource.spi.XATerminator#commit(javax.transaction.xa.Xid,
boolean)
*/
- public void commit(Xid arg0, boolean arg1) throws XAException {
- throw new XAException("Not implemented.");
+ public void commit(Xid xid, boolean onePhase) throws XAException {
+ TransactionProxy tx = (TransactionProxy)
importedTransactions.remove(xid);
+ if (tx == null) {
+ throw new XAException("No imported transaction for xid: " + xid);
+ }
+
+ try {
+ int status = tx.getStatus();
+ assert status == Status.STATUS_ACTIVE || status ==
Status.STATUS_PREPARED;
+ } catch (SystemException e) {
+ throw new XAException();
+ }
+ importer.commit(tx.getDelegate(), onePhase);
}
/**
* @see javax.resource.spi.XATerminator#forget(javax.transaction.xa.Xid)
*/
- public void forget(Xid arg0) throws XAException {
- throw new XAException("Not implemented.");
+ public void forget(Xid xid) throws XAException {
+ TransactionProxy tx = (TransactionProxy)
importedTransactions.remove(xid);
+ if (tx == null) {
+ throw new XAException("No imported transaction for xid: " + xid);
+ }
+
+ try {
+ int status = tx.getStatus();
+ //assert status == Status.STATUS_ACTIVE || status ==
Status.STATUS_PREPARED;
+ } catch (SystemException e) {
+ throw new XAException();
+ }
+ importer.forget(tx.getDelegate());
}
/**
* @see javax.resource.spi.XATerminator#prepare(javax.transaction.xa.Xid)
*/
- public int prepare(Xid arg0) throws XAException {
- throw new XAException("Not implemented.");
+ public int prepare(Xid xid) throws XAException {
+ TransactionProxy tx = (TransactionProxy)
importedTransactions.get(xid);
+ if (tx == null) {
+ throw new XAException("No imported transaction for xid: " + xid);
+ }
+
+ try {
+ int status = tx.getStatus();
+ assert status == Status.STATUS_ACTIVE;
+ } catch (SystemException e) {
+ throw new XAException();
+ }
+ return importer.prepare(tx.getDelegate());
}
/**
@@ -202,8 +246,47 @@
/**
* @see
javax.resource.spi.XATerminator#rollback(javax.transaction.xa.Xid)
*/
- public void rollback(Xid arg0) throws XAException {
- throw new XAException("Not implemented.");
+ public void rollback(Xid xid) throws XAException {
+ TransactionProxy tx = (TransactionProxy)
importedTransactions.remove(xid);
+ if (tx == null) {
+ throw new XAException("No imported transaction for xid: " + xid);
+ }
+
+ try {
+ int status = tx.getStatus();
+ assert status == Status.STATUS_ACTIVE || status ==
Status.STATUS_PREPARED;
+ } catch (SystemException e) {
+ throw new XAException();
+ }
+ importer.rollback(tx.getDelegate());
+ }
+
+ public void begin(Xid xid, long txTimeoutMillis) throws XAException {
+ TransactionProxy tx = (TransactionProxy)
importedTransactions.get(xid);
+ if (tx == null) {
+ try {
+ tx = new TransactionProxy(importer.importXid(xid));
+ } catch (SystemException e) {
+ throw (XAException)new XAException("Could not import
xid").initCause(e);
+ }
+ importedTransactions.put(xid, tx);
+ }
+ if (activeTransactions.contains(tx)) {
+ throw new XAException("Xid already active");
+ }
+ activeTransactions.add(tx);
+ threadTx.set(tx);
+ importer.setTransactionTimeout(txTimeoutMillis);
+ }
+
+ public void end(Xid xid) throws XAException {
+ TransactionProxy tx = (TransactionProxy)
importedTransactions.get(xid);
+ if (tx == null) {
+ throw new XAException("No imported transaction for xid: " + xid);
+ }
+ if (!activeTransactions.remove(tx)) {
+ throw new XAException("tx not active for xid: " + xid);
+ }
}
//for now we use the default constructor.
@@ -211,17 +294,17 @@
GBeanInfoFactory infoFactory = new
GBeanInfoFactory(TransactionManagerProxy.class.getName());
infoFactory.setConstructor(new GConstructorInfo(
- new String[] { "Delegate" },
- new Class[] { TransactionManager.class }));
+ new String[]{"Delegate"},
+ new Class[]{TransactionManager.class}));
infoFactory.addAttribute(new GAttributeInfo("Delegate", true));
- infoFactory.addOperation(new GOperationInfo("setTransactionTimeout",
new String[] {Integer.TYPE.getName()}));
+ infoFactory.addOperation(new GOperationInfo("setTransactionTimeout",
new String[]{Integer.TYPE.getName()}));
infoFactory.addOperation(new GOperationInfo("begin"));
infoFactory.addOperation(new GOperationInfo("getStatus"));
infoFactory.addOperation(new GOperationInfo("getTransaction"));
infoFactory.addOperation(new GOperationInfo("suspend"));
- infoFactory.addOperation(new GOperationInfo("resume", new String[]
{Transaction.class.getName()}));
+ infoFactory.addOperation(new GOperationInfo("resume", new
String[]{Transaction.class.getName()}));
infoFactory.addOperation(new GOperationInfo("commit"));
infoFactory.addOperation(new GOperationInfo("rollback"));
infoFactory.addOperation(new GOperationInfo("setRollbackOnly"));
1.2 +5 -1
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionProxy.java
Index: TransactionProxy.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionProxy.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TransactionProxy.java 23 Jan 2004 18:54:15 -0000 1.1
+++ TransactionProxy.java 23 Feb 2004 20:28:43 -0000 1.2
@@ -105,4 +105,8 @@
public void setRollbackOnly() throws IllegalStateException,
SystemException {
delegate.setRollbackOnly();
}
+
+ Transaction getDelegate() {
+ return delegate;
+ }
}
1.1
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/XAWork.java
Index: XAWork.java
===================================================================
package org.apache.geronimo.transaction;
import javax.transaction.xa.Xid;
import javax.transaction.xa.XAException;
/**
* primarily an interface between the WorkManager/ExecutionContext and the tm.
*
* @version $Revision: 1.1 $ $Date: 2004/02/23 20:28:43 $
*
* */
public interface XAWork {
void begin(Xid xid, long txTimeout) throws XAException;
void end(Xid xid) throws XAException;
}
1.2 +204 -132
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionImpl.java
Index: TransactionImpl.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionImpl.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TransactionImpl.java 23 Jan 2004 18:54:16 -0000 1.1
+++ TransactionImpl.java 23 Feb 2004 20:28:43 -0000 1.2
@@ -63,6 +63,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
@@ -94,9 +95,13 @@
private Map xaResources = new HashMap(3);
TransactionImpl(XidFactory xidFactory, TransactionLog txnLog) throws
SystemException {
+ this(xidFactory.createXid(), xidFactory, txnLog);
+ }
+
+ TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog)
throws SystemException {
this.xidFactory = xidFactory;
this.txnLog = txnLog;
- this.xid = xidFactory.createXid();
+ this.xid = xid;
try {
txnLog.begin(xid);
} catch (IOException e) {
@@ -114,16 +119,16 @@
public synchronized void setRollbackOnly() throws IllegalStateException,
SystemException {
switch (status) {
- case Status.STATUS_ACTIVE:
- case Status.STATUS_PREPARING:
- status = Status.STATUS_MARKED_ROLLBACK;
- break;
- case Status.STATUS_MARKED_ROLLBACK:
- case Status.STATUS_ROLLING_BACK:
- // nothing to do
- break;
- default:
- throw new IllegalStateException("Cannot set rollback only,
status is " + getStateString(status));
+ case Status.STATUS_ACTIVE:
+ case Status.STATUS_PREPARING:
+ status = Status.STATUS_MARKED_ROLLBACK;
+ break;
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLING_BACK:
+ // nothing to do
+ break;
+ default:
+ throw new IllegalStateException("Cannot set rollback only,
status is " + getStateString(status));
}
}
@@ -132,13 +137,13 @@
throw new IllegalArgumentException("Synchronization is null");
}
switch (status) {
- case Status.STATUS_ACTIVE:
- case Status.STATUS_PREPARING:
- break;
- case Status.STATUS_MARKED_ROLLBACK:
- throw new RollbackException("Transaction is marked for
rollback");
- default:
- throw new IllegalStateException("Status is " +
getStateString(status));
+ case Status.STATUS_ACTIVE:
+ case Status.STATUS_PREPARING:
+ break;
+ case Status.STATUS_MARKED_ROLLBACK:
+ throw new RollbackException("Transaction is marked for
rollback");
+ default:
+ throw new IllegalStateException("Status is " +
getStateString(status));
}
syncList.add(synch);
}
@@ -148,12 +153,12 @@
throw new IllegalArgumentException("XAResource is null");
}
switch (status) {
- case Status.STATUS_ACTIVE:
- break;
- case Status.STATUS_MARKED_ROLLBACK:
- throw new RollbackException("Transaction is marked for
rollback");
- default:
- throw new IllegalStateException("Status is " +
getStateString(status));
+ case Status.STATUS_ACTIVE:
+ break;
+ case Status.STATUS_MARKED_ROLLBACK:
+ throw new RollbackException("Transaction is marked for
rollback");
+ default:
+ throw new IllegalStateException("Status is " +
getStateString(status));
}
try {
@@ -196,11 +201,11 @@
throw new IllegalArgumentException("XAResource is null");
}
switch (status) {
- case Status.STATUS_ACTIVE:
- case Status.STATUS_MARKED_ROLLBACK:
- break;
- default:
- throw new IllegalStateException("Status is " +
getStateString(status));
+ case Status.STATUS_ACTIVE:
+ case Status.STATUS_MARKED_ROLLBACK:
+ break;
+ default:
+ throw new IllegalStateException("Status is " +
getStateString(status));
}
ResourceManager manager = (ResourceManager)
xaResources.remove(xaRes);
if (manager == null) {
@@ -215,27 +220,21 @@
}
}
+ //Transaction method, does 2pc
public void commit() throws HeuristicMixedException,
HeuristicRollbackException, RollbackException, SecurityException,
SystemException {
- synchronized (this) {
- switch (status) {
- case Status.STATUS_ACTIVE:
- case Status.STATUS_MARKED_ROLLBACK:
- break;
- default:
- throw new IllegalStateException("Status is " +
getStateString(status));
- }
- }
+ beforePrepare();
- beforeCompletion();
- endResources();
try {
- LinkedList rms;
+ if (status == Status.STATUS_MARKED_ROLLBACK) {
+ rollbackResources(resourceManagers);
+ throw new RollbackException("Unable to commit");
+ }
synchronized (this) {
if (status == Status.STATUS_ACTIVE) {
- if (resourceManagers.size() == 0) {
+ if (this.resourceManagers.size() == 0) {
// nothing to commit
status = Status.STATUS_COMMITTED;
- } else if (resourceManagers.size() == 1) {
+ } else if (this.resourceManagers.size() == 1) {
// one-phase commit decision
status = Status.STATUS_COMMITTING;
} else {
@@ -244,12 +243,11 @@
}
}
// resourceManagers is now immutable
- rms = resourceManagers;
}
// one-phase
- if (rms.size() == 1) {
- ResourceManager manager = (ResourceManager) rms.getFirst();
+ if (resourceManagers.size() == 1) {
+ ResourceManager manager = (ResourceManager)
resourceManagers.getFirst();
try {
manager.committer.commit(manager.branchId, true);
synchronized (this) {
@@ -267,74 +265,72 @@
}
// two-phase
- try {
- txnLog.prepare(xid);
- } catch (IOException e) {
- try {
- rollbackResources(rms);
- } catch (Exception se) {
- log.error("Unable to rollback after failure to log
prepare", se.getCause());
- }
- SystemException ex = new SystemException("Error logging
prepare; transaction was rolled back)");
- ex.initCause(e);
- throw ex;
+ boolean willCommit = internalPrepare();
+
+ // notify the RMs
+ if (willCommit) {
+ commitResources(resourceManagers);
+ } else {
+ rollbackResources(resourceManagers);
+ throw new RollbackException("Unable to commit");
}
- for (Iterator i = rms.iterator(); i.hasNext();) {
- synchronized (this) {
- if (status != Status.STATUS_PREPARING) {
- // we were marked for rollback
- break;
- }
- }
- ResourceManager manager = (ResourceManager) i.next();
- try {
- int vote = manager.committer.prepare(manager.branchId);
- if (vote == XAResource.XA_RDONLY) {
- // we don't need to consider this RM any more
- i.remove();
- }
- } catch (XAException e) {
- synchronized (this) {
- status = Status.STATUS_MARKED_ROLLBACK;
- }
- }
+ } finally {
+ afterCompletion();
+ synchronized (this) {
+ status = Status.STATUS_NO_TRANSACTION;
}
+ }
+ }
- // decision time...
- boolean willCommit;
+ //Used from XATerminator for first phase in a remotely controlled tx.
+ int prepare() throws SystemException, RollbackException {
+ beforePrepare();
+ int result = XAResource.XA_RDONLY;
+ try {
+ LinkedList rms;
synchronized (this) {
- willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
- if (willCommit) {
- status = Status.STATUS_PREPARED;
+ if (status == Status.STATUS_ACTIVE) {
+ if (resourceManagers.size() == 0) {
+ // nothing to commit
+ status = Status.STATUS_COMMITTED;
+ return result;
+ } else {
+ // start prepare part of two-phase
+ status = Status.STATUS_PREPARING;
+ }
}
+ // resourceManagers is now immutable
+ rms = resourceManagers;
}
- // log our decision
- try {
- if (willCommit) {
- txnLog.commit(xid);
- } else {
- txnLog.rollback(xid);
- }
- } catch (IOException e) {
- try {
- rollbackResources(rms);
- } catch (Exception se) {
- log.error("Unable to rollback after failure to log
decision", se.getCause());
- }
- SystemException ex = new SystemException("Error logging
decision (outcome is unknown)");
- ex.initCause(e);
- throw ex;
- }
+ boolean willCommit = internalPrepare();
// notify the RMs
if (willCommit) {
- commitResources(rms);
+ if (!rms.isEmpty()) {
+ result = XAResource.XA_OK;
+ }
+
} else {
rollbackResources(rms);
throw new RollbackException("Unable to commit");
}
} finally {
+ if (result == XAResource.XA_RDONLY) {
+ afterCompletion();
+ synchronized (this) {
+ status = Status.STATUS_NO_TRANSACTION;
+ }
+ }
+ }
+ return result;
+ }
+
+ //used from XATerminator for commit phase of non-readonly remotely
controlled tx.
+ void preparedCommit() throws SystemException {
+ try {
+ commitResources(resourceManagers);
+ } finally {
afterCompletion();
synchronized (this) {
status = Status.STATUS_NO_TRANSACTION;
@@ -342,17 +338,94 @@
}
}
+ //helper method used by Transaction.commit and XATerminator prepare.
+ private void beforePrepare() {
+ synchronized (this) {
+ switch (status) {
+ case Status.STATUS_ACTIVE:
+ case Status.STATUS_MARKED_ROLLBACK:
+ break;
+ default:
+ throw new IllegalStateException("Status is " +
getStateString(status));
+ }
+ }
+
+ beforeCompletion();
+ endResources();
+ }
+
+
+ //helper method used by Transaction.commit and XATerminator prepare.
+ private boolean internalPrepare() throws SystemException {
+ try {
+ txnLog.prepare(xid);
+ } catch (IOException e) {
+ try {
+ rollbackResources(resourceManagers);
+ } catch (Exception se) {
+ log.error("Unable to rollback after failure to log prepare",
se.getCause());
+ }
+ throw (SystemException) new SystemException("Error logging
prepare; transaction was rolled back)").initCause(e);
+ }
+ for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
+ synchronized (this) {
+ if (status != Status.STATUS_PREPARING) {
+ // we were marked for rollback
+ break;
+ }
+ }
+ ResourceManager manager = (ResourceManager) i.next();
+ try {
+ int vote = manager.committer.prepare(manager.branchId);
+ if (vote == XAResource.XA_RDONLY) {
+ // we don't need to consider this RM any more
+ i.remove();
+ }
+ } catch (XAException e) {
+ synchronized (this) {
+ status = Status.STATUS_MARKED_ROLLBACK;
+ }
+ }
+ }
+
+ // decision time...
+ boolean willCommit;
+ synchronized (this) {
+ willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
+ if (willCommit) {
+ status = Status.STATUS_PREPARED;
+ }
+ }
+
+ // log our decision
+ try {
+ if (willCommit) {
+ txnLog.commit(xid);
+ } else {
+ txnLog.rollback(xid);
+ }
+ } catch (IOException e) {
+ try {
+ rollbackResources(resourceManagers);
+ } catch (Exception se) {
+ log.error("Unable to rollback after failure to log
decision", se.getCause());
+ }
+ throw (SystemException) new SystemException("Error logging
decision (outcome is unknown)").initCause(e);
+ }
+ return willCommit;
+ }
+
public void rollback() throws IllegalStateException, SystemException {
List rms;
synchronized (this) {
switch (status) {
- case Status.STATUS_ACTIVE:
- status = Status.STATUS_MARKED_ROLLBACK;
- break;
- case Status.STATUS_MARKED_ROLLBACK:
- break;
- default:
- throw new IllegalStateException("Status is " +
getStateString(status));
+ case Status.STATUS_ACTIVE:
+ status = Status.STATUS_MARKED_ROLLBACK;
+ break;
+ case Status.STATUS_MARKED_ROLLBACK:
+ break;
+ default:
+ throw new IllegalStateException("Status is " +
getStateString(status));
}
rms = resourceManagers;
}
@@ -368,9 +441,7 @@
} catch (Exception se) {
log.error("Unable to rollback after failure to log
decision", se.getCause());
}
- SystemException ex = new SystemException("Error logging
rollback");
- ex.initCause(e);
- throw ex;
+ throw (SystemException) new SystemException("Error logging
rollback").initCause(e);
}
rollbackResources(rms);
} finally {
@@ -494,28 +565,28 @@
private static String getStateString(int status) {
switch (status) {
- case Status.STATUS_ACTIVE:
- return "STATUS_ACTIVE";
- case Status.STATUS_PREPARING:
- return "STATUS_PREPARING";
- case Status.STATUS_PREPARED:
- return "STATUS_PREPARED";
- case Status.STATUS_MARKED_ROLLBACK:
- return "STATUS_MARKED_ROLLBACK";
- case Status.STATUS_ROLLING_BACK:
- return "STATUS_ROLLING_BACK";
- case Status.STATUS_COMMITTING:
- return "STATUS_COMMITTING";
- case Status.STATUS_COMMITTED:
- return "STATUS_COMMITTED";
- case Status.STATUS_ROLLEDBACK:
- return "STATUS_ROLLEDBACK";
- case Status.STATUS_NO_TRANSACTION:
- return "STATUS_NO_TRANSACTION";
- case Status.STATUS_UNKNOWN:
- return "STATUS_UNKNOWN";
- default:
- throw new AssertionError();
+ case Status.STATUS_ACTIVE:
+ return "STATUS_ACTIVE";
+ case Status.STATUS_PREPARING:
+ return "STATUS_PREPARING";
+ case Status.STATUS_PREPARED:
+ return "STATUS_PREPARED";
+ case Status.STATUS_MARKED_ROLLBACK:
+ return "STATUS_MARKED_ROLLBACK";
+ case Status.STATUS_ROLLING_BACK:
+ return "STATUS_ROLLING_BACK";
+ case Status.STATUS_COMMITTING:
+ return "STATUS_COMMITTING";
+ case Status.STATUS_COMMITTED:
+ return "STATUS_COMMITTED";
+ case Status.STATUS_ROLLEDBACK:
+ return "STATUS_ROLLEDBACK";
+ case Status.STATUS_NO_TRANSACTION:
+ return "STATUS_NO_TRANSACTION";
+ case Status.STATUS_UNKNOWN:
+ return "STATUS_UNKNOWN";
+ default:
+ throw new AssertionError();
}
}
@@ -527,6 +598,7 @@
return false;
}
}
+
private static class ResourceManager {
private final XAResource committer;
1.2 +63 -2
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionManagerImpl.java
Index: TransactionManagerImpl.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionManagerImpl.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TransactionManagerImpl.java 23 Jan 2004 18:54:16 -0000 1.1
+++ TransactionManagerImpl.java 23 Feb 2004 20:28:43 -0000 1.2
@@ -64,6 +64,8 @@
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import javax.transaction.xa.Xid;
+import javax.transaction.xa.XAException;
import org.apache.geronimo.transaction.log.UnrecoverableLog;
@@ -72,7 +74,7 @@
*
* @version $Revision$ $Date$
*/
-public class TransactionManagerImpl implements TransactionManager {
+public class TransactionManagerImpl implements TransactionManager,
XidImporter {
private final TransactionLog txnLog;
private final XidFactory xidFactory = new XidFactory();
private volatile int timeout;
@@ -153,5 +155,64 @@
} finally {
threadTx.set(null);
}
+ }
+
+ public Transaction importXid(Xid xid) throws XAException,
SystemException {
+ if (getStatus() != Status.STATUS_NO_TRANSACTION) {
+ throw new XAException("Transaction already active in this
thread");
+ }
+ TransactionImpl tx = new TransactionImpl(xid, xidFactory, txnLog);
+ threadTx.set(tx);
+ return tx;
+ }
+
+ public void commit(Transaction tx, boolean onePhase) throws XAException {
+ if (onePhase) {
+ try {
+ tx.commit();
+ } catch (HeuristicMixedException e) {
+ throw new XAException();
+ } catch (HeuristicRollbackException e) {
+ throw new XAException();
+ } catch (RollbackException e) {
+ throw new XAException();
+ } catch (SecurityException e) {
+ throw new XAException();
+ } catch (SystemException e) {
+ throw new XAException();
+ }
+ } else {
+ try {
+ ((TransactionImpl)tx).preparedCommit();
+ } catch (SystemException e) {
+ throw new XAException();
+ }
+ }
+ }
+
+ public void forget(Transaction tx) throws XAException {
+ }
+
+ public int prepare(Transaction tx) throws XAException {
+ try {
+ return ((TransactionImpl)tx).prepare();
+ } catch (SystemException e) {
+ throw new XAException();
+ } catch (RollbackException e) {
+ throw new XAException();
+ }
+ }
+
+ public void rollback(Transaction tx) throws XAException {
+ try {
+ tx.rollback();
+ } catch (IllegalStateException e) {
+ throw new XAException();
+ } catch (SystemException e) {
+ throw new XAException();
+ }
+ }
+
+ public void setTransactionTimeout(long milliseconds) {
}
}
1.2 +8 -3
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidFactory.java
Index: XidFactory.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidFactory.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- XidFactory.java 23 Jan 2004 18:54:16 -0000 1.1
+++ XidFactory.java 23 Feb 2004 20:28:43 -0000 1.2
@@ -67,7 +67,7 @@
* <li>4 or 16 byte IP address of host</li>
* <ol>
* @version $Revision$ $Date$
- * @todo Should have a way of setting baseId
+ * todo Should have a way of setting baseId
*/
public class XidFactory {
byte[] baseId = new byte[Xid.MAXGTRIDSIZE];
@@ -106,6 +106,11 @@
}
public Xid createBranch(Xid globalId, int branch) {
- return new XidImpl(globalId, branch);
+ byte[] branchId = (byte[]) baseId.clone();
+ branchId[0] = (byte) branch;
+ branchId[1] = (byte) (branch >>> 8);
+ branchId[2] = (byte) (branch >>> 16);
+ branchId[3] = (byte) (branch >>> 24);
+ return new XidImpl(globalId, branchId);
}
}
1.2 +7 -15
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImpl.java
Index: XidImpl.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImpl.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- XidImpl.java 23 Jan 2004 18:54:16 -0000 1.1
+++ XidImpl.java 23 Feb 2004 20:28:43 -0000 1.2
@@ -76,7 +76,7 @@
*/
public XidImpl(byte[] globalId) {
this.globalId = globalId;
- this.hash = hash(globalId);
+ this.hash = hash(0, globalId);
branchId = new byte[Xid.MAXBQUALSIZE];
}
@@ -85,28 +85,20 @@
* @param global the xid of the global transaction this branch belongs to
* @param branch the branch id
*/
- public XidImpl(Xid global, int branch) {
+ public XidImpl(Xid global, byte[] branch) {
int hash;
if (global instanceof XidImpl) {
globalId = ((XidImpl) global).globalId;
hash = ((XidImpl) global).hash;
} else {
globalId = global.getGlobalTransactionId();
- hash = hash(globalId);
+ hash = hash(0, globalId);
}
- branchId = new byte[Xid.MAXBQUALSIZE];
- branchId[0] = (byte) branch;
- branchId[1] = (byte) (branch >>> 8);
- branchId[2] = (byte) (branch >>> 16);
- branchId[3] = (byte) (branch >>> 24);
- for (int i = 0; i < 4; i++) {
- hash = (hash * 37) + branchId[i];
- }
- this.hash = hash;
+ branchId = branch;
+ this.hash = hash(hash, branchId);
}
- private int hash(byte[] id) {
- int hash = 0;
+ private int hash(int hash, byte[] id) {
for (int i = 0; i < id.length; i++) {
hash = (hash * 37) + id[i];
}
1.1
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImporter.java
Index: XidImporter.java
===================================================================
package org.apache.geronimo.transaction.manager;
import javax.transaction.xa.Xid;
import javax.transaction.xa.XAException;
import javax.transaction.Transaction;
import javax.transaction.SystemException;
/**
*
*
* @version $Revision: 1.1 $ $Date: 2004/02/23 20:28:43 $
*
* */
public interface XidImporter {
Transaction importXid(Xid xid) throws XAException, SystemException;
void commit(Transaction tx, boolean onePhase) throws XAException;
void forget(Transaction tx) throws XAException;
int prepare(Transaction tx) throws XAException;
void rollback(Transaction tx) throws XAException;
void setTransactionTimeout(long milliseconds);
}