Repository: cxf Updated Branches: refs/heads/3.0.x-fixes f7f22c68a -> b096f12f3
[CXF-6886] CXF 3.x WSRM attachments are not retransmitted (modified patch based on kai's patches) Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/62d67aa2 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/62d67aa2 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/62d67aa2 Branch: refs/heads/3.0.x-fixes Commit: 62d67aa204e6a3218007581fbc5b65851811f973 Parents: f7f22c6 Author: Akitoshi Yoshida <a...@apache.org> Authored: Mon May 2 13:35:15 2016 +0200 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Wed May 4 12:42:55 2016 +0200 ---------------------------------------------------------------------- .../cxf/ws/rm/RMCaptureOutInterceptor.java | 5 +- .../java/org/apache/cxf/ws/rm/RMManager.java | 6 +- .../cxf/ws/rm/persistence/PersistenceUtils.java | 61 ++++++- .../apache/cxf/ws/rm/persistence/RMMessage.java | 23 +++ .../cxf/ws/rm/persistence/jdbc/RMTxStore.java | 159 ++++--------------- .../org/apache/cxf/ws/rm/RMManagerTest.java | 141 +++++++++++++++- .../ws/rm/persistence/PersistenceUtilsTest.java | 109 +++++++++++++ .../ws/rm/persistence/SerializedRMMessage.txt | 14 ++ .../rm/persistence/jdbc/RMTxStoreTestBase.java | 11 +- 9 files changed, 386 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java index 8f1df9e..a580071 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java @@ -54,6 +54,7 @@ import org.apache.cxf.service.model.OperationInfo; import 
org.apache.cxf.ws.addressing.AddressingProperties; import org.apache.cxf.ws.addressing.AttributedURIType; import org.apache.cxf.ws.addressing.ContextUtils; +import org.apache.cxf.ws.rm.persistence.PersistenceUtils; import org.apache.cxf.ws.rm.persistence.RMMessage; import org.apache.cxf.ws.rm.persistence.RMStore; import org.apache.cxf.ws.rm.v200702.Identifier; @@ -272,7 +273,9 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> { msg.setTo(maps.getTo().getValue()); } } - msg.setContent(bis); + // serializes the message content and the attachments into + // the RMMessage content + PersistenceUtils.encodeRMContent(msg, message, bis); store.persistOutgoing(ss, msg); } http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java index 4442ec5..fc24db4 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java @@ -69,6 +69,7 @@ import org.apache.cxf.ws.rm.manager.DestinationPolicyType; import org.apache.cxf.ws.rm.manager.RM10AddressingNamespaceType; import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType; import org.apache.cxf.ws.rm.manager.SourcePolicyType; +import org.apache.cxf.ws.rm.persistence.PersistenceUtils; import org.apache.cxf.ws.rm.persistence.RMMessage; import org.apache.cxf.ws.rm.persistence.RMStore; import org.apache.cxf.ws.rm.policy.RMPolicyUtilities; @@ -618,7 +619,10 @@ public class RMManager { } try { - message.put(RMMessageConstants.SAVED_CONTENT, RewindableInputStream.makeRewindable(m.getContent())); + // RMMessage is stored in a serialized way, therefore + // RMMessage content must be splitted into soap root message + // and attachments + PersistenceUtils.decodeRMContent(m, 
message); RMContextUtils.setProtocolVariation(message, ss.getProtocol()); retransmissionQueue.addUnacknowledged(message); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java index c4e8e7a..0981f8e 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java @@ -19,16 +19,26 @@ package org.apache.cxf.ws.rm.persistence; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBElement; import javax.xml.bind.JAXBException; import javax.xml.stream.XMLStreamReader; +import org.apache.cxf.attachment.AttachmentDeserializer; +import org.apache.cxf.attachment.AttachmentSerializer; import org.apache.cxf.common.util.PackageUtils; +import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.helpers.LoadingByteArrayOutputStream; +import org.apache.cxf.io.CachedOutputStream; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageImpl; import org.apache.cxf.staxutils.StaxUtils; +import org.apache.cxf.ws.rm.RMMessageConstants; +import org.apache.cxf.ws.rm.RewindableInputStream; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement; /** @@ -51,7 +61,7 @@ public final class PersistenceUtils { } return instance; } - + public SequenceAcknowledgement deserialiseAcknowledgment(InputStream is) { Object obj = null; XMLStreamReader reader = StaxUtils.createXMLStreamReader(is); @@ -68,14 +78,14 @@ public final class PersistenceUtils { StaxUtils.close(reader); is.close(); } catch (Throwable t) { - //ignore, just 
cleaning up + // ignore, just cleaning up } } return (SequenceAcknowledgement)obj; } - + public InputStream serialiseAcknowledgment(SequenceAcknowledgement ack) { - LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream(); + LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream(); try { getContext().createMarshaller().marshal(ack, bos); } catch (JAXBException ex) { @@ -83,7 +93,7 @@ public final class PersistenceUtils { } return bos.createInputStream(); } - + private JAXBContext getContext() throws JAXBException { if (null == context) { context = JAXBContext.newInstance(PackageUtils @@ -92,4 +102,45 @@ public final class PersistenceUtils { } return context; } + + public static void encodeRMContent(RMMessage rmmsg, Message msg, InputStream msgContent) + throws IOException { + if (msg.getAttachments() == null) { + rmmsg.setContentType((String)msg.get(Message.CONTENT_TYPE)); + rmmsg.setContent(msgContent); + } else { + MessageImpl msgImpl1 = new MessageImpl(); + // using cached output stream to handle large files + CachedOutputStream cos = new CachedOutputStream(); + msgImpl1.setContent(OutputStream.class, cos); + msgImpl1.setAttachments(msg.getAttachments()); + msgImpl1.put(Message.CONTENT_TYPE, (String) msg.get(Message.CONTENT_TYPE)); + msgImpl1.setContent(InputStream.class, msgContent); + AttachmentSerializer serializer = new AttachmentSerializer(msgImpl1); + serializer.setXop(false); + serializer.writeProlog(); + // write soap root message into cached output stream + IOUtils.copyAndCloseInput(msgContent, cos); + serializer.writeAttachments(); + rmmsg.setContentType((String) msgImpl1.get(Message.CONTENT_TYPE)); + + //TODO will pass the cos instance to rmmessage in the future + rmmsg.setContent(cos.getInputStream()); + } + } + + public static void decodeRMContent(RMMessage rmmsg, Message msg) throws IOException { + String contentType = rmmsg.getContentType(); + if ((null != contentType) && contentType.startsWith("multipart/related")) { + 
msg.put(Message.CONTENT_TYPE, contentType); + msg.setContent(InputStream.class, rmmsg.getContent()); + AttachmentDeserializer ad = new AttachmentDeserializer(msg); + ad.initializeAttachments(); + } else { + msg.setContent(InputStream.class, rmmsg.getContent()); + } + InputStream is = RewindableInputStream.makeRewindable(msg.getContent(InputStream.class)); + msg.setContent(InputStream.class, is); + msg.put(RMMessageConstants.SAVED_CONTENT, is); + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java index 4e91208..abab221 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java @@ -25,7 +25,9 @@ import java.util.List; public class RMMessage { private InputStream content; + //TODO remove attachments when we remove the deprecated attachments related methods private List<InputStream> attachments = Collections.emptyList(); + private String contentType; private long messageNumber; private String to; @@ -82,7 +84,9 @@ public class RMMessage { /** * Returns the list of attachments. * @return list (non-null) + * @deprecated not used as the optional attachments are stored in the content */ + @Deprecated public List<InputStream> getAttachments() { return attachments; } @@ -90,9 +94,28 @@ public class RMMessage { /** * Set the list of attachments. 
* @param attaches (non-null) + * @deprecated not used as the optional attachments are stored in the content */ + @Deprecated public void setAttachments(List<InputStream> attaches) { assert attaches != null; attachments = attaches; } + + /** + * Returns the content type of the message content + * @return + */ + public String getContentType() { + return contentType; + } + + /** + * Set the content type of the RMMessage + * @param contentType + */ + public void setContentType(String contentType) { + this.contentType = contentType; + } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java index ef01405..cd19d7b 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java @@ -88,22 +88,13 @@ public class RMTxStore implements RMStore { = {{"SEQ_ID", "VARCHAR(256) NOT NULL"}, {"MSG_NO", "DECIMAL(19, 0) NOT NULL"}, {"SEND_TO", "VARCHAR(256)"}, - {"CONTENT", "BLOB"}}; + {"CONTENT", "BLOB"}, + {"CONTENT_TYPE", "VARCHAR(1024)"}}; private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"}; - private static final String[][] ATTACHMENTS_TABLE_COLS - = {{"SEQ_ID", "VARCHAR(256) NOT NULL"}, - {"MSG_NO", "DECIMAL(19, 0) NOT NULL"}, - {"ATTACHMENT_NO", "DECIMAL(19, 0) NOT NULL"}, - {"DATA", "BLOB"}}; - private static final String[] ATTACHMENTS_TABLE_KEYS = {"SEQ_ID", "MSG_NO", "ATTACHMENT_NO"}; - private static final String DEST_SEQUENCES_TABLE_NAME = "CXF_RM_DEST_SEQUENCES"; private static final String SRC_SEQUENCES_TABLE_NAME = "CXF_RM_SRC_SEQUENCES"; private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES"; private 
static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES"; - private static final String INBOUND_ATTS_TABLE_NAME = "CXF_RM_INBOUND_ATTACHMENTS"; - private static final String OUTBOUND_ATTS_TABLE_NAME = "CXF_RM_OUTBOUND_ATTACHMENTS"; - private static final String CREATE_DEST_SEQUENCES_TABLE_STMT = buildCreateTableStatement(DEST_SEQUENCES_TABLE_NAME, DEST_SEQUENCES_TABLE_COLS, DEST_SEQUENCES_TABLE_KEYS); @@ -113,9 +104,6 @@ public class RMTxStore implements RMStore { SRC_SEQUENCES_TABLE_COLS, SRC_SEQUENCES_TABLE_KEYS); private static final String CREATE_MESSAGES_TABLE_STMT = buildCreateTableStatement("{0}", MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS); - private static final String CREATE_ATTACHMENTS_TABLE_STMT = - buildCreateTableStatement("{0}", ATTACHMENTS_TABLE_COLS, ATTACHMENTS_TABLE_KEYS); - private static final String CREATE_DEST_SEQUENCE_STMT_STR = "INSERT INTO CXF_RM_DEST_SEQUENCES " + "(SEQ_ID, ACKS_TO, ENDPOINT_ID, PROTOCOL_VERSION) " @@ -133,13 +121,9 @@ public class RMTxStore implements RMStore { private static final String UPDATE_SRC_SEQUENCE_STMT_STR = "UPDATE CXF_RM_SRC_SEQUENCES SET CUR_MSG_NO = ?, LAST_MSG = ? WHERE SEQ_ID = ?"; private static final String CREATE_MESSAGE_STMT_STR - = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT) VALUES(?, ?, ?, ?)"; + = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?)"; private static final String DELETE_MESSAGE_STMT_STR = "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?"; - private static final String CREATE_ATTACHMENT_STMT_STR - = "INSERT INTO {0} (SEQ_ID, MSG_NO, ATTACHMENT_NO, DATA) VALUES(?, ?, ?, ?)"; - private static final String DELETE_ATTACHMENTS_STMT_STR = - "DELETE FROM {0} WHERE SEQ_ID = ? 
AND MSG_NO = ?"; private static final String SELECT_DEST_SEQUENCE_STMT_STR = "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES " + "WHERE SEQ_ID = ?"; @@ -153,9 +137,7 @@ public class RMTxStore implements RMStore { "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION " + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?"; private static final String SELECT_MESSAGES_STMT_STR = - "SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?"; - private static final String SELECT_ATTACHMENTS_STMT_STR = - "SELECT ATTACHMENT_NO, DATA FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?"; + "SELECT MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?"; private static final String ALTER_TABLE_STMT_STR = "ALTER TABLE {0} ADD {1} {2}"; private static final String CREATE_INBOUND_MESSAGE_STMT_STR = @@ -170,19 +152,6 @@ public class RMTxStore implements RMStore { MessageFormat.format(SELECT_MESSAGES_STMT_STR, INBOUND_MSGS_TABLE_NAME); private static final String SELECT_OUTBOUND_MESSAGES_STMT_STR = MessageFormat.format(SELECT_MESSAGES_STMT_STR, OUTBOUND_MSGS_TABLE_NAME); - private static final String CREATE_INBOUND_ATTACHMENT_STMT_STR = - MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, INBOUND_ATTS_TABLE_NAME); - private static final String CREATE_OUTBOUND_ATTACHMENT_STMT_STR = - MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, OUTBOUND_ATTS_TABLE_NAME); - private static final String DELETE_INBOUND_ATTACHMENTS_STMT_STR = - MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME); - private static final String DELETE_OUTBOUND_ATTACHMENTS_STMT_STR = - MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME); - private static final String SELECT_INBOUND_ATTACHMENTS_STMT_STR = - MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME); - private static final String SELECT_OUTBOUND_ATTACHMENTS_STMT_STR = - MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, 
OUTBOUND_ATTS_TABLE_NAME); - // create_schema may not work for several reasons, if so, create one manually private static final String CREATE_SCHEMA_STMT_STR = "CREATE SCHEMA {0}"; // given the schema, try these standard statements to switch to the schema @@ -613,44 +582,33 @@ public class RMTxStore implements RMStore { public Collection<RMMessage> getMessages(Identifier sid, boolean outbound) { Connection con = verifyConnection(); - PreparedStatement stmt1 = null; - PreparedStatement stmt2 = null; + PreparedStatement stmt = null; SQLException conex = null; Collection<RMMessage> msgs = new ArrayList<RMMessage>(); - ResultSet res1 = null; - ResultSet res2 = null; + ResultSet res = null; try { - stmt1 = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR); + stmt = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR); - stmt1.setString(1, sid.getValue()); - res1 = stmt1.executeQuery(); - while (res1.next()) { - long mn = res1.getLong(1); - String to = res1.getString(2); - Blob blob = res1.getBlob(3); + stmt.setString(1, sid.getValue()); + res = stmt.executeQuery(); + while (res.next()) { + long mn = res.getLong(1); + String to = res.getString(2); + Blob blob = res.getBlob(3); + String contentType = res.getString(4); RMMessage msg = new RMMessage(); msg.setMessageNumber(mn); msg.setTo(to); msg.setContent(blob.getBinaryStream()); + msg.setContentType(contentType); msgs.add(msg); - stmt2 = getStatement(con, outbound - ? SELECT_OUTBOUND_ATTACHMENTS_STMT_STR : SELECT_INBOUND_ATTACHMENTS_STMT_STR); - stmt2.setString(1, sid.getValue()); - stmt2.setLong(2, mn); - res2 = stmt2.executeQuery(); - List<InputStream> attaches = new ArrayList<InputStream>(); - while (res2.next()) { - attaches.add(res2.getBinaryStream(1)); - } - msg.setAttachments(attaches); } } catch (SQLException ex) { conex = ex; LOG.log(Level.WARNING, new Message(outbound ? 
"SELECT_OUTBOUND_MSGS_FAILED_MSG" : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex); } finally { - releaseResources(stmt2, res2); - releaseResources(stmt1, res1); + releaseResources(stmt, res); updateConnectionState(con, conex); } return msgs; @@ -709,24 +667,17 @@ public class RMTxStore implements RMStore { public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) { Connection con = verifyConnection(); - PreparedStatement stmt1 = null; - PreparedStatement stmt2 = null; + PreparedStatement stmt = null; SQLException conex = null; try { - stmt1 = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR); - stmt2 = getStatement(con, outbound - ? DELETE_OUTBOUND_ATTACHMENTS_STMT_STR : DELETE_INBOUND_ATTACHMENTS_STMT_STR); - + stmt = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR); beginTransaction(); - stmt1.setString(1, sid.getValue()); - stmt2.setString(1, sid.getValue()); - + stmt.setString(1, sid.getValue()); + for (Long messageNr : messageNrs) { - stmt2.setLong(2, messageNr); - stmt2.execute(); - stmt1.setLong(2, messageNr); - stmt1.execute(); + stmt.setLong(2, messageNr); + stmt.execute(); } commit(con); @@ -736,8 +687,7 @@ public class RMTxStore implements RMStore { abort(con); throw new RMStoreException(ex); } finally { - releaseResources(stmt2, null); - releaseResources(stmt1, null); + releaseResources(stmt, null); updateConnectionState(con, conex); } } @@ -779,47 +729,28 @@ public class RMTxStore implements RMStore { String id = sid.getValue(); long nr = msg.getMessageNumber(); String to = msg.getTo(); + String contentType = msg.getContentType(); if (LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Storing {0} message number {1} for sequence {2}, to = {3}", new Object[] {outbound ? 
"outbound" : "inbound", nr, id, to}); } - PreparedStatement stmt1 = null; - PreparedStatement stmt2 = null; + PreparedStatement stmt = null; try { InputStream msgin = msg.getContent(); - stmt1 = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR); + stmt = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR); - stmt1.setString(1, id); - stmt1.setLong(2, nr); - stmt1.setString(3, to); - stmt1.setBinaryStream(4, msgin); - stmt1.execute(); - - List<InputStream> attachments = msg.getAttachments(); - if (attachments.size() > 0) { - stmt2 = getStatement(con, outbound - ? CREATE_OUTBOUND_ATTACHMENT_STMT_STR : CREATE_INBOUND_ATTACHMENT_STMT_STR); - stmt2.setString(1, id); - stmt2.setLong(2, nr); - for (int i = 0; i < attachments.size(); i++) { - stmt2.setLong(3, i); - stmt2.setBinaryStream(4, attachments.get(i)); - stmt2.execute(); - if (LOG.isLoggable(Level.FINE)) { - LOG.log(Level.FINE, - "Successfully stored {0} attachment {1} for message number {2} in sequence {3}", - new Object[] {outbound ? "outbound" : "inbound", i, nr, id}); - } - } - } - + stmt.setString(1, id); + stmt.setLong(2, nr); + stmt.setString(3, to); + stmt.setBinaryStream(4, msgin); + stmt.setString(5, contentType); + stmt.execute(); if (LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}", new Object[] {outbound ? 
"outbound" : "inbound", nr, id}); } } finally { - releaseResources(stmt1, null); - releaseResources(stmt2, null); + releaseResources(stmt, null); } } @@ -930,24 +861,6 @@ public class RMTxStore implements RMStore { stmt.close(); } } - - for (String tableName : new String[] {OUTBOUND_ATTS_TABLE_NAME, INBOUND_ATTS_TABLE_NAME}) { - stmt = con.createStatement(); - try { - stmt.executeUpdate(MessageFormat.format(CREATE_ATTACHMENTS_TABLE_STMT, tableName)); - } catch (SQLException ex) { - if (!isTableExistsError(ex)) { - throw ex; - } else { - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("Table " + tableName + " already exists."); - } - verifyTable(con, tableName, ATTACHMENTS_TABLE_COLS); - } - } finally { - stmt.close(); - } - } } finally { con.setAutoCommit(false); if (connection == null && con != null) { @@ -1137,12 +1050,6 @@ public class RMTxStore implements RMStore { cacheStatement(connection, DELETE_OUTBOUND_MESSAGE_STMT_STR); cacheStatement(connection, SELECT_INBOUND_MESSAGES_STMT_STR); cacheStatement(connection, SELECT_OUTBOUND_MESSAGES_STMT_STR); - cacheStatement(connection, CREATE_INBOUND_ATTACHMENT_STMT_STR); - cacheStatement(connection, CREATE_OUTBOUND_ATTACHMENT_STMT_STR); - cacheStatement(connection, DELETE_INBOUND_ATTACHMENTS_STMT_STR); - cacheStatement(connection, DELETE_OUTBOUND_ATTACHMENTS_STMT_STR); - cacheStatement(connection, SELECT_INBOUND_ATTACHMENTS_STMT_STR); - cacheStatement(connection, SELECT_OUTBOUND_ATTACHMENTS_STMT_STR); } public synchronized void init() { http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java index 25c149b..a774e36 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java +++ 
b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java @@ -20,6 +20,7 @@ package org.apache.cxf.ws.rm; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Method; import java.util.ArrayList; @@ -69,7 +70,10 @@ import org.junit.Before; import org.junit.Test; public class RMManagerTest extends Assert { - + + private static final String MULTIPART_TYPE = "multipart/related; type=\"text/xml\";" + + " boundary=\"uuid:74b6a245-2e17-40eb-a86c-308664e18460\"; start=\"<root." + + "mess...@cxf.apache.org>\"; start-info=\"application/soap+xml\""; private MyControl control; private RMManager manager; @@ -541,6 +545,117 @@ public class RMManagerTest extends Assert { assertNotNull(msg.getExchange()); assertSame(msg, msg.getExchange().getOutMessage()); } + + @Test + public void testRecoverReliableClientEndpointWithAttachment() throws NoSuchMethodException, IOException { + Method method = RMManager.class.getDeclaredMethod("createReliableEndpoint", + new Class[] {Endpoint.class}); + manager = control.createMock(RMManager.class, new Method[] {method}); + manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>()); + Endpoint endpoint = control.createMock(Endpoint.class); + EndpointInfo ei = control.createMock(EndpointInfo.class); + ServiceInfo si = control.createMock(ServiceInfo.class); + BindingInfo bi = control.createMock(BindingInfo.class); + InterfaceInfo ii = control.createMock(InterfaceInfo.class); + setUpEndpointForRecovery(endpoint, ei, si, bi, ii); + Conduit conduit = control.createMock(Conduit.class); + SourceSequence ss = control.createMock(SourceSequence.class); + DestinationSequence ds = control.createMock(DestinationSequence.class); + RMMessage m1 = new RMMessage(); + InputStream fis = getClass().getResourceAsStream("persistence/SerializedRMMessage.txt"); + m1.setContent(fis); + m1.setTo("toAddress"); + m1.setMessageNumber(new Long(10)); + m1.setContentType(MULTIPART_TYPE); + 
Capture<Message> mc = Capture.newInstance(); + + setUpRecoverReliableEndpointWithAttachment(endpoint, conduit, ss, ds, m1, mc); + control.replay(); + manager.recoverReliableEndpoint(endpoint, conduit); + control.verify(); + + Message msg = mc.getValue(); + assertNotNull(msg); + assertNotNull(msg.getExchange()); + assertSame(msg, msg.getExchange().getOutMessage()); + + InputStream is = (InputStream) msg.get(RMMessageConstants.SAVED_CONTENT); + assertStartsWith(is, "<soap:Envelope"); + assertEquals(1, msg.getAttachments().size()); + } + + void setUpRecoverReliableEndpointWithAttachment(Endpoint endpoint, + Conduit conduit, + SourceSequence ss, + DestinationSequence ds, RMMessage m, + Capture<Message> mc) + throws IOException { + RMStore store = control.createMock(RMStore.class); + RetransmissionQueue queue = control.createMock(RetransmissionQueue.class); + manager.setStore(store); + manager.setRetransmissionQueue(queue); + + Collection<SourceSequence> sss = new ArrayList<SourceSequence>(); + if (null != ss) { + sss.add(ss); + } + EasyMock.expect(store.getSourceSequences("{S}s.{P}p@cxf")) + .andReturn(sss); + if (null == ss) { + return; + } + + Collection<DestinationSequence> dss = new ArrayList<DestinationSequence>(); + if (null != ds) { + dss.add(ds); + } + EasyMock.expect(store.getDestinationSequences("{S}s.{P}p@cxf")) + .andReturn(dss); + if (null == ds) { + return; + } + + Collection<RMMessage> ms = new ArrayList<RMMessage>(); + if (null != m) { + ms.add(m); + } + Identifier id = new Identifier(); + id.setValue("S1"); + EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 
1 : 2); + EasyMock.expect(ss.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408).anyTimes(); + EasyMock.expect(store.getMessages(id, true)).andReturn(ms); + + + RMEndpoint rme = control.createMock(RMEndpoint.class); + EasyMock.expect(manager.createReliableEndpoint(endpoint)) + .andReturn(rme); + Source source = control.createMock(Source.class); + EasyMock.expect(rme.getSource()).andReturn(source).anyTimes(); + + Destination destination = control.createMock(Destination.class); + EasyMock.expect(rme.getDestination()).andReturn(destination); + destination.addSequence(ds, false); + EasyMock.expectLastCall(); + + Service service = control.createMock(Service.class); + EasyMock.expect(endpoint.getService()).andReturn(service).anyTimes(); + Binding binding = control.createMock(Binding.class); + EasyMock.expect(endpoint.getBinding()).andReturn(binding).anyTimes(); + + EasyMock.expect(ss.isLastMessage()).andReturn(true).anyTimes(); + EasyMock.expect(ss.getCurrentMessageNr()).andReturn(new Long(10)).anyTimes(); + if (null == m) { + return; + } + + queue.addUnacknowledged(EasyMock.capture(mc)); + + EasyMock.expectLastCall(); + queue.start(); + EasyMock.expectLastCall(); + } + + Endpoint setUpEndpointForRecovery(Endpoint endpoint, EndpointInfo ei, @@ -552,6 +667,7 @@ public class RMManagerTest extends Assert { EasyMock.expect(si.getName()).andReturn(new QName("S", "s")).anyTimes(); EasyMock.expect(ei.getName()).andReturn(new QName("P", "p")).anyTimes(); EasyMock.expect(si.getInterface()).andReturn(ii).anyTimes(); + EasyMock.expect(ei.getBinding()).andReturn(bi).anyTimes(); return endpoint; } @@ -681,6 +797,25 @@ public class RMManagerTest extends Assert { return mock; } - + + } + // just read the begining of the input and compare it against the specified string + private static boolean assertStartsWith(InputStream in, String starting) { + assertNotNull(in); + byte[] buf = new byte[starting.length()]; + try { + in.read(buf, 0, buf.length); + assertEquals(starting, new 
String(buf, "utf-8")); + return true; + } catch (IOException e) { + // ignore + } finally { + try { + in.close(); + } catch (IOException e) { + // ignore + } + } + return false; } -} +} http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java index 47e197b..c0667fb 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java @@ -19,8 +19,19 @@ package org.apache.cxf.ws.rm.persistence; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import javax.activation.DataHandler; +import javax.mail.util.ByteArrayDataSource; + +import org.apache.cxf.attachment.AttachmentImpl; +import org.apache.cxf.message.Attachment; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageImpl; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange; @@ -31,6 +42,13 @@ import org.junit.Test; * */ public class PersistenceUtilsTest extends Assert { + + private static final String MULTIPART_TYPE = "multipart/related; type=\"text/xml\";" + + " boundary=\"uuid:74b6a245-2e17-40eb-a86c-308664e18460\"; start=\"<root." 
+ + "mess...@cxf.apache.org>\"; start-info=\"application/soap+xml\""; + + private static final String SOAP_PART = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">" + + "<data/></soap:Envelope>"; @Test public void testSerialiseDeserialiseAcknowledgement() { @@ -47,4 +65,95 @@ public class PersistenceUtilsTest extends Assert { assertEquals(range.getLower(), refRange.getLower()); assertEquals(range.getUpper(), refRange.getUpper()); } + + @Test + public void testEncodeRMContent() throws Exception { + ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes()); + + RMMessage rmmsg = new RMMessage(); + Message messageImpl = new MessageImpl(); + messageImpl.put(Message.CONTENT_TYPE, "text/xml"); + // update rmmessage + PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis); + + assertStartsWith(rmmsg.getContent(), "<soap:"); + assertNotNull(rmmsg.getContentType()); + assertTrue(rmmsg.getContentType().startsWith("text/xml")); + } + + @Test + public void testEncodeRMContentWithAttachments() throws Exception { + ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes()); + + RMMessage rmmsg = new RMMessage(); + Message messageImpl = new MessageImpl(); + messageImpl.put(Message.CONTENT_TYPE, "text/xml"); + // add attachments + addAttachment(messageImpl); + // update rmmessage + PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis); + + assertStartsWith(rmmsg.getContent(), "--uuid:"); + assertNotNull(rmmsg.getContentType()); + assertTrue(rmmsg.getContentType().startsWith("multipart/related")); + } + + @Test + public void testEncodeDecodeRMContent() throws Exception { + ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes()); + RMMessage rmmsg = new RMMessage(); + Message messageImpl = new MessageImpl(); + messageImpl.put(Message.CONTENT_TYPE, "text/xml"); + // add attachments + addAttachment(messageImpl); + // serialize + PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis); + + Message 
messageImplRestored = new MessageImpl(); + PersistenceUtils.decodeRMContent(rmmsg, messageImplRestored); + assertEquals(1, messageImplRestored.getAttachments().size()); + + assertStartsWith(messageImplRestored.getContent(InputStream.class), SOAP_PART); + } + + @Test + public void testDecodeRMContentWithAttachment() throws Exception { + InputStream is = getClass().getResourceAsStream("SerializedRMMessage.txt"); + RMMessage msg = new RMMessage(); + msg.setContent(is); + msg.setContentType(MULTIPART_TYPE); + Message messageImpl = new MessageImpl(); + PersistenceUtils.decodeRMContent(msg, messageImpl); + + assertEquals(1, messageImpl.getAttachments().size()); + assertStartsWith(messageImpl.getContent(InputStream.class), "<soap:Envelope"); + } + + private static void addAttachment(Message msg) throws IOException { + Collection<Attachment> attachments = new ArrayList<Attachment>(); + DataHandler dh = new DataHandler(new ByteArrayDataSource("hello world!", "text/plain")); + Attachment a = new AttachmentImpl("test.xml", dh); + attachments.add(a); + msg.setAttachments(attachments); + } + + // just read the begining of the input and compare it against the specified string + private static boolean assertStartsWith(InputStream in, String starting) { + assertNotNull(in); + byte[] buf = new byte[starting.length()]; + try { + in.read(buf, 0, buf.length); + assertEquals(starting, new String(buf, "utf-8")); + return true; + } catch (IOException e) { + // ignore + } finally { + try { + in.close(); + } catch (IOException e) { + // ignore + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt new file mode 100644 index 
0000000..66e33a4 --- /dev/null +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt @@ -0,0 +1,14 @@ + +--uuid:74b6a245-2e17-40eb-a86c-308664e18460 +Content-Type: text/xml; charset=UTF-8 +Content-Transfer-Encoding: binary +Content-ID: <root.message@cxf.apache.org> + +<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Header><Action xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">http://cxf.apache.org/hello_world_soap_http/Greeter/greetMeOneWayRequest</Action><MessageID xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">urn:uuid:9a29d476-d1c6-4cc2-b8cf-76de0cf1d4c7</MessageID><To xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">http://localhost:8999/SoapContext/GreeterPort</To></soap:Header><soap:Body><greetMeOneWay xmlns="http://cxf.apache.org/hello_world_soap_http/types"><requestType>Chris</requestType></greetMeOneWay></soap:Body></soap:Envelope> +--uuid:74b6a245-2e17-40eb-a86c-308664e18460 +Content-Type: text/plain +Content-Transfer-Encoding: binary +Content-ID: <test.xml> + +Hello World! 
+--uuid:74b6a245-2e17-40eb-a86c-308664e18460-- \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/62d67aa2/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java index 306e3b0..fb62f34 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java @@ -224,9 +224,7 @@ public abstract class RMTxStoreTestBase extends Assert { byte[] bytes = new byte[89]; EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); - EasyMock.expect(msg1.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes(); - EasyMock.expect(msg2.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes(); - + EasyMock.expect(msg1.getContentType()).andReturn("text/xml").times(1); control.replay(); Connection con = getConnection(); @@ -264,9 +262,7 @@ public abstract class RMTxStoreTestBase extends Assert { EasyMock.expect(msg2.getMessageNumber()).andReturn(TEN).anyTimes(); EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); - EasyMock.expect(msg1.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes(); - EasyMock.expect(msg2.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes(); - + control.replay(); con = getConnection(); try { @@ -863,7 +859,8 @@ public abstract class RMTxStoreTestBase extends Assert { RMMessage msg = control.createMock(RMMessage.class); 
EasyMock.expect(msg.getMessageNumber()).andReturn(mn).anyTimes(); EasyMock.expect(msg.getTo()).andReturn(to).anyTimes(); - EasyMock.expect(msg.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes(); + + EasyMock.expect(msg.getContentType()).andReturn("text/xml").anyTimes(); byte[] value = ("Message " + mn.longValue()).getBytes(); EasyMock.expect(msg.getContent()).andReturn(new ByteArrayInputStream(value)).anyTimes(); return msg;