User: mulder
Date: 00/10/21 09:24:01
Modified: src/main/org/jboss/tm TxCapsule.java XidImpl.java
Log:
Updates to support Oracle XADataSource implementation with multiple
data sources per transaction.
- Now we actually create distinct transaction branches for each
resource enlisted
- Had to comment out JOIN optimization when isSameRM because Oracle
counts every connection against the same DB instance as the same
RM (an "optimization"), but hangs when you try to JOIN (with
JDBC 8.17 and Oracle Enterprise 8.16 for Linux)
- Two connections in the same transaction against the same DB instance
don't seem to see the work done by each other (at least for an insert)
- Need to run this through load testing, since I had to add hash maps
to XAConnectionFactory and TxCapsule and I want to make sure they
don't leak.
Revision Changes Path
1.18 +63 -24 jboss/src/main/org/jboss/tm/TxCapsule.java
Index: TxCapsule.java
===================================================================
RCS file: /products/cvs/ejboss/jboss/src/main/org/jboss/tm/TxCapsule.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- TxCapsule.java 2000/10/21 05:20:23 1.17
+++ TxCapsule.java 2000/10/21 16:24:00 1.18
@@ -11,6 +11,7 @@
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.ConcurrentModificationException;
@@ -44,7 +45,7 @@
* @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Ole Husgaard</a>
*
- * @version $Revision: 1.17 $
+ * @version $Revision: 1.18 $
*/
class TxCapsule implements TimeoutTarget
{
@@ -59,6 +60,7 @@
static private final int HEUR_NONE = XAException.XA_RETRY;
// Attributes ----------------------------------------------------
+ private HashMap branchXids;
// Static --------------------------------------------------------
@@ -79,6 +81,7 @@
start = System.currentTimeMillis();
this.timeout = TimeoutFactory.createTimeout(start+timeout, this);
+ branchXids = new HashMap();
}
/**
@@ -192,7 +195,7 @@
public String toString()
{
- return xid.toString();
+ return XidImpl.toString(xid);
}
// Package protected ---------------------------------------------
@@ -559,27 +562,35 @@
if (idx != -1) {
if (resourceState[idx] == RS_SUSPENDED) {
- startResource(xaRes, XAResource.TMRESUME);
+ startResource(xaRes, XAResource.TMRESUME,
(Xid)branchXids.get(xaRes));
resourceState[idx] = RS_ENLISTED;
return true;
} else if (resourceState[idx] == RS_ENDED) {
- startResource(xaRes, XAResource.TMJOIN);
+ startResource(xaRes, XAResource.TMJOIN,
(Xid)branchXids.get(xaRes));
resourceState[idx] = RS_ENLISTED;
return true;
} else
return false; // already enlisted
}
+/* This optimization hangs the Oracle XAResource
+ when you perform xaCon.start(Xid, TMJOIN)
for (int i = 0; i < resourceCount; ++i) {
if (xaRes.isSameRM(resources[i])) {
- startResource(xaRes, XAResource.TMJOIN);
- addResource(xaRes);
+ Xid parentXid = (Xid)branchXids.get(resources[i]);
+ startResource(xaRes, XAResource.TMJOIN, parentXid);
+ addResource(xaRes); // Do we still need to do this?
return true;
}
}
+*/
// New resource
// According to the JTA spec we should create a new
// transaction branch here.
- startResource(xaRes, XAResource.TMNOFLAGS);
+ Xid useXid = xid;
+ if(resourceCount > 0) {
+ useXid = createXid((Xid)branchXids.get(resources[resourceCount-1]));
+ }
+ startResource(xaRes, XAResource.TMNOFLAGS, useXid);
addResource(xaRes);
return true;
} catch(XAException e) {
@@ -948,10 +959,8 @@
private int findResource(XAResource xaRes) {
int pos = -1;
for (int i = 0; i < resourceCount; ++i)
- try { // FIXME: Do we need to join if there's more than one?
- if(resources[i].isSameRM(xaRes))
- pos = i;
- } catch(XAException e) {}
+ if(xaRes == resources[i])
+ pos = i;
return pos;
}
@@ -998,21 +1007,22 @@
* Call <code>start()</code> on the XAResource.
* This will release the lock while calling out.
*/
- private void startResource(XAResource xaRes, int flags)
+ private void startResource(XAResource xaRes, int flags, Xid currentBranchXid)
throws XAException
{
if(trace)
- Logger.debug("TxCapsule.startResource(" + xid.toString() +
+ Logger.debug("TxCapsule.startResource(" +
XidImpl.toString(currentBranchXid) +
") entered: " + xaRes.toString() +
" flags=" + flags);
unlock();
// OSH FIXME: resourceState could be incorrect during this callout.
try {
- xaRes.start(xid, flags);
+ xaRes.start(currentBranchXid, flags);
+ branchXids.put(xaRes, currentBranchXid);
} finally {
lock();
if(trace)
- Logger.debug("TxCapsule.startResource(" + xid.toString() +
+ Logger.debug("TxCapsule.startResource(" +
XidImpl.toString(currentBranchXid) +
") leaving: " + xaRes.toString() +
" flags=" + flags);
}
@@ -1025,18 +1035,19 @@
private void endResource(XAResource xaRes, int flag)
throws XAException
{
+ Xid useXid = (Xid)branchXids.get(xaRes);
if(trace)
- Logger.debug("TxCapsule.endResource(" + xid.toString() +
+ Logger.debug("TxCapsule.endResource(" + XidImpl.toString(useXid) +
") entered: " + xaRes.toString() +
" flag=" + flag);
unlock();
// OSH FIXME: resourceState could be incorrect during this callout.
try {
- xaRes.end(xid, flag);
+ xaRes.end(useXid, flag);
} finally {
lock();
if(trace)
- Logger.debug("TxCapsule.endResource(" + xid.toString() +
+ Logger.debug("TxCapsule.endResource(" + XidImpl.toString(useXid) +
") leaving: " + xaRes.toString() +
" flag=" + flag);
}
@@ -1062,7 +1073,7 @@
// a suspended state.
// But the Minerva XA pool does not like that we call end()
// two times in a row, so we resume before ending.
- startResource(resources[i], XAResource.TMRESUME);
+ startResource(resources[i], XAResource.TMRESUME,
(Xid)branchXids.get(resources[i]));
resourceState[i] = RS_ENLISTED;
}
if (resourceState[i] == RS_ENLISTED) {
@@ -1152,7 +1163,7 @@
if (resource != null) {
try {
unlock();
- resource.forget(xid);
+ resource.forget((Xid)branchXids.get(resource));
} catch (XAException e) {
Logger.warning("XAException at forget(): errorCode=" +
getStringXAErrorCode(e.errorCode));
@@ -1256,7 +1267,7 @@
unlock();
try {
- vote = resources[i].prepare(xid);
+ vote = resources[i].prepare((Xid)branchXids.get(resources[i]));
} finally {
lock();
}
@@ -1326,7 +1337,7 @@
try {
unlock();
try {
- resources[i].commit(xid, onePhase);
+ resources[i].commit((Xid)branchXids.get(resources[i]), onePhase);
} finally {
lock();
}
@@ -1344,6 +1355,11 @@
Logger.exception(e);
break;
}
+ try {
+ resources[i].forget((Xid)branchXids.get(resources[i]));
+ } catch(XAException forgetEx) {}
+ } finally {
+ branchXids.remove(resources[i]);
}
}
@@ -1366,7 +1382,7 @@
try {
unlock();
try {
- resources[i].rollback(xid);
+ resources[i].rollback((Xid)branchXids.get(resources[i]));
} finally {
lock();
}
@@ -1388,8 +1404,10 @@
break;
}
try {
- resources[i].forget(xid);
+ resources[i].forget((Xid)branchXids.get(resources[i]));
} catch(XAException forgetEx) {}
+ } finally {
+ branchXids.remove(resources[i]);
}
}
@@ -1410,6 +1428,27 @@
Object xid = xidConstructor.newInstance(new Object[]{new
Integer(XidImpl.getJbossFormatId()),
XidImpl.getGlobalIdString(XidImpl.getNextId()),
XidImpl.noBranchQualifier});
+ return (Xid)xid;
+ } catch(Exception e) {
+ System.out.println("Unable to create an Xid (reverting to default
impl): "+e);
+ return new XidImpl(XidImpl.getJbossFormatId(),
XidImpl.getGlobalIdString(XidImpl.getNextId()), XidImpl.noBranchQualifier);
+ }
+ }
+
+ private Xid createXid(Xid lastBranch) {
+ String name = System.getProperty("jboss.xa.xidclass",
"org.jboss.tm.XidImpl");
+ if(xidConstructor == null) {
+ try {
+ Class cls = Class.forName(name);
+ xidConstructor = cls.getConstructor(new Class[]{Integer.TYPE,
byte[].class, byte[].class});
+ } catch(Exception e) {
+ System.out.println("Unable to load Xid class '"+name+"'");
+ }
+ }
+ try {
+ Object xid = xidConstructor.newInstance(new Object[]{new
Integer(XidImpl.getJbossFormatId()),
+
XidImpl.getGlobalIdString(XidImpl.getNextId()),
+
XidImpl.getNextBranchQualifier(lastBranch.getBranchQualifier())});
return (Xid)xid;
} catch(Exception e) {
System.out.println("Unable to create an Xid (reverting to default
impl): "+e);
1.6 +31 -1 jboss/src/main/org/jboss/tm/XidImpl.java
Index: XidImpl.java
===================================================================
RCS file: /products/cvs/ejboss/jboss/src/main/org/jboss/tm/XidImpl.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- XidImpl.java 2000/10/21 05:20:23 1.5
+++ XidImpl.java 2000/10/21 16:24:00 1.6
@@ -18,7 +18,7 @@
* @see TransactionImpl
* @author Rickard �berg ([EMAIL PROTECTED])
* @author <a href="mailto:[EMAIL PROTECTED]">Ole Husgaard</a>
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
class XidImpl
implements Xid, java.io.Serializable
@@ -214,6 +214,36 @@
System.arraycopy(value.getBytes(), 0, b, 0, value.length());
return b;
}
+
+ static byte[] getNextBranchQualifier() {
+ int id = getNextId();
+ String ids = Integer.toString(id);
+ byte[] source = ids.getBytes();
+ byte[] result = new byte[MAXBQUALSIZE];
+ System.arraycopy(source, 0, result, 0, source.length);
+ return result;
+ }
+
+ static byte[] getNextBranchQualifier(byte[] previous) {
+ String ids = new String(previous).trim();
+ if(ids.length() == 0)
+ ids = "1";
+ else
+ ids = Integer.toString(Integer.parseInt(ids)+1);
+ byte[] source = ids.getBytes();
+ byte[] result = new byte[MAXBQUALSIZE];
+ System.arraycopy(source, 0, result, 0, source.length);
+ return result;
+ }
+
+ static String toString(Xid id) {
+ if(id == null)
+ return "[NULL XID!!]";
+ String s = id.getClass().getName();
+ s = s.substring(s.lastIndexOf('.')+1);
+ s = s+" [ID="+id.getFormatId()+", Global="+new
String(id.getGlobalTransactionId()).trim()+", Branch="+new
String(id.getBranchQualifier()).trim()+"]";
+ return s;
+ }
// Inner classes -------------------------------------------------
}