Author: ay
Date: Thu Jun 28 13:55:18 2012
New Revision: 1355016
URL: http://svn.apache.org/viewvc?rev=1355016&view=rev
Log:
Merged revisions 1354882 via svn merge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r1354882 | ay | 2012-06-28 11:26:56 +0200 (Thu, 28 Jun 2012) | 1 line
an option to not hold the connection for CXF-4362
........
Added:
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStorePooledConnectionTest.java
- copied unchanged from r1354882,
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStorePooledConnectionTest.java
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
- copied unchanged from r1354882,
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
Modified:
cxf/branches/2.6.x-fixes/ (props changed)
cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager.xsd
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreConfigurationTest.java
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTwoSchemasTest.java
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
Propchange: cxf/branches/2.6.x-fixes/
('svn:mergeinfo' removed)
Propchange: cxf/branches/2.6.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL:
http://svn.apache.org/viewvc/cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1355016&r1=1355015&r2=1355016&view=diff
==============================================================================
---
cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
(original)
+++
cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
Thu Jun 28 13:55:18 2012
@@ -34,8 +34,10 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -142,6 +144,19 @@ public class RMTxStore implements RMStor
"SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?";
private static final String ALTER_TABLE_STMT_STR =
"ALTER TABLE {0} ADD {1} {2}";
+ private static final String CREATE_INBOUND_MESSAGE_STMT_STR =
+ MessageFormat.format(CREATE_MESSAGE_STMT_STR, INBOUND_MSGS_TABLE_NAME);
+ private static final String CREATE_OUTBOUND_MESSAGE_STMT_STR =
+ MessageFormat.format(CREATE_MESSAGE_STMT_STR,
OUTBOUND_MSGS_TABLE_NAME);
+ private static final String DELETE_INBOUND_MESSAGE_STMT_STR =
+ MessageFormat.format(DELETE_MESSAGE_STMT_STR, INBOUND_MSGS_TABLE_NAME);
+ private static final String DELETE_OUTBOUND_MESSAGE_STMT_STR =
+ MessageFormat.format(DELETE_MESSAGE_STMT_STR,
OUTBOUND_MSGS_TABLE_NAME);
+ private static final String SELECT_INBOUND_MESSAGES_STMT_STR =
+ MessageFormat.format(SELECT_MESSAGES_STMT_STR,
INBOUND_MSGS_TABLE_NAME);
+ private static final String SELECT_OUTBOUND_MESSAGES_STMT_STR =
+ MessageFormat.format(SELECT_MESSAGES_STMT_STR,
OUTBOUND_MSGS_TABLE_NAME);
+
// create_schema may not work for several reasons, if so, create one
manually
private static final String CREATE_SCHEMA_STMT_STR = "CREATE SCHEMA {0}";
// given the schema, try these standard statements to switch to the schema
@@ -154,27 +169,14 @@ public class RMTxStore implements RMStor
private static final Logger LOG = LogUtils.getL7dLogger(RMTxStore.class);
+ // the connection and statements are cached only if
+ private boolean keepConnection = true;
private Connection connection;
private boolean createdConnection = true;
- private Lock writeLock = new ReentrantLock();
-
- private PreparedStatement createDestSequenceStmt;
- private PreparedStatement createSrcSequenceStmt;
- private PreparedStatement deleteDestSequenceStmt;
- private PreparedStatement deleteSrcSequenceStmt;
- private PreparedStatement updateDestSequenceStmt;
- private PreparedStatement updateSrcSequenceStmt;
- private PreparedStatement selectDestSequencesStmt;
- private PreparedStatement selectSrcSequencesStmt;
- private PreparedStatement selectDestSequenceStmt;
- private PreparedStatement selectSrcSequenceStmt;
- private PreparedStatement createInboundMessageStmt;
- private PreparedStatement createOutboundMessageStmt;
- private PreparedStatement deleteInboundMessageStmt;
- private PreparedStatement deleteOutboundMessageStmt;
- private PreparedStatement selectInboundMessagesStmt;
- private PreparedStatement selectOutboundMessagesStmt;
-
+
+ private Map<Statement, Lock> statementLocks;
+ private Map<String, PreparedStatement> cachedStatements;
+
private DataSource dataSource;
private String driverClassName = "org.apache.derby.jdbc.EmbeddedDriver";
private String url = MessageFormat.format("jdbc:derby:{0};create=true",
DEFAULT_DATABASE_NAME);
@@ -277,6 +279,15 @@ public class RMTxStore implements RMStor
this.tableExistsCode = tableExistsCode;
}
+
+ public boolean isKeepConnection() {
+ return keepConnection;
+ }
+
+ public void setKeepConnection(boolean keepConnection) {
+ this.keepConnection = keepConnection;
+ }
+
public long getInitialReconnectDelay() {
return initialReconnectDelay;
}
@@ -295,7 +306,7 @@ public class RMTxStore implements RMStor
public void setConnection(Connection c) {
connection = c;
- createdConnection = false;
+ createdConnection = false;
}
// RMStore interface
@@ -308,25 +319,28 @@ public class RMTxStore implements RMStor
LOG.info("Creating destination sequence: " + sequenceIdentifier +
", (endpoint: "
+ endpointIdentifier + ")");
}
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
try {
beginTransaction();
+ stmt = getStatement(con, CREATE_DEST_SEQUENCE_STMT_STR);
- createDestSequenceStmt.setString(1, sequenceIdentifier);
+ stmt.setString(1, sequenceIdentifier);
String addr = seq.getAcksTo().getAddress().getValue();
- createDestSequenceStmt.setString(2, addr);
- createDestSequenceStmt.setString(3, endpointIdentifier);
- createDestSequenceStmt.setString(4, protocolVersion);
- createDestSequenceStmt.execute();
+ stmt.setString(2, addr);
+ stmt.setString(3, endpointIdentifier);
+ stmt.setString(4, protocolVersion);
+ stmt.execute();
- commit();
+ commit(con);
} catch (SQLException ex) {
- abort();
+ abort(con);
conex = ex;
throw new RMStoreException(ex);
} finally {
- updateConnectionState(conex);
+ releaseResources(stmt, null);
+ updateConnectionState(con, conex);
}
}
@@ -338,27 +352,31 @@ public class RMTxStore implements RMStor
LOG.fine("Creating source sequence: " + sequenceIdentifier + ",
(endpoint: "
+ endpointIdentifier + ")");
}
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
try {
beginTransaction();
- createSrcSequenceStmt.setString(1, sequenceIdentifier);
+ stmt = getStatement(con, CREATE_SRC_SEQUENCE_STMT_STR);
+
+ stmt.setString(1, sequenceIdentifier);
Date expiry = seq.getExpires();
- createSrcSequenceStmt.setLong(2, expiry == null ? 0 :
expiry.getTime());
+ stmt.setLong(2, expiry == null ? 0 : expiry.getTime());
Identifier osid = seq.getOfferingSequenceIdentifier();
- createSrcSequenceStmt.setString(3, osid == null ? null :
osid.getValue());
- createSrcSequenceStmt.setString(4, endpointIdentifier);
- createSrcSequenceStmt.setString(5, protocolVersion);
- createSrcSequenceStmt.execute();
+ stmt.setString(3, osid == null ? null : osid.getValue());
+ stmt.setString(4, endpointIdentifier);
+ stmt.setString(5, protocolVersion);
+ stmt.execute();
- commit();
+ commit(con);
} catch (SQLException ex) {
conex = ex;
- abort();
+ abort(con);
throw new RMStoreException(ex);
} finally {
- updateConnectionState(conex);
+ releaseResources(stmt, null);
+ updateConnectionState(con, conex);
}
}
@@ -366,39 +384,34 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting destination sequence for id: " + sid);
}
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
ResultSet res = null;
try {
- synchronized (selectDestSequenceStmt) {
- selectDestSequenceStmt.setString(1, sid.getValue());
- res = selectDestSequenceStmt.executeQuery();
-
- if (res.next()) {
- EndpointReferenceType acksTo =
RMUtils.createReference(res.getString(1));
- long lm = res.getLong(2);
- ProtocolVariation pv =
decodeProtocolVersion(res.getString(3));
- InputStream is = res.getBinaryStream(4);
- SequenceAcknowledgement ack = null;
- if (null != is) {
- ack = PersistenceUtils.getInstance()
- .deserialiseAcknowledgment(is);
- }
- return new DestinationSequence(sid, acksTo, lm, ack, pv);
+ stmt = getStatement(con, SELECT_DEST_SEQUENCE_STMT_STR);
+
+ stmt.setString(1, sid.getValue());
+ res = stmt.executeQuery();
+
+ if (res.next()) {
+ EndpointReferenceType acksTo =
RMUtils.createReference(res.getString(1));
+ long lm = res.getLong(2);
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
+ InputStream is = res.getBinaryStream(4);
+ SequenceAcknowledgement ack = null;
+ if (null != is) {
+ ack = PersistenceUtils.getInstance()
+ .deserialiseAcknowledgment(is);
}
+ return new DestinationSequence(sid, acksTo, lm, ack, pv);
}
} catch (SQLException ex) {
conex = ex;
LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG",
LOG).toString(), ex);
} finally {
- if (res != null) {
- try {
- res.close();
- } catch (SQLException e) {
- // ignore
- }
- }
- updateConnectionState(conex);
+ releaseResources(stmt, res);
+ updateConnectionState(con, conex);
}
return null;
}
@@ -407,84 +420,87 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting source sequences for id: " + sid);
}
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
ResultSet res = null;
try {
- synchronized (selectSrcSequenceStmt) {
- selectSrcSequenceStmt.setString(1, sid.getValue());
- res = selectSrcSequenceStmt.executeQuery();
-
- if (res.next()) {
- long cmn = res.getLong(1);
- boolean lm = res.getBoolean(2);
- long lval = res.getLong(3);
- Date expiry = 0 == lval ? null : new Date(lval);
- String oidValue = res.getString(4);
- Identifier oi = null;
- if (null != oidValue) {
- oi = RMUtils.getWSRMFactory().createIdentifier();
- oi.setValue(oidValue);
- }
- ProtocolVariation pv =
decodeProtocolVersion(res.getString(5));
- return new SourceSequence(sid, expiry, oi, cmn, lm, pv);
+ stmt = getStatement(con, SELECT_SRC_SEQUENCE_STMT_STR);
+
+ stmt.setString(1, sid.getValue());
+ res = stmt.executeQuery();
+
+ if (res.next()) {
+ long cmn = res.getLong(1);
+ boolean lm = res.getBoolean(2);
+ long lval = res.getLong(3);
+ Date expiry = 0 == lval ? null : new Date(lval);
+ String oidValue = res.getString(4);
+ Identifier oi = null;
+ if (null != oidValue) {
+ oi = RMUtils.getWSRMFactory().createIdentifier();
+ oi.setValue(oidValue);
}
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(5));
+ return new SourceSequence(sid, expiry, oi, cmn, lm, pv);
}
} catch (SQLException ex) {
conex = ex;
// ignore
LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG",
LOG).toString(), ex);
} finally {
- if (res != null) {
- try {
- res.close();
- } catch (SQLException e) {
- // ignore
- }
- }
- updateConnectionState(conex);
+ releaseResources(stmt, res);
+ updateConnectionState(con, conex);
}
return null;
}
public void removeDestinationSequence(Identifier sid) {
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
try {
beginTransaction();
- deleteDestSequenceStmt.setString(1, sid.getValue());
- deleteDestSequenceStmt.execute();
+ stmt = getStatement(con, DELETE_DEST_SEQUENCE_STMT_STR);
+
+ stmt.setString(1, sid.getValue());
+ stmt.execute();
- commit();
+ commit(con);
} catch (SQLException ex) {
conex = ex;
- abort();
+ abort(con);
throw new RMStoreException(ex);
} finally {
- updateConnectionState(conex);
+ releaseResources(stmt, null);
+ updateConnectionState(con, conex);
}
}
public void removeSourceSequence(Identifier sid) {
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
try {
beginTransaction();
- deleteSrcSequenceStmt.setString(1, sid.getValue());
- deleteSrcSequenceStmt.execute();
+ stmt = getStatement(con, DELETE_SRC_SEQUENCE_STMT_STR);
+
+ stmt.setString(1, sid.getValue());
+ stmt.execute();
- commit();
+ commit(con);
} catch (SQLException ex) {
conex = ex;
- abort();
+ abort(con);
throw new RMStoreException(ex);
} finally {
- updateConnectionState(conex);
+ releaseResources(stmt, null);
+ updateConnectionState(con, conex);
}
}
@@ -492,42 +508,37 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting destination sequences for endpoint: " +
endpointIdentifier);
}
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
Collection<DestinationSequence> seqs = new
ArrayList<DestinationSequence>();
ResultSet res = null;
try {
- synchronized (selectDestSequencesStmt) {
- selectDestSequencesStmt.setString(1, endpointIdentifier);
- res = selectDestSequencesStmt.executeQuery();
- while (res.next()) {
- Identifier sid = new Identifier();
- sid.setValue(res.getString(1));
- EndpointReferenceType acksTo =
RMUtils.createReference(res.getString(2));
- long lm = res.getLong(3);
- ProtocolVariation pv =
decodeProtocolVersion(res.getString(4));
- InputStream is = res.getBinaryStream(5);
- SequenceAcknowledgement ack = null;
- if (null != is) {
- ack = PersistenceUtils.getInstance()
- .deserialiseAcknowledgment(is);
- }
- DestinationSequence seq = new DestinationSequence(sid,
acksTo, lm, ack, pv);
- seqs.add(seq);
+ stmt = getStatement(con, SELECT_DEST_SEQUENCES_STMT_STR);
+
+ stmt.setString(1, endpointIdentifier);
+ res = stmt.executeQuery();
+ while (res.next()) {
+ Identifier sid = new Identifier();
+ sid.setValue(res.getString(1));
+ EndpointReferenceType acksTo =
RMUtils.createReference(res.getString(2));
+ long lm = res.getLong(3);
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
+ InputStream is = res.getBinaryStream(5);
+ SequenceAcknowledgement ack = null;
+ if (null != is) {
+ ack = PersistenceUtils.getInstance()
+ .deserialiseAcknowledgment(is);
}
+ DestinationSequence seq = new DestinationSequence(sid, acksTo,
lm, ack, pv);
+ seqs.add(seq);
}
} catch (SQLException ex) {
conex = ex;
LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG",
LOG).toString(), ex);
} finally {
- if (res != null) {
- try {
- res.close();
- } catch (SQLException e) {
- // ignore
- }
- }
- updateConnectionState(conex);
+ releaseResources(stmt, res);
+ updateConnectionState(con, conex);
}
return seqs;
}
@@ -536,69 +547,64 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting source sequences for endpoint: " +
endpointIdentifier);
}
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
ResultSet res = null;
try {
- synchronized (selectSrcSequencesStmt) {
- selectSrcSequencesStmt.setString(1, endpointIdentifier);
- res = selectSrcSequencesStmt.executeQuery();
- while (res.next()) {
- Identifier sid = new Identifier();
- sid.setValue(res.getString(1));
- long cmn = res.getLong(2);
- boolean lm = res.getBoolean(3);
- long lval = res.getLong(4);
- Date expiry = 0 == lval ? null : new Date(lval);
- String oidValue = res.getString(5);
- Identifier oi = null;
- if (null != oidValue) {
- oi = new Identifier();
- oi.setValue(oidValue);
- }
- ProtocolVariation pv =
decodeProtocolVersion(res.getString(6));
- SourceSequence seq = new SourceSequence(sid, expiry, oi,
cmn, lm, pv);
- seqs.add(seq);
- }
+ stmt = getStatement(con, SELECT_SRC_SEQUENCES_STMT_STR);
+
+ stmt.setString(1, endpointIdentifier);
+ res = stmt.executeQuery();
+ while (res.next()) {
+ Identifier sid = new Identifier();
+ sid.setValue(res.getString(1));
+ long cmn = res.getLong(2);
+ boolean lm = res.getBoolean(3);
+ long lval = res.getLong(4);
+ Date expiry = 0 == lval ? null : new Date(lval);
+ String oidValue = res.getString(5);
+ Identifier oi = null;
+ if (null != oidValue) {
+ oi = new Identifier();
+ oi.setValue(oidValue);
+ }
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(6));
+ SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn,
lm, pv);
+ seqs.add(seq);
}
} catch (SQLException ex) {
conex = ex;
// ignore
LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG",
LOG).toString(), ex);
} finally {
- if (res != null) {
- try {
- res.close();
- } catch (SQLException e) {
- // ignore
- }
- }
- updateConnectionState(conex);
+ releaseResources(stmt, res);
+ updateConnectionState(con, conex);
}
return seqs;
}
public Collection<RMMessage> getMessages(Identifier sid, boolean outbound)
{
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
Collection<RMMessage> msgs = new ArrayList<RMMessage>();
ResultSet res = null;
try {
- PreparedStatement stmt = outbound ? selectOutboundMessagesStmt :
selectInboundMessagesStmt;
- synchronized (stmt) {
- stmt.setString(1, sid.getValue());
- res = stmt.executeQuery();
- while (res.next()) {
- long mn = res.getLong(1);
- String to = res.getString(2);
- Blob blob = res.getBlob(3);
- RMMessage msg = new RMMessage();
- msg.setMessageNumber(mn);
- msg.setTo(to);
- msg.setContent(blob.getBinaryStream());
- msgs.add(msg);
- }
+ stmt = getStatement(con, outbound ?
SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR);
+
+ stmt.setString(1, sid.getValue());
+ res = stmt.executeQuery();
+ while (res.next()) {
+ long mn = res.getLong(1);
+ String to = res.getString(2);
+ Blob blob = res.getBlob(3);
+ RMMessage msg = new RMMessage();
+ msg.setMessageNumber(mn);
+ msg.setTo(to);
+ msg.setContent(blob.getBinaryStream());
+ msgs.add(msg);
}
} catch (SQLException ex) {
conex = ex;
@@ -608,74 +614,70 @@ public class RMTxStore implements RMStor
LOG.log(Level.WARNING, new Message(outbound ?
"SELECT_OUTBOUND_MSGS_FAILED_MSG"
: "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
} finally {
- if (res != null) {
- try {
- res.close();
- } catch (SQLException e) {
- // ignore
- }
- }
- updateConnectionState(conex);
+ releaseResources(stmt, res);
+ updateConnectionState(con, conex);
}
return msgs;
}
public void persistIncoming(DestinationSequence seq, RMMessage msg) {
- verifyConnection();
+ Connection con = verifyConnection();
SQLException conex = null;
try {
beginTransaction();
- updateDestinationSequence(seq);
+ updateDestinationSequence(con, seq);
if (msg != null && msg.getCachedOutputStream() != null) {
- storeMessage(seq.getIdentifier(), msg, false);
+ storeMessage(con, seq.getIdentifier(), msg, false);
}
- commit();
+ commit(con);
} catch (SQLException ex) {
conex = ex;
- abort();
+ abort(con);
throw new RMStoreException(ex);
} catch (IOException ex) {
- abort();
+ abort(con);
throw new RMStoreException(ex);
} finally {
- updateConnectionState(conex);
+ updateConnectionState(con, conex);
}
}
public void persistOutgoing(SourceSequence seq, RMMessage msg) {
- verifyConnection();
+ Connection con = verifyConnection();
SQLException conex = null;
try {
beginTransaction();
- updateSourceSequence(seq);
+ updateSourceSequence(con, seq);
if (msg != null && msg.getCachedOutputStream() != null) {
- storeMessage(seq.getIdentifier(), msg, true);
+ storeMessage(con, seq.getIdentifier(), msg, true);
}
- commit();
+ commit(con);
} catch (SQLException ex) {
conex = ex;
- abort();
+ abort(con);
throw new RMStoreException(ex);
} catch (IOException ex) {
- abort();
+ abort(con);
throw new RMStoreException(ex);
} finally {
- updateConnectionState(conex);
+ updateConnectionState(con, conex);
}
}
public void removeMessages(Identifier sid, Collection<Long> messageNrs,
boolean outbound) {
- verifyConnection();
+ Connection con = verifyConnection();
+ PreparedStatement stmt = null;
SQLException conex = null;
try {
- PreparedStatement stmt = outbound ? deleteOutboundMessageStmt :
deleteInboundMessageStmt;
+ stmt = getStatement(con, outbound ?
DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR);
+
beginTransaction();
stmt.setString(1, sid.getValue());
@@ -685,46 +687,51 @@ public class RMTxStore implements RMStor
stmt.execute();
}
- commit();
+ commit(con);
} catch (SQLException ex) {
conex = ex;
- abort();
+ abort(con);
throw new RMStoreException(ex);
} finally {
- updateConnectionState(conex);
+ releaseResources(stmt, null);
+ updateConnectionState(con, conex);
}
}
// transaction demarcation
//
-
+
protected void beginTransaction() {
- // avoid sharing of statements and result sets
- writeLock.lock();
}
+ protected void commit(Connection con) throws SQLException {
+ con.commit();
+ }
+
+ /**
+ * This method assumes that the connection is held and reused.
+ * Otherwise, use commit(Connection con)
+ */
protected void commit() throws SQLException {
- try {
- connection.commit();
- } finally {
- writeLock.unlock();
- }
+ commit(connection);
}
- protected void abort() {
+ protected void abort(Connection con) {
try {
- connection.rollback();
+ con.rollback();
} catch (SQLException ex) {
LogUtils.log(LOG, Level.SEVERE, "ABORT_FAILED_MSG", ex);
- } finally {
- writeLock.unlock();
}
}
+
+ protected void abort() {
+ abort(connection);
+ }
// helpers
- protected void storeMessage(Identifier sid, RMMessage msg, boolean
outbound)
+ protected void storeMessage(Connection con, Identifier sid, RMMessage msg,
boolean outbound)
throws IOException, SQLException {
String id = sid.getValue();
long nr = msg.getMessageNumber();
@@ -734,9 +741,11 @@ public class RMTxStore implements RMStor
new Object[] {outbound ? "outbound" : "inbound", nr, id,
to});
}
InputStream msgin = null;
+ PreparedStatement stmt = null;
try {
msgin = msg.getInputStream();
- PreparedStatement stmt = outbound ? createOutboundMessageStmt :
createInboundMessageStmt;
+ stmt = getStatement(con, outbound ?
CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
+
int i = 1;
stmt.setString(i++, id);
stmt.setLong(i++, nr);
@@ -755,86 +764,128 @@ public class RMTxStore implements RMStor
// ignore
}
}
+ releaseResources(stmt, null);
}
-
}
- protected void updateSourceSequence(SourceSequence seq)
+ /**
+ * this method is only useful when keepConnection is set to true
+ */
+ protected void storeMessage(Identifier sid, RMMessage msg, boolean
outbound)
+ throws IOException, SQLException {
+ storeMessage(connection, sid, msg, outbound);
+ }
+
+ protected void updateSourceSequence(Connection con, SourceSequence seq)
throws SQLException {
- synchronized (updateSrcSequenceStmt) {
- updateSrcSequenceStmt.setLong(1, seq.getCurrentMessageNr());
- updateSrcSequenceStmt.setString(2, seq.isLastMessage() ? "1" :
"0");
- updateSrcSequenceStmt.setString(3, seq.getIdentifier().getValue());
- updateSrcSequenceStmt.execute();
+ PreparedStatement stmt = null;
+ try {
+ stmt = getStatement(con, UPDATE_SRC_SEQUENCE_STMT_STR);
+
+ stmt.setLong(1, seq.getCurrentMessageNr());
+ stmt.setString(2, seq.isLastMessage() ? "1" : "0");
+ stmt.setString(3, seq.getIdentifier().getValue());
+ stmt.execute();
+ } finally {
+ releaseResources(stmt, null);
}
}
- protected void updateDestinationSequence(DestinationSequence seq)
+ /**
+ * @throws SQLException
+ */
+ protected void updateSourceSequence(SourceSequence seq) throws
SQLException {
+ updateSourceSequence(connection, seq);
+ }
+
+ protected void updateDestinationSequence(Connection con,
DestinationSequence seq)
throws SQLException, IOException {
- synchronized (updateDestSequenceStmt) {
+ PreparedStatement stmt = null;
+ try {
+ stmt = getStatement(con, UPDATE_DEST_SEQUENCE_STMT_STR);
+
long lastMessageNr = seq.getLastMessageNumber();
- updateDestSequenceStmt.setLong(1, lastMessageNr);
+ stmt.setLong(1, lastMessageNr);
InputStream is = PersistenceUtils.getInstance()
.serialiseAcknowledgment(seq.getAcknowledgment());
- updateDestSequenceStmt.setBinaryStream(2, is, is.available());
- updateDestSequenceStmt.setString(3, seq.getIdentifier()
.getValue());
- updateDestSequenceStmt.execute();
+ stmt.setBinaryStream(2, is, is.available());
+ stmt.setString(3, seq.getIdentifier() .getValue());
+ stmt.execute();
+ } finally {
+ releaseResources(stmt, null);
}
}
+ /**
+ * @throws IOException
+ * @throws SQLException
+ */
+ protected void updateDestinationSequence(DestinationSequence seq)
+ throws SQLException, IOException {
+ updateDestinationSequence(connection, seq);
+ }
+
protected void createTables() throws SQLException {
-
+ Connection con = verifyConnection();
Statement stmt = null;
- stmt = connection.createStatement();
+
try {
- stmt.executeUpdate(CREATE_SRC_SEQUENCES_TABLE_STMT);
- } catch (SQLException ex) {
- if (!isTableExistsError(ex)) {
- throw ex;
- } else {
- LOG.fine("Table CXF_RM_SRC_SEQUENCES already exists.");
- verifyTable(SRC_SEQUENCES_TABLE_NAME,
SRC_SEQUENCES_TABLE_COLS);
+ con.setAutoCommit(true);
+ stmt = con.createStatement();
+ try {
+ stmt.executeUpdate(CREATE_SRC_SEQUENCES_TABLE_STMT);
+ } catch (SQLException ex) {
+ if (!isTableExistsError(ex)) {
+ throw ex;
+ } else {
+ LOG.fine("Table CXF_RM_SRC_SEQUENCES already exists.");
+ verifyTable(con, SRC_SEQUENCES_TABLE_NAME,
SRC_SEQUENCES_TABLE_COLS);
+ }
+ } finally {
+ stmt.close();
}
- } finally {
- stmt.close();
- }
- stmt = connection.createStatement();
- try {
- stmt.executeUpdate(CREATE_DEST_SEQUENCES_TABLE_STMT);
- } catch (SQLException ex) {
- if (!isTableExistsError(ex)) {
- throw ex;
- } else {
- LOG.fine("Table CXF_RM_DEST_SEQUENCES already exists.");
- verifyTable(DEST_SEQUENCES_TABLE_NAME,
DEST_SEQUENCES_TABLE_COLS);
- }
- } finally {
- stmt.close();
- }
-
- for (String tableName : new String[] {OUTBOUND_MSGS_TABLE_NAME,
INBOUND_MSGS_TABLE_NAME}) {
- stmt = connection.createStatement();
+ stmt = con.createStatement();
try {
-
stmt.executeUpdate(MessageFormat.format(CREATE_MESSAGES_TABLE_STMT, tableName));
+ stmt.executeUpdate(CREATE_DEST_SEQUENCES_TABLE_STMT);
} catch (SQLException ex) {
if (!isTableExistsError(ex)) {
throw ex;
} else {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Table " + tableName + " already exists.");
- }
- verifyTable(tableName, MESSAGES_TABLE_COLS);
+ LOG.fine("Table CXF_RM_DEST_SEQUENCES already exists.");
+ verifyTable(con, DEST_SEQUENCES_TABLE_NAME,
DEST_SEQUENCES_TABLE_COLS);
}
} finally {
stmt.close();
}
+
+ for (String tableName : new String[] {OUTBOUND_MSGS_TABLE_NAME,
INBOUND_MSGS_TABLE_NAME}) {
+ stmt = con.createStatement();
+ try {
+
stmt.executeUpdate(MessageFormat.format(CREATE_MESSAGES_TABLE_STMT, tableName));
+ } catch (SQLException ex) {
+ if (!isTableExistsError(ex)) {
+ throw ex;
+ } else {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Table " + tableName + " already
exists.");
+ }
+ verifyTable(con, tableName, MESSAGES_TABLE_COLS);
+ }
+ } finally {
+ stmt.close();
+ }
+ }
+ } finally {
+ if (connection == null) {
+ con.close();
+ }
}
}
- protected void verifyTable(String tableName, String[][] tableCols) {
+ protected void verifyTable(Connection con, String tableName, String[][]
tableCols) {
try {
- DatabaseMetaData metadata = connection.getMetaData();
+ DatabaseMetaData metadata = con.getMetaData();
ResultSet rs = metadata.getColumns(null, null, tableName, "%");
Set<String> dbCols = new HashSet<String>();
List<String[]> newCols = new ArrayList<String[]>();
@@ -853,7 +904,7 @@ public class RMTxStore implements RMStor
}
for (String[] newCol : newCols) {
- Statement st = connection.createStatement();
+ Statement st = con.createStatement();
try {
st.executeUpdate(MessageFormat.format(ALTER_TABLE_STMT_STR,
tableName,
newCol[0], newCol[1]));
@@ -873,7 +924,15 @@ public class RMTxStore implements RMStor
}
}
}
-
+
+ protected void verifyTable(String tableName, String[][] tableCols) {
+ verifyTable(connection, tableName, tableCols);
+ }
+ /**
+ * Sets the current schema associated with the connection.
+ * If the connection is not set (e.g., keepConnection is false, it has no
effect.
+ * @throws SQLException
+ */
protected void setCurrentSchema() throws SQLException {
if (schemaName == null || connection == null) {
return;
@@ -910,70 +969,104 @@ public class RMTxStore implements RMStor
}
}
}
-
- private void createStatements() throws SQLException {
- // create the statements in advance to avoid synchronization later
- createDestSequenceStmt =
connection.prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR);
- createSrcSequenceStmt =
connection.prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR);
- deleteDestSequenceStmt =
connection.prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR);
- deleteSrcSequenceStmt =
connection.prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR);
- updateDestSequenceStmt =
connection.prepareStatement(UPDATE_DEST_SEQUENCE_STMT_STR);
- updateSrcSequenceStmt =
connection.prepareStatement(UPDATE_SRC_SEQUENCE_STMT_STR);
- selectDestSequencesStmt =
connection.prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR);
- selectSrcSequencesStmt =
connection.prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR);
- selectDestSequenceStmt =
connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
- selectSrcSequenceStmt =
connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);
- createInboundMessageStmt = connection.prepareStatement(
- MessageFormat.format(CREATE_MESSAGE_STMT_STR,
INBOUND_MSGS_TABLE_NAME));
- createOutboundMessageStmt = connection.prepareStatement(
- MessageFormat.format(CREATE_MESSAGE_STMT_STR,
OUTBOUND_MSGS_TABLE_NAME));
- deleteInboundMessageStmt = connection.prepareStatement(
- MessageFormat.format(DELETE_MESSAGE_STMT_STR,
INBOUND_MSGS_TABLE_NAME));
- deleteOutboundMessageStmt = connection.prepareStatement(
- MessageFormat.format(DELETE_MESSAGE_STMT_STR,
OUTBOUND_MSGS_TABLE_NAME));
- selectInboundMessagesStmt = connection.prepareStatement(
- MessageFormat.format(SELECT_MESSAGES_STMT_STR,
INBOUND_MSGS_TABLE_NAME));
- selectOutboundMessagesStmt = connection.prepareStatement(
- MessageFormat.format(SELECT_MESSAGES_STMT_STR,
OUTBOUND_MSGS_TABLE_NAME));
+
+ /**
+ * Returns either the locally cached statement or the one from the
specified connection
+ * depending on whether the connection is held by this store. If the
statement retrieved from
+ * the local cache, it is locked until it is released. The retrieved
statement must be
+ * released using releaseResources(PreparedStatement stmt, ResultSet rs).
+ *
+ * @param con
+ * @param sql
+ * @return
+ * @throws SQLException
+ */
+ protected PreparedStatement getStatement(Connection con, String sql)
throws SQLException {
+ if (connection != null) {
+ PreparedStatement stmt = cachedStatements.get(sql);
+ statementLocks.get(stmt).lock();
+ return stmt;
+ } else {
+ return con.prepareStatement(sql);
+ }
}
- public synchronized void init() {
- if (null == connection) {
- LOG.log(Level.FINE, "Using derby.system.home: {0}",
- SystemPropertyAction.getProperty("derby.system.home"));
- if (null != dataSource) {
- try {
- LOG.log(Level.FINE, "Using dataSource: " + dataSource);
- connection = dataSource.getConnection();
- } catch (SQLException ex) {
- LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
- return;
- }
+ /**
+ * Releases the statement and any result set.
+ *
+ * @param stmt
+ * @param rs
+ */
+ protected void releaseResources(PreparedStatement stmt, ResultSet rs) {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ // ignore
+ }
+ }
+ if (stmt != null) {
+ if (connection != null) {
+ statementLocks.get(stmt).unlock();
} else {
- assert null != url;
- assert null != driverClassName;
try {
- Class.forName(driverClassName);
- } catch (ClassNotFoundException ex) {
- LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
- return;
- }
-
- try {
- LOG.log(Level.FINE, "Using url: " + url);
- connection = DriverManager.getConnection(url, userName,
password);
- } catch (SQLException ex) {
- LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
- return;
+ stmt.close();
+ } catch (SQLException e) {
+ // ignore
}
}
}
+ }
+
+
+ private void cacheStatement(Connection con, String sql)
+ throws SQLException {
+ PreparedStatement stmt = con.prepareStatement(sql);
+ cachedStatements.put(sql, stmt);
+ statementLocks.put(stmt, new ReentrantLock());
+ }
+
+ private void cacheStatements() throws SQLException {
+ if (connection == null) {
+ // if the connection is not held, no statement is cached.
+ return;
+ }
+ // create a statement specific lock table
+ statementLocks = new HashMap<Statement, Lock>();
+ cachedStatements = new HashMap<String, PreparedStatement>();
+
+ // create the statements in advance if the connection is to be kept
+ cacheStatement(connection, CREATE_DEST_SEQUENCE_STMT_STR);
+ cacheStatement(connection, CREATE_SRC_SEQUENCE_STMT_STR);
+ cacheStatement(connection, DELETE_DEST_SEQUENCE_STMT_STR);
+ cacheStatement(connection, DELETE_SRC_SEQUENCE_STMT_STR);
+ cacheStatement(connection, UPDATE_DEST_SEQUENCE_STMT_STR);
+ cacheStatement(connection, UPDATE_SRC_SEQUENCE_STMT_STR);
+ cacheStatement(connection, SELECT_DEST_SEQUENCES_STMT_STR);
+ cacheStatement(connection, SELECT_SRC_SEQUENCES_STMT_STR);
+ cacheStatement(connection, SELECT_DEST_SEQUENCE_STMT_STR);
+ cacheStatement(connection, SELECT_SRC_SEQUENCE_STMT_STR);
+ cacheStatement(connection, CREATE_INBOUND_MESSAGE_STMT_STR);
+ cacheStatement(connection, CREATE_OUTBOUND_MESSAGE_STMT_STR);
+ cacheStatement(connection, DELETE_INBOUND_MESSAGE_STMT_STR);
+ cacheStatement(connection, DELETE_OUTBOUND_MESSAGE_STMT_STR);
+ cacheStatement(connection, SELECT_INBOUND_MESSAGES_STMT_STR);
+ cacheStatement(connection, SELECT_OUTBOUND_MESSAGES_STMT_STR);
+ }
+
+ public synchronized void init() {
+ if (keepConnection && connection == null) {
+ connection = createConnection();
+ }
try {
- connection.setAutoCommit(true);
- setCurrentSchema();
+ if (connection != null && schemaName != null) {
+ setCurrentSchema();
+ }
createTables();
- createStatements();
+ if (connection != null) {
+ cacheStatements();
+ }
} catch (SQLException ex) {
ex.printStackTrace();
LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
@@ -985,51 +1078,96 @@ public class RMTxStore implements RMStor
throw new RMStoreException(ex);
} catch (Throwable ex) {
ex.printStackTrace();
- } finally {
- try {
- connection.setAutoCommit(false);
- } catch (SQLException ex) {
- LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
- throw new RMStoreException(ex);
- }
}
}
Connection getConnection() {
return connection;
}
+
+ protected Connection createConnection() {
+ LOG.log(Level.FINE, "Using derby.system.home: {0}",
+ SystemPropertyAction.getProperty("derby.system.home"));
+ Connection con = null;
+ if (null != dataSource) {
+ try {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "Using dataSource: " + dataSource);
+ }
+ con = dataSource.getConnection();
+ } catch (SQLException ex) {
+ LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
+ }
+ } else {
+ assert null != url;
+ assert null != driverClassName;
+ try {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "Using url: " + url);
+ }
+ con = DriverManager.getConnection(url, userName, password);
+ } catch (SQLException ex) {
+ LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
+ }
+ }
+ return con;
+ }
- protected synchronized void verifyConnection() {
- if (createdConnection && nextReconnectAttempt > 0
- && (maxReconnectAttempts < 0 || maxReconnectAttempts >
reconnectAttempts)) {
- if (System.currentTimeMillis() > nextReconnectAttempt) {
- // destroy the broken connection
- destroy();
- // try to reconnect
- reconnectAttempts++;
- init();
- // reset the next reconnect attempt time
- nextReconnectAttempt = 0;
- } else {
- LogUtils.log(LOG, Level.INFO, "WAIT_RECONNECT_MSG");
+ protected Connection verifyConnection() {
+ Connection con;
+ if (connection == null) {
+ // return a new connection
+ con = createConnection();
+ } else {
+ // return the cached connection or create and cache a new one if
the old one is dead
+ synchronized (this) {
+ if (createdConnection && nextReconnectAttempt > 0
+ && (maxReconnectAttempts < 0 || maxReconnectAttempts >
reconnectAttempts)) {
+ if (System.currentTimeMillis() > nextReconnectAttempt) {
+ // destroy the broken connection
+ destroy();
+ // try to reconnect
+ reconnectAttempts++;
+ init();
+ // reset the next reconnect attempt time
+ nextReconnectAttempt = 0;
+ } else {
+ LogUtils.log(LOG, Level.INFO, "WAIT_RECONNECT_MSG");
+ }
+ }
}
+ con = connection;
}
+
+ return con;
}
- protected synchronized void updateConnectionState(SQLException e) {
- if (e == null) {
- // reset the previous error status
- reconnectDelay = 0;
- reconnectAttempts = 0;
- nextReconnectAttempt = 0;
- } else if (createdConnection && isRecoverableError(e)) {
- // update the next reconnect schedule
- if (reconnectDelay == 0) {
- reconnectDelay = initialReconnectDelay;
+ protected void updateConnectionState(Connection con, SQLException e) {
+ if (connection == null) {
+ // close the locally created connection
+ try {
+ con.close();
+ } catch (SQLException ex) {
+ // ignore
}
- if (nextReconnectAttempt < System.currentTimeMillis()) {
- nextReconnectAttempt = System.currentTimeMillis() +
reconnectDelay;
- reconnectDelay = reconnectDelay * useExponentialBackOff;
+ } else {
+ synchronized (this) {
+ // update the status of the cached connection
+ if (e == null) {
+ // reset the previous error status
+ reconnectDelay = 0;
+ reconnectAttempts = 0;
+ nextReconnectAttempt = 0;
+ } else if (createdConnection && isRecoverableError(e)) {
+ // update the next reconnect schedule
+ if (reconnectDelay == 0) {
+ reconnectDelay = initialReconnectDelay;
+ }
+ if (nextReconnectAttempt < System.currentTimeMillis()) {
+ nextReconnectAttempt = System.currentTimeMillis() +
reconnectDelay;
+ reconnectDelay = reconnectDelay *
useExponentialBackOff;
+ }
+ }
}
}
}
Modified:
cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager.xsd
URL:
http://svn.apache.org/viewvc/cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager.xsd?rev=1355016&r1=1355015&r2=1355016&view=diff
==============================================================================
---
cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager.xsd
(original)
+++
cxf/branches/2.6.x-fixes/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager.xsd
Thu Jun 28 13:55:18 2012
@@ -192,11 +192,19 @@
</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="keepConnection" type="xs:boolean">
+ <xs:annotation>
+ <xs:documentation>
+ Indicates if the connection is kept by the store.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
<xs:attribute name="initialReconnectDelay" type="xs:long">
<xs:annotation>
<xs:documentation>
The initialy delay before attempting to reconnect to the
database. The subsequent
- delay follows the exponential backoff algorithm.
+ delay follows the exponential backoff algorithm. This
attribute is only
+ relevant when the connection is kept by the store.
</xs:documentation>
</xs:annotation>
</xs:attribute>
@@ -204,7 +212,8 @@
<xs:annotation>
<xs:documentation>
The maximum number of attemps taken to reconnect to the
database after a connection error.
- The value of 0 indicates no attempt, while a negative value
indicates no limit.
+ The value of 0 indicates no attempt, while a negative value
indicates no limit.
+ This attribute is only relevant when the connection is kept
by the store.
</xs:documentation>
</xs:annotation>
</xs:attribute>
Modified:
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreConfigurationTest.java
URL:
http://svn.apache.org/viewvc/cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreConfigurationTest.java?rev=1355016&r1=1355015&r2=1355016&view=diff
==============================================================================
---
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreConfigurationTest.java
(original)
+++
cxf/branches/2.6.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreConfigurationTest.java
Thu Jun 28 13:55:18 2012
@@ -105,7 +105,7 @@ public class RMTxStoreConfigurationTest
assertNull(store.getConnection());
}
-
+
static class TestDataSource implements DataSource {
public PrintWriter getLogWriter() throws SQLException {
return null;