Author: ay
Date: Wed Jun 6 12:01:02 2012
New Revision: 1346840
URL: http://svn.apache.org/viewvc?rev=1346840&view=rev
Log:
[CXF-4362] Add a reconnect option in WS-RM RMTxStore
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties?rev=1346840&r1=1346839&r2=1346840&view=diff
==============================================================================
---
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
(original)
+++
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
Wed Jun 6 12:01:02 2012
@@ -20,6 +20,8 @@
#
CONNECT_EXC = Failed to connect to store.
ABORT_FAILED_MSG = Failed to abort transaction.
+CLOSE_FAILED_MSG = Failed to close connection.
+WAIT_RECONNECT_MSG = Waiting for the next reconnect attempt.
SELECT_DEST_SEQ_FAILED_MSG = Failed to retrieve destination sequences from
persistent store.
SELECT_SRC_SEQ_FAILED_MSG = Failed to retrieve source sequences from
persistent store.
VERIFY_TABLE_FAILED_MSG = Failed to verify the table definition.
\ No newline at end of file
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1346840&r1=1346839&r2=1346840&view=diff
==============================================================================
---
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
(original)
+++
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
Wed Jun 6 12:01:02 2012
@@ -178,6 +178,14 @@ public class RMTxStore implements RMStor
private String userName;
private String password;
private String schemaName;
+
+ private long initialReconnectDelay = 60000L;
+ private int useExponentialBackOff = 2;
+ private int maxReconnectAttempts = 10;
+
+ private long reconnectDelay;
+ private int reconnectAttempts;
+ private long nextReconnectAttempt;
private String tableExistsState = DERBY_TABLE_EXISTS_STATE;
private int tableExistsCode = ORACLE_TABLE_EXISTS_CODE;
@@ -192,6 +200,7 @@ public class RMTxStore implements RMStor
} catch (SQLException e) {
//ignore
}
+ connection = null;
}
}
@@ -265,6 +274,22 @@ public class RMTxStore implements RMStor
this.tableExistsCode = tableExistsCode;
}
+ public long getInitialReconnectDelay() {
+ return initialReconnectDelay;
+ }
+
+ public void setInitialReconnectDelay(long initialReconnectDelay) {
+ this.initialReconnectDelay = initialReconnectDelay;
+ }
+
+ public int getMaxReconnectAttempts() {
+ return maxReconnectAttempts;
+ }
+
+ public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+ this.maxReconnectAttempts = maxReconnectAttempts;
+ }
+
public void setConnection(Connection c) {
connection = c;
createdConnection = false;
@@ -280,6 +305,8 @@ public class RMTxStore implements RMStor
LOG.info("Creating destination sequence: " + sequenceIdentifier +
", (endpoint: "
+ endpointIdentifier + ")");
}
+ verifyConnection();
+ SQLException conex = null;
try {
beginTransaction();
@@ -291,10 +318,12 @@ public class RMTxStore implements RMStor
createDestSequenceStmt.execute();
commit();
-
} catch (SQLException ex) {
abort();
+ conex = ex;
throw new RMStoreException(ex);
+ } finally {
+ updateConnectionState(conex);
}
}
@@ -306,7 +335,8 @@ public class RMTxStore implements RMStor
LOG.fine("Creating source sequence: " + sequenceIdentifier + ",
(endpoint: "
+ endpointIdentifier + ")");
}
-
+ verifyConnection();
+ SQLException conex = null;
try {
beginTransaction();
@@ -320,10 +350,12 @@ public class RMTxStore implements RMStor
createSrcSequenceStmt.execute();
commit();
-
} catch (SQLException ex) {
+ conex = ex;
abort();
throw new RMStoreException(ex);
+ } finally {
+ updateConnectionState(conex);
}
}
@@ -331,6 +363,8 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting destination sequence for id: " + sid);
}
+ verifyConnection();
+ SQLException conex = null;
ResultSet res = null;
try {
synchronized (selectDestSequenceStmt) {
@@ -351,6 +385,7 @@ public class RMTxStore implements RMStor
}
}
} catch (SQLException ex) {
+ conex = ex;
LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG",
LOG).toString(), ex);
} finally {
if (res != null) {
@@ -360,6 +395,7 @@ public class RMTxStore implements RMStor
// ignore
}
}
+ updateConnectionState(conex);
}
return null;
}
@@ -368,6 +404,8 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting source sequences for id: " + sid);
}
+ verifyConnection();
+ SQLException conex = null;
ResultSet res = null;
try {
synchronized (selectSrcSequenceStmt) {
@@ -390,6 +428,7 @@ public class RMTxStore implements RMStor
}
}
} catch (SQLException ex) {
+ conex = ex;
// ignore
LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG",
LOG).toString(), ex);
} finally {
@@ -400,11 +439,14 @@ public class RMTxStore implements RMStor
// ignore
}
}
+ updateConnectionState(conex);
}
return null;
}
public void removeDestinationSequence(Identifier sid) {
+ verifyConnection();
+ SQLException conex = null;
try {
beginTransaction();
@@ -414,13 +456,18 @@ public class RMTxStore implements RMStor
commit();
} catch (SQLException ex) {
+ conex = ex;
abort();
throw new RMStoreException(ex);
- }
+ } finally {
+ updateConnectionState(conex);
+ }
}
public void removeSourceSequence(Identifier sid) {
+ verifyConnection();
+ SQLException conex = null;
try {
beginTransaction();
@@ -430,8 +477,11 @@ public class RMTxStore implements RMStor
commit();
} catch (SQLException ex) {
+ conex = ex;
abort();
throw new RMStoreException(ex);
+ } finally {
+ updateConnectionState(conex);
}
}
@@ -439,6 +489,8 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting destination sequences for endpoint: " +
endpointIdentifier);
}
+ verifyConnection();
+ SQLException conex = null;
Collection<DestinationSequence> seqs = new
ArrayList<DestinationSequence>();
ResultSet res = null;
try {
@@ -462,6 +514,7 @@ public class RMTxStore implements RMStor
}
}
} catch (SQLException ex) {
+ conex = ex;
LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG",
LOG).toString(), ex);
} finally {
if (res != null) {
@@ -471,6 +524,7 @@ public class RMTxStore implements RMStor
// ignore
}
}
+ updateConnectionState(conex);
}
return seqs;
}
@@ -479,6 +533,8 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting source sequences for endpoint: " +
endpointIdentifier);
}
+ verifyConnection();
+ SQLException conex = null;
Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
ResultSet res = null;
try {
@@ -504,6 +560,7 @@ public class RMTxStore implements RMStor
}
}
} catch (SQLException ex) {
+ conex = ex;
// ignore
LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG",
LOG).toString(), ex);
} finally {
@@ -514,11 +571,14 @@ public class RMTxStore implements RMStor
// ignore
}
}
+ updateConnectionState(conex);
}
return seqs;
}
public Collection<RMMessage> getMessages(Identifier sid, boolean outbound)
{
+ verifyConnection();
+ SQLException conex = null;
Collection<RMMessage> msgs = new ArrayList<RMMessage>();
ResultSet res = null;
try {
@@ -537,7 +597,11 @@ public class RMTxStore implements RMStor
msgs.add(msg);
}
}
- } catch (Exception ex) {
+ } catch (SQLException ex) {
+ conex = ex;
+ LOG.log(Level.WARNING, new Message(outbound ?
"SELECT_OUTBOUND_MSGS_FAILED_MSG"
+ : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
+ } catch (IOException ex) {
LOG.log(Level.WARNING, new Message(outbound ?
"SELECT_OUTBOUND_MSGS_FAILED_MSG"
: "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
} finally {
@@ -548,11 +612,14 @@ public class RMTxStore implements RMStor
// ignore
}
}
+ updateConnectionState(conex);
}
return msgs;
}
public void persistIncoming(DestinationSequence seq, RMMessage msg) {
+ verifyConnection();
+ SQLException conex = null;
try {
beginTransaction();
@@ -565,14 +632,19 @@ public class RMTxStore implements RMStor
commit();
} catch (SQLException ex) {
+ conex = ex;
abort();
throw new RMStoreException(ex);
} catch (IOException ex) {
abort();
throw new RMStoreException(ex);
- }
+ } finally {
+ updateConnectionState(conex);
+ }
}
public void persistOutgoing(SourceSequence seq, RMMessage msg) {
+ verifyConnection();
+ SQLException conex = null;
try {
beginTransaction();
@@ -585,15 +657,20 @@ public class RMTxStore implements RMStor
commit();
} catch (SQLException ex) {
+ conex = ex;
abort();
throw new RMStoreException(ex);
} catch (IOException ex) {
abort();
throw new RMStoreException(ex);
+ } finally {
+ updateConnectionState(conex);
}
}
public void removeMessages(Identifier sid, Collection<Long> messageNrs,
boolean outbound) {
+ verifyConnection();
+ SQLException conex = null;
try {
PreparedStatement stmt = outbound ? deleteOutboundMessageStmt :
deleteInboundMessageStmt;
beginTransaction();
@@ -608,9 +685,12 @@ public class RMTxStore implements RMStor
commit();
} catch (SQLException ex) {
+ conex = ex;
abort();
throw new RMStoreException(ex);
- }
+ } finally {
+ updateConnectionState(conex);
+ }
}
// transaction demarcation
@@ -686,6 +766,7 @@ public class RMTxStore implements RMStor
updateDestSequenceStmt.execute();
}
}
+
protected void createTables() throws SQLException {
Statement stmt = null;
@@ -902,7 +983,42 @@ public class RMTxStore implements RMStor
Connection getConnection() {
return connection;
}
-
+
+ private void verifyConnection() {
+ if (createdConnection && nextReconnectAttempt > 0
+ && (maxReconnectAttempts < 0 || maxReconnectAttempts >
reconnectAttempts)) {
+ if (System.currentTimeMillis() > nextReconnectAttempt) {
+ // destroy the broken connection
+ destroy();
+ // try to reconnect
+ reconnectAttempts++;
+ init();
+ // reset the next reconnect attempt time
+ nextReconnectAttempt = 0;
+ } else {
+ LogUtils.log(LOG, Level.INFO, "WAIT_RECONNECT_MSG");
+ }
+ }
+ }
+
+ private synchronized void updateConnectionState(SQLException e) {
+ if (e == null) {
+ // reset the previous error status
+ reconnectDelay = 0;
+ reconnectAttempts = 0;
+ nextReconnectAttempt = 0;
+ } else if (createdConnection && isRecoverableError(e)) {
+ // update the next reconnect schedule
+ if (reconnectDelay == 0) {
+ reconnectDelay = initialReconnectDelay;
+ }
+ if (nextReconnectAttempt < System.currentTimeMillis()) {
+ nextReconnectAttempt = System.currentTimeMillis() +
reconnectDelay;
+ reconnectDelay = reconnectDelay * useExponentialBackOff;
+ }
+ }
+ }
+
public static void deleteDatabaseFiles() {
deleteDatabaseFiles(DEFAULT_DATABASE_NAME, true);
}
@@ -990,4 +1106,9 @@ public class RMTxStore implements RMStor
return (null != tableExistsState &&
tableExistsState.equals(ex.getSQLState()))
|| tableExistsCode == ex.getErrorCode();
}
+
+ protected boolean isRecoverableError(SQLException ex) {
+ // check for a transient or non-transient connection exception
+ return ex.getSQLState() != null && ex.getSQLState().startsWith("08");
+ }
}
\ No newline at end of file
Modified:
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java?rev=1346840&r1=1346839&r2=1346840&view=diff
==============================================================================
---
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
(original)
+++
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
Wed Jun 6 12:01:02 2012
@@ -674,6 +674,58 @@ public class RMTxStoreTest extends Asser
store.removeMessages(sid1, msgNrs, false);
}
}
+
+ @Test
+ public void testReconnect() throws Exception {
+ // set the initial reconnect delay to 100 msec for testing
+ long ird = store.getInitialReconnectDelay();
+ store.setInitialReconnectDelay(100);
+
+ SourceSequence seq = control.createMock(SourceSequence.class);
+ Identifier sid1 = RMUtils.getWSRMFactory().createIdentifier();
+ sid1.setValue("sequence1");
+ EasyMock.expect(seq.getIdentifier()).andReturn(sid1);
+ EasyMock.expect(seq.getExpires()).andReturn(null);
+ EasyMock.expect(seq.getOfferingSequenceIdentifier()).andReturn(null);
+
EasyMock.expect(seq.getEndpointIdentifier()).andReturn(CLIENT_ENDPOINT_ID);
+
EasyMock.expect(seq.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408);
+
+ // intentionally invalidate the connection
+ try {
+ store.getConnection().close();
+ } catch (SQLException ex) {
+ // ignore
+ }
+
+ control.replay();
+ try {
+ store.createSourceSequence(seq);
+ fail("Expected RMStoreException was not thrown.");
+ } catch (RMStoreException ex) {
+ SQLException se = (SQLException)ex.getCause();
+ // expects a transient or non-transient connection exception
+ assertTrue(se.getSQLState().startsWith("08"));
+ }
+
+ // wait 200 msecs to make sure a reconnect is attempted
+ Thread.sleep(200);
+
+ control.reset();
+ EasyMock.expect(seq.getIdentifier()).andReturn(sid1);
+ EasyMock.expect(seq.getExpires()).andReturn(null);
+ EasyMock.expect(seq.getOfferingSequenceIdentifier()).andReturn(null);
+
EasyMock.expect(seq.getEndpointIdentifier()).andReturn(CLIENT_ENDPOINT_ID);
+
EasyMock.expect(seq.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408);
+
+ control.replay();
+ store.createSourceSequence(seq);
+ control.verify();
+
+ // revert to the old initial reconnect delay
+ store.setInitialReconnectDelay(ird);
+
+ store.removeSourceSequence(sid1);
+ }
private Identifier setupDestinationSequence(String s) throws IOException,
SQLException {
DestinationSequence seq =
control.createMock(DestinationSequence.class);