Author: dkulp Date: Wed Sep 3 11:41:24 2008 New Revision: 691721 URL: http://svn.apache.org/viewvc?rev=691721&view=rev Log: Merged revisions 691357 via svnmerge from https://svn.apache.org/repos/asf/cxf/branches/2.1.x-fixes
................ r691357 | dkulp | 2008-09-02 15:58:45 -0400 (Tue, 02 Sep 2008) | 10 lines Merged revisions 691355 via svnmerge from https://svn.apache.org/repos/asf/cxf/trunk ........ r691355 | dkulp | 2008-09-02 15:56:42 -0400 (Tue, 02 Sep 2008) | 4 lines [CXF-1778] Fix memory leak with WS-Addressing turned on on the client, but server doesn't respond with addressing information. Also reduce memory usage by clearing stuff in the messages sooner. ........ ................ Modified: cxf/branches/2.0.x-fixes/ (props changed) cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java Propchange: cxf/branches/2.0.x-fixes/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Sep 3 11:41:24 2008 @@ -1,3 +1,3 @@ -/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316 -/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271 +/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316,691357 +/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271,691355 /incubator/cxf/trunk:434594-651668 Propchange: cxf/branches/2.0.x-fixes/ ------------------------------------------------------------------------------ --- svnmerge-integrated (original) +++ svnmerge-integrated Wed Sep 3 11:41:24 2008 @@ -1 +1 @@ -/cxf/branches/2.1.x-fixes:1-686313,686315-686332,686334-686346,686348-686828,687097,687464-687465,689109,689112,689122,691316 +/cxf/branches/2.1.x-fixes:1-686313,686315-686332,686334-686346,686348-686828,687097,687464-687465,689109,689112,689122,691316,691357 Modified: cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java (original) +++ cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java Wed Sep 3 11:41:24 2008 @@ -186,6 +186,10 @@ * @throws IOException */ public void resetOut(OutputStream out, boolean copyOldContent) throws IOException { + if (out == null) { + out = new ByteArrayOutputStream(); + } + if (currentStream instanceof CachedOutputStream) { CachedOutputStream ac = (CachedOutputStream) currentStream; InputStream in = ac.getInputStream(); @@ -210,6 +214,7 @@ if (copyOldContent) { IOUtils.copyAndCloseInput(fin, out); } + streamList.remove(currentStream); tempFile.delete(); tempFile = null; inmem = true; Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java (original) +++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java Wed Sep 3 11:41:24 2008 @@ -74,16 +74,18 @@ // Write the output while caching it for the log message final CacheAndWriteOutputStream newOut = new CacheAndWriteOutputStream(os); message.setContent(OutputStream.class, newOut); - newOut.registerCallback(new LoggingCallback(message)); + newOut.registerCallback(new LoggingCallback(message, os)); } } class LoggingCallback implements CachedOutputStreamCallback { private final Message message; + private final OutputStream origStream; - public LoggingCallback(final Message msg) { + public LoggingCallback(final Message msg, final OutputStream os) { this.message = msg; + this.origStream = os; } public void onFlush(CachedOutputStream cos) { @@ -128,6 +130,15 @@ } else if (LOG.isLoggable(Level.INFO)) { LOG.info(buffer.toString()); } + try { + //empty out the cache + cos.lockOutputStream(); + cos.resetOut(null, false); + } catch (Exception ex) { + //ignore + } + message.setContent(OutputStream.class, + origStream); } } } Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java (original) +++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java Wed Sep 3 11:41:24 2008 @@ -137,6 +137,7 @@ xtw.writeEndDocument(); xtw.close(); } + message.removeContent(XMLStreamWriter.class); } catch (XMLStreamException e) { throw new Fault(new org.apache.cxf.common.i18n.Message("STAX_WRITE_EXC", BUNDLE), e); } Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java (original) +++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java Wed Sep 3 11:41:24 2008 @@ -47,6 +47,10 @@ flowThroughStream.close(); } + public OutputStream getFlowThroughStream() { + return flowThroughStream; + } + @Override protected void onWrite() throws IOException { Modified: cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original) +++ cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Wed Sep 3 11:41:24 2008 @@ -1827,7 +1827,7 @@ try { handleResponse(); } finally { - if (cachingForRetransmission) { + if (cachingForRetransmission && cachedStream != null) { cachedStream.close(); } } @@ -1937,6 +1937,14 @@ connection.getInputStream().close(); return; } + } else { + //not going to be resending or anything, clear out the stuff in the out message + //to free memory + outMessage.removeContent(OutputStream.class); + if (cachingForRetransmission) { + cachedStream.close(); + } + cachedStream = null; } Message inMessage = new MessageImpl(); @@ -1989,6 +1997,7 @@ } inMessage.setContent(InputStream.class, in); + incomingObserver.onMessage(inMessage); } } Modified: cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java (original) +++ cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java Wed Sep 3 11:41:24 2008 @@ -60,6 +60,7 @@ * Properties for outgoing messages. */ public class MAPAggregator extends AbstractPhaseInterceptor<Message> { + public static final String USING_ADDRESSING = MAPAggregator.class.getName() + ".usingAddressing"; private static final Logger LOG = LogUtils.getL7dLogger(MAPAggregator.class); @@ -74,11 +75,6 @@ protected final Map<String, String> messageIDs = new ConcurrentHashMap<String, String>(); - /** - * Whether the endpoint supports WS-Addressing. - */ - - private final Map<Endpoint, Boolean> usingAddressing = new ConcurrentHashMap<Endpoint, Boolean>(); private boolean usingAddressingAdvisory = true; private boolean allowDuplicates = true; @@ -183,7 +179,7 @@ boolean ret = false; Endpoint endpoint = message.getExchange().get(Endpoint.class); if (null != endpoint) { - Boolean b = usingAddressing.get(endpoint); + Boolean b = (Boolean)endpoint.get(USING_ADDRESSING); if (null == b) { EndpointInfo endpointInfo = endpoint.getEndpointInfo(); List<ExtensibilityElement> endpointExts = endpointInfo != null ? endpointInfo @@ -197,7 +193,7 @@ ret = hasUsingAddressing(endpointExts) || hasUsingAddressing(bindingExts) || hasUsingAddressing(serviceExts); b = ret ? Boolean.TRUE : Boolean.FALSE; - usingAddressing.put(endpoint, b); + endpoint.put(USING_ADDRESSING, b); } else { ret = b.booleanValue(); } Modified: cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java (original) +++ cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java Wed Sep 3 11:41:24 2008 @@ -19,13 +19,12 @@ package org.apache.cxf.ws.addressing.soap; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -80,8 +79,8 @@ * REVISIT: map usage that the *same* interceptor instance * is used in all chains. */ - protected final Map<String, Exchange> uncorrelatedExchanges = - Collections.synchronizedMap(new HashMap<String, Exchange>()); + protected final Map<String, Exchange> uncorrelatedExchanges + = new ConcurrentHashMap<String, Exchange>(); private VersionTransformer transformer; private HeaderFactory headerFactory; @@ -734,7 +733,18 @@ LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG"); message.getInterceptorChain().abort(); } + } else if (maps == null && isRequestor(message)) { + Message m = message.getExchange().getOutMessage(); + maps = ContextUtils.retrieveMAPs(m, false, true, false); + if (maps != null) { + Exchange ex = uncorrelatedExchanges.get(maps.getMessageID().getValue()); + if (ex == message.getExchange()) { + uncorrelatedExchanges.remove(maps.getMessageID().getValue()); + LOG.log(Level.WARNING, "RESPONSE_NOT_USING_WSADDRESSING"); + } + } } + } /** Modified: cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties (original) +++ cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties Wed Sep 3 11:41:24 2008 @@ -23,3 +23,4 @@ UNSUPPORTED_VERSION_MSG = Unsupported WS-Addressing version {0} IGNORE_NON_ELEMENT_REF_PARAM_MSG = Ignoring reference parameter {0} because it is not a JAXBElement CORRELATION_FAILURE_MSG = Failed to correlate message, aborting dispatch. +RESPONSE_NOT_USING_WSADDRESSING = Response message does not contain WS-Addressing properties. Not correlating response. Modified: cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java?rev=691721&r1=691720&r2=691721&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java (original) +++ cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java Wed Sep 3 11:41:24 2008 @@ -32,12 +32,14 @@ import org.apache.cxf.service.Service; import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.MessageObserver; +import org.apache.cxf.ws.addressing.MAPAggregator; public class WrappedEndpoint implements Endpoint { private Endpoint wrappedEndpoint; private EndpointInfo endpointInfo; private Service service; + private Boolean usingAddressing; WrappedEndpoint(Endpoint wrapped, EndpointInfo info, Service s) { wrappedEndpoint = wrapped; @@ -118,6 +120,9 @@ } public Object get(Object key) { + if (MAPAggregator.USING_ADDRESSING == key) { + return usingAddressing; + } return wrappedEndpoint.get(key); } @@ -130,6 +135,10 @@ } public Object put(String key, Object value) { + if (MAPAggregator.USING_ADDRESSING == key) { + usingAddressing = (Boolean)value; + return null; + } return wrappedEndpoint.put(key, value); }
