Author: dkulp
Date: Fri Aug 17 17:05:17 2012
New Revision: 1374362
URL: http://svn.apache.org/viewvc?rev=1374362&view=rev
Log:
When using multicast with async callbacks, allow multiple responses.
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1374362&r1=1374361&r2=1374362&view=diff
==============================================================================
---
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
(original)
+++
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
Fri Aug 17 17:05:17 2012
@@ -58,6 +58,7 @@ import org.apache.mina.transport.socket.
*/
public class UDPConduit extends AbstractConduit {
private static final String CXF_MESSAGE_ATTR = "CXFMessage";
+ private static final String MULTI_RESPONSE_TIMEOUT =
"udp.multi.response.timeout";
private static final String HOST_PORT = UDPConduit.class + ".host:port";
private static final Logger LOG =
LogUtils.getL7dLogger(UDPDestination.class);
@@ -103,8 +104,10 @@ public class UDPConduit extends Abstract
});
} else {
incomingObserver.onMessage(inMessage);
+ if (!message.getExchange().isSynchronous()) {
+ message.getExchange().setInMessage(null);
+ }
}
-
} else {
IoSessionInputStream ins =
message.getExchange().getInMessage().get(IoSessionInputStream.class);
ins.setBuffer((IoBuffer)buf);
@@ -226,7 +229,7 @@ public class UDPConduit extends Abstract
socket.setReceiveBufferSize(64 * 1024);
socket.setBroadcast(true);
- if (multicast != null) {
+ if (multicast == null) {
Enumeration<NetworkInterface> interfaces =
NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface =
interfaces.nextElement();
@@ -267,9 +270,26 @@ public class UDPConduit extends Abstract
if (!message.getExchange().isOneWay()) {
byte bytes[] = new byte[64 * 1024];
DatagramPacket p = new DatagramPacket(bytes, bytes.length);
- socket.setSoTimeout(30000);
- socket.receive(p);
- dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()),
false);
+ Integer i =
(Integer)message.getContextualProperty(MULTI_RESPONSE_TIMEOUT);
+ if (i == null) {
+ socket.setSoTimeout(30000);
+ socket.receive(p);
+ dataReceived(message, IoBuffer.wrap(bytes, 0,
p.getLength()), false);
+ } else {
+ socket.setSoTimeout(i);
+ boolean found = false;
+ try {
+ while (true) {
+ socket.receive(p);
+ dataReceived(message, IoBuffer.wrap(bytes, 0,
p.getLength()), false);
+ found = true;
+ }
+ } catch (java.net.SocketTimeoutException ex) {
+ if (!found) {
+ throw ex;
+ }
+ }
+ }
}
socket.close();
}
Modified:
cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java?rev=1374362&r1=1374361&r2=1374362&view=diff
==============================================================================
---
cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
(original)
+++
cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
Fri Aug 17 17:05:17 2012
@@ -803,8 +803,14 @@ public class MAPCodec extends AbstractSo
} else if (!MessageUtils.getContextualBoolean(message,
"org.apache.cxf.ws.addressing.MAPAggregator.addressingDisabled",
false)) {
- LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
- message.getInterceptorChain().abort();
+ //see if it can directly be correlated with the out message:
+ AddressingProperties outp =
ContextUtils.retrieveMAPs(message.getExchange().getOutMessage(),
+
false, true, false);
+ if (outp == null
+ ||
!outp.getMessageID().getValue().equals(maps.getRelatesTo().getValue())) {
+ LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
+ message.getInterceptorChain().abort();
+ }
}
}
} else if (maps == null && isRequestor(message)) {