Author: kwall Date: Mon Dec 8 17:01:57 2014 New Revision: 1643866 URL: http://svn.apache.org/r1643866 Log: NonBlockingSenderReceiver for the Java Broker for AMQP-0-10 and 0-9. Heartbeating working. AMQP 1.0 not tested.
System tests failing at about ~PriorityQueueTest with seeming resource leak. Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1643866&r1=1643865&r2=1643866&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Mon Dec 8 17:01:57 2014 @@ -278,7 +278,7 @@ public class MultiVersionProtocolEngine private class SelfDelegateProtocolEngine implements ServerProtocolEngine { private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); - private long _lastReadTime; + private long _lastReadTime = System.currentTimeMillis(); public SocketAddress 
getRemoteAddress() { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1643866&r1=1643865&r2=1643866&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Mon Dec 8 17:01:57 2014 @@ -52,8 +52,8 @@ public class ProtocolEngine_0_10 extend private ServerConnection _connection; private long _createTime = System.currentTimeMillis(); - private long _lastReadTime; - private long _lastWriteTime; + private long _lastReadTime = _createTime; + private long _lastWriteTime = _createTime; public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1643866&r1=1643865&r2=1643866&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original) +++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Dec 8 17:01:57 2014 @@ -170,7 +170,7 @@ public class AMQProtocolEngine implement private Sender<ByteBuffer> _sender; private volatile boolean _deferFlush; - private long _lastReceivedTime; + private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? private boolean _blocking; private final ReentrantLock _receivedLock; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1643866&r1=1643865&r2=1643866&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java Mon Dec 8 17:01:57 2014 @@ -41,8 +41,7 @@ public class NonBlockingConnection imple private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); private final SocketChannel _socket; private final long _timeout; - private final NonBlockingSender _ioSender; - private final NonBlockingReceiver _ioReceiver; + private final NonBlockingSenderReceiver _nonBlockingSenderReceiver; private int _maxReadIdle; private int _maxWriteIdle; private Principal _principal; @@ -55,36 +54,30 @@ public class NonBlockingConnection imple _socket = socket; _timeout = timeout; - _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout); - _ioReceiver.setTicker(ticker); +// _ioReceiver = new NonBlockingReceiver(_socket, 
delegate, receiveBufferSize,_timeout); +// _nonBlockingSenderReceiver.setTicker(ticker); - _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout); +// _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout); - _ioSender.setReceiver(_ioReceiver); +// _ioSender.setReceiver(_nonBlockingSenderReceiver); + + _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker); } public void start() { - _ioSender.initiate(); - _ioReceiver.initiate(); + _nonBlockingSenderReceiver.initiate(); } public Sender<ByteBuffer> getSender() { - return _ioSender; + return _nonBlockingSenderReceiver; } public void close() { - try - { - _ioSender.close(); - } - finally - { - _ioReceiver.close(false); - } + _nonBlockingSenderReceiver.close(); } public SocketAddress getRemoteAddress() Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1643866&view=auto ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (added) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Mon Dec 8 17:01:57 2014 @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.Ticker; + +public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); + + private final SocketChannel _socketChannel; + private final Selector _selector; + + private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); + + private final Thread _ioThread; + private final String _remoteSocketAddress; + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final Receiver<ByteBuffer> _receiver; + private final int _receiveBufSize; + private final Ticker _ticker; + + + + public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker) + { + _socketChannel = socketChannel; + _receiver = 
receiver; + _receiveBufSize = receiveBufSize; + _ticker = ticker; + + try + { + _remoteSocketAddress = socketChannel.getRemoteAddress().toString(); + _socketChannel.configureBlocking(false); + _selector = Selector.open(); + _socketChannel.register(_selector, SelectionKey.OP_READ); + } + catch (IOException e) + { + throw new SenderException("Unable to prepare the channel for non-blocking IO", e); + } + try + { + //Create but deliberately don't start the thread. + _ioThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new SenderException("Error creating SenderReceiver thread for " + _remoteSocketAddress, e); + } + + _ioThread.setDaemon(true); + _ioThread.setName(String.format("IoSenderReceiver - %s", _remoteSocketAddress)); + + } + + public void initiate() + { + _ioThread.start(); + } + + @Override + public void setIdleTimeout(final int i) + { + // Probably unused - dead code to be removed?? + } + + @Override + public void send(final ByteBuffer msg) + { + // append to list and do selector wakeup + _buffers.add(msg); + _selector.wakeup(); + } + + @Override + public void run() + { + // never ending loop doing + // try to write all pending byte buffers, handle situation where zero bytes or part of a byte buffer is written + // read as much as you can + // try to write all pending byte buffers + + while (!_closed.get()) + { + + try + { + long currentTime = System.currentTimeMillis(); + int tick = _ticker.getTimeToNextTick(currentTime); + if(tick <= 0) + { + tick = _ticker.tick(currentTime); + } + + LOGGER.debug("Tick " + tick); + + int numberReady = _selector.select(tick <= 0 ? 1 : tick); + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + selectionKeys.clear(); + + LOGGER.debug("Number Ready " + numberReady); + + doWrite(); + doRead(); + boolean fullyWritten = doWrite(); + + _socketChannel.register(_selector, fullyWritten ? 
SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); + } + catch (IOException e) + { + LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); + close(); + } + } + + try + { + while(!doWrite()) + { + } + + try + { + _receiver.closed(); + } + finally + { + _socketChannel.close(); + } + } + catch (IOException e) + { + LOGGER.info("Exception performing final output for thread '" + _remoteSocketAddress + "': " + e); + } + } + + + + @Override + public void flush() + { + // maybe just wakeup? + + } + + @Override + public void close() + { + LOGGER.debug("Closing " + _remoteSocketAddress); + + _closed.set(true); + _selector.wakeup(); + + } + + private boolean doWrite() throws IOException + { + int byteBuffersWritten = 0; + + ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; + Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); + for (int i = 0; i < bufArray.length; i++) + { + bufArray[i] = bufferIterator.next(); + } + + _socketChannel.write(bufArray); + + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely"); + } + + return bufArray.length == byteBuffersWritten; + } + + private void doRead() throws IOException + { + + ByteBuffer buffer; + int remaining; + do + { + buffer = ByteBuffer.allocate(_receiveBufSize); + _socketChannel.read(buffer); + remaining = buffer.remaining(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + buffer.position() + " byte(s)"); + } + buffer.flip(); + _receiver.received(buffer); + } + while (remaining == 0); + + } +} Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes?rev=1643866&r1=1643865&r2=1643866&view=diff 
============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Mon Dec 8 17:01:57 2014 @@ -34,3 +34,4 @@ org.apache.qpid.test.client.message.Sele // QPID-6262: Temporary exclusion whilst NIO refactoring is in flight org.apache.qpid.client.ssl.SSLTest#* +org.apache.qpid.server.transport.TCPandSSLTransportTest#* --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org