Author: dkulp
Date: Thu Aug 16 14:14:09 2012
New Revision: 1373849
URL: http://svn.apache.org/viewvc?rev=1373849&view=rev
Log:
Update UDP transport to properly mark end of streams so it works with the
Logging interceptors
Added:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java?rev=1373849&r1=1373848&r2=1373849&view=diff
==============================================================================
---
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
(original)
+++
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
Thu Aug 16 14:14:09 2012
@@ -25,143 +25,86 @@ import java.io.InputStream;
import org.apache.mina.core.buffer.IoBuffer;
-// Copies almost ver-batim from Mina due to the version in Mina not being public
public class IoSessionInputStream extends InputStream {
- private final Object mutex = new Object();
- private final IoBuffer buf;
- private volatile boolean closed;
- private volatile boolean released;
- private IOException exception;
+ private volatile IoBuffer buf;
+ private volatile IOException exception;
+ public IoSessionInputStream(IoBuffer b) {
+ buf = IoBuffer.allocate(b.limit());
+ buf.put(b);
+ buf.flip();
+ }
public IoSessionInputStream() {
- buf = IoBuffer.allocate(2048);
- buf.setAutoExpand(true);
- buf.limit(0);
+ buf = null;
}
@Override
- public int available() {
- if (released) {
- return 0;
+ public int available() throws IOException {
+ if (exception != null) {
+ throw exception;
}
-
- synchronized (mutex) {
- return buf.remaining();
+ if (buf == null) {
+ return 0;
}
+ return buf.remaining();
}
@Override
- public void close() {
- if (closed) {
- return;
- }
-
- synchronized (mutex) {
- closed = true;
- releaseBuffer();
-
- mutex.notifyAll();
+ public void close() throws IOException {
+ if (exception != null) {
+ throw exception;
}
}
@Override
public int read() throws IOException {
- synchronized (mutex) {
- if (!waitForData()) {
- return -1;
- }
-
- return buf.get() & 0xff;
+ waitForData();
+ if (exception != null) {
+ throw exception;
}
+ return buf.get() & 0xff;
}
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- synchronized (mutex) {
- if (!waitForData()) {
- return -1;
- }
-
- int readBytes;
-
- if (len > buf.remaining()) {
- readBytes = buf.remaining();
- } else {
- readBytes = len;
+ public synchronized void waitForData() throws IOException {
+ if (exception == null && buf == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new IOException();
}
-
- buf.get(b, off, readBytes);
-
- return readBytes;
}
}
-
- private boolean waitForData() throws IOException {
- if (released) {
- return false;
- }
-
- synchronized (mutex) {
- while (!released && buf.remaining() == 0 && exception == null) {
- try {
- mutex.wait();
- } catch (InterruptedException e) {
- IOException ioe = new IOException(
- "Interrupted while waiting for more data");
- ioe.initCause(e);
- throw ioe;
- }
- }
- }
-
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ waitForData();
if (exception != null) {
- releaseBuffer();
throw exception;
}
-
- if (closed && buf.remaining() == 0) {
- releaseBuffer();
-
- return false;
+ if (buf.remaining() == 0) {
+ return -1;
}
-
- return true;
- }
-
- private void releaseBuffer() {
- if (released) {
- return;
+ int readBytes;
+ if (len > buf.remaining()) {
+ readBytes = buf.remaining();
+ } else {
+ readBytes = len;
}
-
- released = true;
+ buf.get(b, off, readBytes);
+ return readBytes;
}
- public void write(IoBuffer src) {
- synchronized (mutex) {
- if (closed) {
- return;
- }
-
- if (buf.hasRemaining()) {
- this.buf.compact();
- this.buf.put(src);
- this.buf.flip();
- } else {
- this.buf.clear();
- this.buf.put(src);
- this.buf.flip();
- mutex.notifyAll();
- }
+ public synchronized void throwException(IOException e) {
+ if (exception == null) {
+ exception = e;
}
+ notifyAll();
}
- public void throwException(IOException e) {
- synchronized (mutex) {
- if (exception == null) {
- exception = e;
-
- mutex.notifyAll();
- }
- }
+ public synchronized void setBuffer(IoBuffer b) {
+ buf = IoBuffer.allocate(b.limit());
+ buf.put(b);
+ buf.flip();
+ notifyAll();
}
}
\ No newline at end of file
Added:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java?rev=1373849&view=auto
==============================================================================
---
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java
(added)
+++
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java
Thu Aug 16 14:14:09 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.WriteFuture;
+import org.apache.mina.core.session.IoSession;
+
+class IoSessionOutputStream extends OutputStream {
+ private final IoSession session;
+
+ private WriteFuture lastWriteFuture;
+
+ public IoSessionOutputStream(IoSession session) {
+ this.session = session;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ flush();
+ } finally {
+ session.close(true).awaitUninterruptibly();
+ }
+ }
+
+ private void checkClosed() throws IOException {
+ if (!session.isConnected()) {
+ throw new IOException("The session has been closed.");
+ }
+ }
+
+ private synchronized void write(IoBuffer buf) throws IOException {
+ checkClosed();
+ WriteFuture future = session.write(buf);
+ lastWriteFuture = future;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ write(IoBuffer.wrap(b.clone(), off, len));
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ IoBuffer buf = IoBuffer.allocate(1);
+ buf.put((byte) b);
+ buf.flip();
+ write(buf);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (lastWriteFuture == null) {
+ return;
+ }
+
+ lastWriteFuture.awaitUninterruptibly();
+ if (!lastWriteFuture.isWritten()) {
+ throw new IOException(
+ "The bytes could not be written to the session");
+ }
+ }
+}
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1373849&r1=1373848&r2=1373849&view=diff
==============================================================================
---
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
(original)
+++
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
Thu Aug 16 14:14:09 2012
@@ -85,8 +85,7 @@ public class UDPConduit extends Abstract
inMessage.setExchange(message.getExchange());
message.getExchange().setInMessage(inMessage);
- IoSessionInputStream ins = new IoSessionInputStream();
- ins.write((IoBuffer)buf);
+ IoSessionInputStream ins = new IoSessionInputStream(buf);
inMessage.setContent(InputStream.class, ins);
inMessage.put(IoSessionInputStream.class, ins);
@@ -107,7 +106,7 @@ public class UDPConduit extends Abstract
} else {
IoSessionInputStream ins =
message.getExchange().getInMessage().get(IoSessionInputStream.class);
- ins.write((IoBuffer)buf);
+ ins.setBuffer((IoBuffer)buf);
}
}
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1373849&r1=1373848&r2=1373849&view=diff
==============================================================================
---
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
(original)
+++
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
Thu Aug 16 14:14:09 2012
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.logging.Logger;
@@ -41,6 +42,8 @@ import org.apache.cxf.workqueue.WorkQueu
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.session.AttributeKey;
+import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;
import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -51,6 +54,8 @@ import org.apache.mina.transport.socket.
*/
public class UDPDestination extends AbstractDestination {
private static final Logger LOG =
LogUtils.getL7dLogger(UDPDestination.class);
+ private static final AttributeKey KEY_IN = new
AttributeKey(StreamIoHandler.class, "in");
+ private static final AttributeKey KEY_OUT = new
AttributeKey(StreamIoHandler.class, "out");
NioDatagramAcceptor acceptor;
AutomaticWorkQueue queue;
@@ -134,11 +139,27 @@ public class UDPDestination extends Abst
class UDPIOHandler extends StreamIoHandler implements IoHandler {
-
+
+
+ @Override
+ public void sessionOpened(IoSession session) {
+ // Set timeouts
+ session.getConfig().setWriteTimeout(getWriteTimeout());
+ session.getConfig().setIdleTime(IdleStatus.READER_IDLE,
getReadTimeout());
+
+ // Create streams
+ InputStream in = new IoSessionInputStream();
+ OutputStream out = new IoSessionOutputStream(session);
+ session.setAttribute(KEY_IN, in);
+ session.setAttribute(KEY_OUT, out);
+ processStreamIo(session, in, out);
+ }
+
protected void processStreamIo(IoSession session, InputStream in,
OutputStream out) {
final MessageImpl m = new MessageImpl();
final Exchange exchange = new ExchangeImpl();
exchange.setDestination(UDPDestination.this);
+ m.setDestination(UDPDestination.this);
exchange.setInMessage(m);
m.setContent(InputStream.class, in);
out = new UDPDestinationOutputStream(out);
@@ -150,6 +171,52 @@ public class UDPDestination extends Abst
});
}
+ public void sessionClosed(IoSession session) throws Exception {
+ final InputStream in = (InputStream) session.getAttribute(KEY_IN);
+ final OutputStream out = (OutputStream)
session.getAttribute(KEY_OUT);
+ try {
+ in.close();
+ } finally {
+ out.close();
+ }
+ }
+
+ public void messageReceived(IoSession session, Object buf) {
+ final IoSessionInputStream in = (IoSessionInputStream) session
+ .getAttribute(KEY_IN);
+ in.setBuffer((IoBuffer) buf);
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) {
+ final IoSessionInputStream in = (IoSessionInputStream) session
+ .getAttribute(KEY_IN);
+
+ IOException e = null;
+ if (cause instanceof StreamIoException) {
+ e = (IOException) cause.getCause();
+ } else if (cause instanceof IOException) {
+ e = (IOException) cause;
+ }
+
+ if (e != null && in != null) {
+ in.throwException(e);
+ } else {
+ session.close(true);
+ }
+ }
+ public void sessionIdle(IoSession session, IdleStatus status) {
+ if (status == IdleStatus.READER_IDLE) {
+ throw new StreamIoException(new SocketTimeoutException(
+ "Read timeout"));
+ }
+ }
+ }
+ private static class StreamIoException extends RuntimeException {
+ private static final long serialVersionUID = 3976736960742503222L;
+
+ public StreamIoException(IOException cause) {
+ super(cause);
+ }
}
public class UDPDestinationOutputStream extends OutputStream {