This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push: new ee88f24 PLC4X-60 - Fix findings by the last release ee88f24 is described below commit ee88f24642ca465194d7ae33a3cbf14ae3bc9aa4 Author: Christofer Dutz <christofer.d...@c-ware.de> AuthorDate: Wed Sep 26 15:42:53 2018 -0400 PLC4X-60 - Fix findings by the last release - Handled appending things to the NOTICE files as required. --- .../google/src/remote-resources/META-INF/NOTICE | 5 + .../rawsockets/netty/AbstractRawSocketChannel.java | 740 -------------- .../netty/AbstractRawSocketStreamChannel.java | 1042 -------------------- .../utils/rawsockets/netty/RawSocketChannel.java | 133 --- .../rawsockets/netty/RawSocketChannelConfig.java | 187 ---- .../rawsockets/netty/RawSocketChannelSav.java | 186 ---- .../rawsockets/netty/RawSocketEventArray.java | 104 -- .../utils/rawsockets/netty/RawSocketEventLoop.java | 449 --------- .../src/remote-resources/META-INF/NOTICE | 7 + pom.xml | 5 + 10 files changed, 17 insertions(+), 2841 deletions(-) diff --git a/examples/google/src/remote-resources/META-INF/NOTICE b/examples/google/src/remote-resources/META-INF/NOTICE new file mode 100644 index 0000000..942dc88 --- /dev/null +++ b/examples/google/src/remote-resources/META-INF/NOTICE @@ -0,0 +1,5 @@ +=============================================================== + +The File: +src/main/java/org/apache/plc4x/java/examples/google/iotcore/MqttExampleOptions.java +Is copyright by Google and is distributed under the Apache 2.0 License \ No newline at end of file diff --git a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java deleted file mode 100644 index a8762ad..0000000 --- a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java +++ /dev/null @@ -1,740 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.plc4x.java.utils.rawsockets.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; -import io.netty.channel.*; -import io.netty.channel.Channel; -import io.netty.channel.socket.ChannelInputShutdownEvent; -import io.netty.channel.socket.ChannelInputShutdownReadComplete; -import io.netty.channel.unix.FileDescriptor; -import io.netty.channel.unix.Socket; -import io.netty.channel.unix.UnixChannel; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.ThrowableUtil; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; -import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr; -import static io.netty.util.internal.ObjectUtil.checkNotNull; - -abstract class AbstractRawSocketChannel extends AbstractChannel implements Channel { - private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new ClosedChannelException(), AbstractRawSocketChannel.class, "doClose()"); - private static final ChannelMetadata METADATA = new ChannelMetadata(false); - private final int readFlag; - final LinuxSocket socket; - /** - * The future of the current connection attempt. If not null, subsequent - * connection attempts will fail. - */ - private ChannelPromise connectPromise; - private ScheduledFuture<?> connectTimeoutFuture; - private SocketAddress requestedRemoteAddress; - - private volatile SocketAddress local; - private volatile SocketAddress remote; - - protected int flags = Native.EPOLLET; - boolean inputClosedSeenErrorOnRead; - boolean epollInReadyRunnablePending; - - protected volatile boolean active; - - AbstractRawSocketChannel(LinuxSocket fd, int flag) { - this(null, fd, flag, false); - } - - AbstractRawSocketChannel(Channel parent, LinuxSocket fd, int flag, boolean active) { - super(parent); - socket = checkNotNull(fd, "fd"); - readFlag = flag; - flags |= flag; - this.active = active; - if (active) { - // Directly cache the remote and local addresses - // See https://github.com/netty/netty/issues/2359 - local = fd.localAddress(); - remote = fd.remoteAddress(); - } - } - - AbstractRawSocketChannel(Channel parent, LinuxSocket fd, int flag, SocketAddress remote) { - super(parent); - socket = checkNotNull(fd, "fd"); - readFlag = flag; - flags |= flag; - active = true; - // Directly cache the remote and local addresses - // See https://github.com/netty/netty/issues/2359 - this.remote = remote; - local = fd.localAddress(); - } - - static boolean isSoErrorZero(Socket fd) { - try { - return fd.getSoError() == 0; - } catch (IOException e) { - throw new ChannelException(e); - } - } - - void setFlag(int flag) throws IOException { - if (!isFlagSet(flag)) { - flags |= flag; - modifyEvents(); - } - } - - void clearFlag(int flag) throws IOException { - if (isFlagSet(flag)) { - flags &= ~flag; - modifyEvents(); - } - } - - boolean isFlagSet(int flag) { - return (flags & flag) != 0; - } - - @Override - public final FileDescriptor fd() { - return socket; - } - - @Override - public abstract RawSocketChannelConfig config(); - - @Override - public boolean isActive() { - return active; - } - - @Override - public ChannelMetadata metadata() { - return METADATA; - } - - @Override - protected void doClose() throws Exception { - active = false; - // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a - // socket which has not even been connected yet. This has been observed to block during unit tests. - inputClosedSeenErrorOnRead = true; - try { - ChannelPromise promise = connectPromise; - if (promise != null) { - // Use tryFailure() instead of setFailure() to avoid the race against cancel(). - promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); - connectPromise = null; - } - - ScheduledFuture<?> future = connectTimeoutFuture; - if (future != null) { - future.cancel(false); - connectTimeoutFuture = null; - } - - if (isRegistered()) { - // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor - // if SO_LINGER is used. - // - // See https://github.com/netty/netty/issues/7159 - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - doDeregister(); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - try { - doDeregister(); - } catch (Throwable cause) { - pipeline().fireExceptionCaught(cause); - } - } - }); - } - } - } finally { - socket.close(); - } - } - - @Override - protected void doDisconnect() throws Exception { - doClose(); - } - - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof RawSocketEventLoop; - } - - @Override - public boolean isOpen() { - return socket.isOpen(); - } - - @Override - protected void doDeregister() throws Exception { - ((RawSocketEventLoop) eventLoop()).remove(this); - } - - @Override - protected final void doBeginRead() throws Exception { - // Channel.read() or ChannelHandlerContext.read() was called - final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); - unsafe.readPending = true; - - // We must set the read flag here as it is possible the user didn't read in the last read loop, the - // executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will - // never get data after this. - setFlag(readFlag); - - // If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified - // again if we didn't consume all the data. So we force a read operation here if there maybe more data. - if (unsafe.maybeMoreDataToRead) { - unsafe.executeEpollInReadyRunnable(config()); - } - } - - final boolean shouldBreakEpollInReady(ChannelConfig config) { - return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config)); - } - - private static boolean isAllowHalfClosure(ChannelConfig config) { - return config instanceof RawSocketSocketChannelConfig && - ((RawSocketSocketChannelConfig) config).isAllowHalfClosure(); - } - - final void clearEpollIn() { - // Only clear if registered with an EventLoop as otherwise - if (isRegistered()) { - final EventLoop loop = eventLoop(); - final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); - if (loop.inEventLoop()) { - unsafe.clearEpollIn0(); - } else { - // schedule a task to clear the EPOLLIN as it is not safe to modify it directly - loop.execute(new Runnable() { - @Override - public void run() { - if (!unsafe.readPending && !config().isAutoRead()) { - // Still no read triggered so clear it now - unsafe.clearEpollIn0(); - } - } - }); - } - } else { - // The EventLoop is not registered atm so just update the flags so the correct value - // will be used once the channel is registered - flags &= ~readFlag; - } - } - - private void modifyEvents() throws IOException { - if (isOpen() && isRegistered()) { - ((RawSocketEventLoop) eventLoop()).modify(this); - } - } - - @Override - protected void doRegister() throws Exception { - // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop - // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the - // new EventLoop. - epollInReadyRunnablePending = false; - ((RawSocketEventLoop) eventLoop()).add(this); - } - - @Override - protected abstract AbstractEpollUnsafe newUnsafe(); - - /** - * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one. - */ - protected final ByteBuf newDirectBuffer(ByteBuf buf) { - return newDirectBuffer(buf, buf); - } - - /** - * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder. - * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by - * this method. - */ - protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) { - final int readableBytes = buf.readableBytes(); - if (readableBytes == 0) { - ReferenceCountUtil.release(holder); - return Unpooled.EMPTY_BUFFER; - } - - final ByteBufAllocator alloc = alloc(); - if (alloc.isDirectBufferPooled()) { - return newDirectBuffer0(holder, buf, alloc, readableBytes); - } - - final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); - if (directBuf == null) { - return newDirectBuffer0(holder, buf, alloc, readableBytes); - } - - directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); - ReferenceCountUtil.safeRelease(holder); - return directBuf; - } - - private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) { - final ByteBuf directBuf = alloc.directBuffer(capacity); - directBuf.writeBytes(buf, buf.readerIndex(), capacity); - ReferenceCountUtil.safeRelease(holder); - return directBuf; - } - - protected static void checkResolvable(InetSocketAddress addr) { - if (addr.isUnresolved()) { - throw new UnresolvedAddressException(); - } - } - - /** - * Read bytes into the given {@link ByteBuf} and return the amount. - */ - protected final int doReadBytes(ByteBuf byteBuf) throws Exception { - int writerIndex = byteBuf.writerIndex(); - int localReadAmount; - unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); - if (byteBuf.hasMemoryAddress()) { - localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity()); - } else { - ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes()); - localReadAmount = socket.read(buf, buf.position(), buf.limit()); - } - if (localReadAmount > 0) { - byteBuf.writerIndex(writerIndex + localReadAmount); - } - return localReadAmount; - } - - protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception { - if (buf.hasMemoryAddress()) { - int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex()); - if (localFlushedAmount > 0) { - in.removeBytes(localFlushedAmount); - return 1; - } - } else { - final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ? - buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer(); - int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit()); - if (localFlushedAmount > 0) { - nioBuf.position(nioBuf.position() + localFlushedAmount); - in.removeBytes(localFlushedAmount); - return 1; - } - } - return WRITE_STATUS_SNDBUF_FULL; - } - - protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { - boolean readPending; - boolean maybeMoreDataToRead; - private RawSocketRecvByteAllocatorHandle allocHandle; - private final Runnable epollInReadyRunnable = new Runnable() { - @Override - public void run() { - epollInReadyRunnablePending = false; - epollInReady(); - } - }; - - /** - * Called once EPOLLIN event is ready to be processed - */ - abstract void epollInReady(); - - final void epollInBefore() { maybeMoreDataToRead = false; } - - final void epollInFinally(ChannelConfig config) { - maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead(); - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!readPending && !config.isAutoRead()) { - clearEpollIn(); - } else if (readPending && maybeMoreDataToRead) { - // trigger a read again as there may be something left to read and because of epoll ET we - // will not get notified again until we read everything from the socket - // - // It is possible the last fireChannelRead call could cause the user to call read() again, or if - // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set - // to false before every read operation to prevent re-entry into epollInReady() we will not read from - // the underlying OS again unless the user happens to call read again. - executeEpollInReadyRunnable(config); - } - } - - final void executeEpollInReadyRunnable(ChannelConfig config) { - if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) { - return; - } - epollInReadyRunnablePending = true; - eventLoop().execute(epollInReadyRunnable); - } - - /** - * Called once EPOLLRDHUP event is ready to be processed - */ - final void epollRdHupReady() { - // This must happen before we attempt to read. This will ensure reading continues until an error occurs. - recvBufAllocHandle().receivedRdHup(); - - if (isActive()) { - // If it is still active, we need to call epollInReady as otherwise we may miss to - // read pending data from the underlying file descriptor. - // See https://github.com/netty/netty/issues/3709 - epollInReady(); - } else { - // Just to be safe make sure the input marked as closed. - shutdownInput(true); - } - - // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event. - clearEpollRdHup(); - } - - /** - * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure. - */ - private void clearEpollRdHup() { - try { - clearFlag(Native.EPOLLRDHUP); - } catch (IOException e) { - pipeline().fireExceptionCaught(e); - close(voidPromise()); - } - } - - /** - * Shutdown the input side of the channel. - */ - void shutdownInput(boolean rdHup) { - if (!socket.isInputShutdown()) { - if (isAllowHalfClosure(config())) { - try { - socket.shutdown(true, false); - } catch (IOException ignored) { - // We attempted to shutdown and failed, which means the input has already effectively been - // shutdown. - fireEventAndClose(ChannelInputShutdownEvent.INSTANCE); - return; - } catch (NotYetConnectedException ignore) { - // We attempted to shutdown and failed, which means the input has already effectively been - // shutdown. - } - clearEpollIn(); - pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } else { - close(voidPromise()); - } - } else if (!rdHup) { - inputClosedSeenErrorOnRead = true; - pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE); - } - } - - private void fireEventAndClose(Object evt) { - pipeline().fireUserEventTriggered(evt); - close(voidPromise()); - } - - @Override - public RawSocketRecvByteAllocatorHandle recvBufAllocHandle() { - if (allocHandle == null) { - allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle()); - } - return allocHandle; - } - - /** - * Create a new {@link RawSocketRecvByteAllocatorHandle} instance. - * @param handle The handle to wrap with EPOLL specific logic. - */ - RawSocketRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) { - return new RawSocketRecvByteAllocatorHandle(handle); - } - - @Override - protected void flush0() { - // Flush immediately only when there's no pending flush. - // If there's a pending flush operation, event loop will call forceFlush() later, - // and thus there's no need to call it now. - if (isFlagSet(Native.EPOLLOUT)) { - return; - } - super.flush0(); - } - - /** - * Called once a EPOLLOUT event is ready to be processed - */ - final void epollOutReady() { - if (connectPromise != null) { - // pending connect which is now complete so handle it. - finishConnect(); - } else if (!socket.isOutputShutdown()) { - // directly call super.flush0() to force a flush now - super.flush0(); - } - } - - protected final void clearEpollIn0() { - assert eventLoop().inEventLoop(); - try { - readPending = false; - clearFlag(readFlag); - } catch (IOException e) { - // When this happens there is something completely wrong with either the filedescriptor or epoll, - // so fire the exception through the pipeline and close the Channel. - pipeline().fireExceptionCaught(e); - unsafe().close(unsafe().voidPromise()); - } - } - - @Override - public void connect( - final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - if (!promise.setUncancellable() || !ensureOpen(promise)) { - return; - } - - try { - if (connectPromise != null) { - throw new ConnectionPendingException(); - } - - boolean wasActive = isActive(); - if (doConnect(remoteAddress, localAddress)) { - fulfillConnectPromise(promise, wasActive); - } else { - connectPromise = promise; - requestedRemoteAddress = remoteAddress; - - // Schedule connect timeout. - int connectTimeoutMillis = config().getConnectTimeoutMillis(); - if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - ChannelPromise connectPromise = AbstractRawSocketChannel.this.connectPromise; - ConnectTimeoutException cause = - new ConnectTimeoutException("connection timed out: " + remoteAddress); - if (connectPromise != null && connectPromise.tryFailure(cause)) { - close(voidPromise()); - } - } - }, connectTimeoutMillis, TimeUnit.MILLISECONDS); - } - - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isCancelled()) { - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - close(voidPromise()); - } - } - }); - } - } catch (Throwable t) { - closeIfClosed(); - promise.tryFailure(annotateConnectException(t, remoteAddress)); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - active = true; - - // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. - // We still need to ensure we call fireChannelActive() in this case. - boolean active = isActive(); - - // trySuccess() will return false if a user cancelled the connection attempt. - boolean promiseSet = promise.trySuccess(); - - // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, - // because what happened is what happened. - if (!wasActive && active) { - pipeline().fireChannelActive(); - } - - // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). - if (!promiseSet) { - close(voidPromise()); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - - // Use tryFailure() instead of setFailure() to avoid the race against cancel(). - promise.tryFailure(cause); - closeIfClosed(); - } - - private void finishConnect() { - // Note this method is invoked by the event loop only if the connection attempt was - // neither cancelled nor timed out. - - assert eventLoop().inEventLoop(); - - boolean connectStillInProgress = false; - try { - boolean wasActive = isActive(); - if (!doFinishConnect()) { - connectStillInProgress = true; - return; - } - fulfillConnectPromise(connectPromise, wasActive); - } catch (Throwable t) { - fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); - } finally { - if (!connectStillInProgress) { - // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used - // See https://github.com/netty/netty/issues/1770 - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - } - } - } - - /** - * Finish the connect - */ - private boolean doFinishConnect() throws Exception { - if (socket.finishConnect()) { - clearFlag(Native.EPOLLOUT); - if (requestedRemoteAddress instanceof InetSocketAddress) { - remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); - } - requestedRemoteAddress = null; - - return true; - } - setFlag(Native.EPOLLOUT); - return false; - } - } - - @Override - protected void doBind(SocketAddress local) throws Exception { - if (local instanceof InetSocketAddress) { - checkResolvable((InetSocketAddress) local); - } - socket.bind(local); - this.local = socket.localAddress(); - } - - /** - * Connect to the remote peer - */ - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - if (localAddress instanceof InetSocketAddress) { - checkResolvable((InetSocketAddress) localAddress); - } - - InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress - ? (InetSocketAddress) remoteAddress : null; - if (remoteSocketAddr != null) { - checkResolvable(remoteSocketAddr); - } - - if (remote != null) { - // Check if already connected before trying to connect. This is needed as connect(...) will not return -1 - // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished - // later. - throw new AlreadyConnectedException(); - } - - if (localAddress != null) { - socket.bind(localAddress); - } - - boolean connected = doConnect0(remoteAddress); - if (connected) { - remote = remoteSocketAddr == null ? - remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress()); - } - // We always need to set the localAddress even if not connected yet as the bind already took place. - // - // See https://github.com/netty/netty/issues/3463 - local = socket.localAddress(); - return connected; - } - - private boolean doConnect0(SocketAddress remote) throws Exception { - boolean success = false; - try { - boolean connected = socket.connect(remote); - if (!connected) { - setFlag(Native.EPOLLOUT); - } - success = true; - return connected; - } finally { - if (!success) { - doClose(); - } - } - } - - @Override - protected SocketAddress localAddress0() { - return local; - } - - @Override - protected SocketAddress remoteAddress0() { - return remote; - } -} diff --git a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java b/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java deleted file mode 100644 index a1f2ff4..0000000 --- a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java +++ /dev/null @@ -1,1042 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.plc4x.java.utils.rawsockets.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.*; -import io.netty.channel.internal.ChannelUtils; -import io.netty.channel.socket.DuplexChannel; -import io.netty.channel.unix.FileDescriptor; -import io.netty.channel.unix.IovArray; -import io.netty.channel.unix.SocketWritableByteChannel; -import io.netty.channel.unix.UnixChannelUtil; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.StringUtil; -import io.netty.util.internal.ThrowableUtil; -import io.netty.util.internal.UnstableApi; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.WritableByteChannel; -import java.util.Queue; -import java.util.concurrent.Executor; - -import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD; -import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; -import static io.netty.channel.unix.FileDescriptor.pipe; -import static io.netty.util.internal.ObjectUtil.checkNotNull; - -public abstract class AbstractRawSocketStreamChannel extends AbstractRawSocketChannel implements DuplexChannel { - private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); - private static final String EXPECTED_TYPES = - " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + - StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; - private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractRawSocketStreamChannel.class); - private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION = - ThrowableUtil.unknownStackTrace(new ClosedChannelException(), - AbstractRawSocketStreamChannel.class, "clearSpliceQueue()"); - private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new ClosedChannelException(), - AbstractRawSocketStreamChannel.class, "spliceTo(...)"); - private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION = - ThrowableUtil.unknownStackTrace(new ClosedChannelException(), - AbstractRawSocketStreamChannel.class, "failSpliceIfClosed(...)"); - private final Runnable flushTask = new Runnable() { - @Override - public void run() { - flush(); - } - }; - private Queue<SpliceInTask> spliceQueue; - - // Lazy init these if we need to splice(...) - private FileDescriptor pipeIn; - private FileDescriptor pipeOut; - - private WritableByteChannel byteChannel; - - protected AbstractRawSocketStreamChannel(Channel parent, int fd) { - this(parent, new LinuxSocket(fd)); - } - - protected AbstractRawSocketStreamChannel(int fd) { - this(new LinuxSocket(fd)); - } - - AbstractRawSocketStreamChannel(LinuxSocket fd) { - this(fd, isSoErrorZero(fd)); - } - - AbstractRawSocketStreamChannel(Channel parent, LinuxSocket fd) { - super(parent, fd, Native.EPOLLIN, true); - // Add EPOLLRDHUP so we are notified once the remote peer close the connection. - flags |= Native.EPOLLRDHUP; - } - - AbstractRawSocketStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) { - super(parent, fd, Native.EPOLLIN, remote); - // Add EPOLLRDHUP so we are notified once the remote peer close the connection. - flags |= Native.EPOLLRDHUP; - } - - protected AbstractRawSocketStreamChannel(LinuxSocket fd, boolean active) { - super(null, fd, Native.EPOLLIN, active); - // Add EPOLLRDHUP so we are notified once the remote peer close the connection. - flags |= Native.EPOLLRDHUP; - } - - @Override - protected AbstractEpollUnsafe newUnsafe() { - return new EpollStreamUnsafe(); - } - - @Override - public ChannelMetadata metadata() { - return METADATA; - } - - /** - * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link AbstractRawSocketStreamChannel}. - * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will - * splice until the {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * <ul> - * <li>both channels need to be registered to the same {@link EventLoop}, otherwise an - * {@link IllegalArgumentException} is thrown. </li> - * <li>{@link RawSocketChannelConfig#getEpollMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this and the - * target {@link AbstractRawSocketStreamChannel}</li> - * </ul> - * - */ - public final ChannelFuture spliceTo(final AbstractRawSocketStreamChannel ch, final int len) { - return spliceTo(ch, len, newPromise()); - } - - /** - * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link AbstractRawSocketStreamChannel}. - * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will - * splice until the {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * <ul> - * <li>both channels need to be registered to the same {@link EventLoop}, otherwise an - * {@link IllegalArgumentException} is thrown. </li> - * <li>{@link RawSocketChannelConfig#getEpollMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this and the - * target {@link AbstractRawSocketStreamChannel}</li> - * </ul> - * - */ - public final ChannelFuture spliceTo(final AbstractRawSocketStreamChannel ch, final int len, - final ChannelPromise promise) { - if (ch.eventLoop() != eventLoop()) { - throw new IllegalArgumentException("EventLoops are not the same."); - } - if (len < 0) { - throw new IllegalArgumentException("len: " + len + " (expected: >= 0)"); - } - if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED - || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) { - throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED); - } - checkNotNull(promise, "promise"); - if (!isOpen()) { - promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION); - } else { - addToSpliceQueue(new SpliceInChannelTask(ch, len, promise)); - failSpliceIfClosed(promise); - } - return promise; - } - - /** - * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link FileDescriptor}. - * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the - * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the - * {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * <ul> - * <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this - * {@link AbstractRawSocketStreamChannel}</li> - * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li> - * <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li> - * </ul> - */ - public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) { - return spliceTo(ch, offset, len, newPromise()); - } - - /** - * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link FileDescriptor}. - * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the - * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the - * {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * <ul> - * <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this - * {@link AbstractRawSocketStreamChannel}</li> - * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li> - * <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li> - * </ul> - */ - public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len, - final ChannelPromise promise) { - if (len < 0) { - throw new IllegalArgumentException("len: " + len + " (expected: >= 0)"); - } - if (offset < 0) { - throw new IllegalArgumentException("offset must be >= 0 but was " + offset); - } - if (config().getRawSocketMode() != RawSocketMode.LEVEL_TRIGGERED) { - throw new IllegalStateException("spliceTo() supported only when using " + RawSocketMode.LEVEL_TRIGGERED); - } - checkNotNull(promise, "promise"); - if (!isOpen()) { - promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION); - } else { - addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise)); - failSpliceIfClosed(promise); - } - return promise; - } - - private void failSpliceIfClosed(ChannelPromise promise) { - if (!isOpen()) { - // Seems like the Channel was closed in the meantime try to fail the promise to prevent any - // cases where a future may not be notified otherwise. - if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) { - eventLoop().execute(new Runnable() { - @Override - public void run() { - // Call this via the EventLoop as it is a MPSC queue. - clearSpliceQueue(); - } - }); - } - } - } - - /** - * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. - * @param in the collection which contains objects to write. - * @param buf the {@link ByteBuf} from which the bytes should be written - * @return The value that should be decremented from the write quantum which starts at - * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows: - * <ul> - * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content) - * is encountered</li> - * <li>1 - if a single call to write data was made to the OS</li> - * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but - * no data was accepted</li> - * </ul> - */ - private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception { - int readableBytes = buf.readableBytes(); - if (readableBytes == 0) { - in.remove(); - return 0; - } - - if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) { - return doWriteBytes(in, buf); - } else { - ByteBuffer[] nioBuffers = buf.nioBuffers(); - return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, - config().getMaxBytesPerGatheringWrite()); - } - } - - private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) { - // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change - // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try - // make a best effort to adjust as OS behavior changes. - if (attempted == written) { - if (attempted << 1 > oldMaxBytesPerGatheringWrite) { - config().setMaxBytesPerGatheringWrite(attempted << 1); - } - } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) { - config().setMaxBytesPerGatheringWrite(attempted >>> 1); - } - } - - /** - * Write multiple bytes via {@link IovArray}. - * @param in the collection which contains objects to write. - * @param array The array which contains the content to write. - * @return The value that should be decremented from the write quantum which starts at - * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows: - * <ul> - * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content) - * is encountered</li> - * <li>1 - if a single call to write data was made to the OS</li> - * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but - * no data was accepted</li> - * </ul> - * @throws IOException If an I/O exception occurs during write. - */ - private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException { - final long expectedWrittenBytes = array.size(); - assert expectedWrittenBytes != 0; - final int cnt = array.count(); - assert cnt != 0; - - final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt); - if (localWrittenBytes > 0) { - adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes()); - in.removeBytes(localWrittenBytes); - return 1; - } - return WRITE_STATUS_SNDBUF_FULL; - } - - /** - * Write multiple bytes via {@link ByteBuffer} array. - * @param in the collection which contains objects to write. - * @param nioBuffers The buffers to write. - * @param nioBufferCnt The number of buffers to write. - * @param expectedWrittenBytes The number of bytes we expect to write. - * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write. - * @return The value that should be decremented from the write quantum which starts at - * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows: - * <ul> - * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content) - * is encountered</li> - * <li>1 - if a single call to write data was made to the OS</li> - * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but - * no data was accepted</li> - * </ul> - * @throws IOException If an I/O exception occurs during write. - */ - private int writeBytesMultiple( - ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes, - long maxBytesPerGatheringWrite) throws IOException { - assert expectedWrittenBytes != 0; - if (expectedWrittenBytes > maxBytesPerGatheringWrite) { - expectedWrittenBytes = maxBytesPerGatheringWrite; - } - - final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes); - if (localWrittenBytes > 0) { - adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite); - in.removeBytes(localWrittenBytes); - return 1; - } - return WRITE_STATUS_SNDBUF_FULL; - } - - /** - * Write a {@link DefaultFileRegion} - * @param in the collection which contains objects to write. - * @param region the {@link DefaultFileRegion} from which the bytes should be written - * @return The value that should be decremented from the write quantum which starts at - * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows: - * <ul> - * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content) - * is encountered</li> - * <li>1 - if a single call to write data was made to the OS</li> - * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but - * no data was accepted</li> - * </ul> - */ - private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { - final long regionCount = region.count(); - if (region.transferred() >= regionCount) { - in.remove(); - return 0; - } - - final long offset = region.transferred(); - final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset); - if (flushedAmount > 0) { - in.progress(flushedAmount); - if (region.transferred() >= regionCount) { - in.remove(); - } - return 1; - } - return WRITE_STATUS_SNDBUF_FULL; - } - - /** - * Write a {@link FileRegion} - * @param in the collection which contains objects to write. - * @param region the {@link FileRegion} from which the bytes should be written - * @return The value that should be decremented from the write quantum which starts at - * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows: - * <ul> - * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content) - * is encountered</li> - * <li>1 - if a single call to write data was made to the OS</li> - * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but - * no data was accepted</li> - * </ul> - */ - private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception { - if (region.transferred() >= region.count()) { - in.remove(); - return 0; - } - - if (byteChannel == null) { - byteChannel = new RawSocketSocketWritableByteChannel(); - } - final long flushedAmount = region.transferTo(byteChannel, region.transferred()); - if (flushedAmount > 0) { - in.progress(flushedAmount); - if (region.transferred() >= region.count()) { - in.remove(); - } - return 1; - } - return WRITE_STATUS_SNDBUF_FULL; - } - - @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { - int writeSpinCount = config().getWriteSpinCount(); - do { - final int msgCount = in.size(); - // Do gathering write if the outbound buffer entries start with more than one ByteBuf. - if (msgCount > 1 && in.current() instanceof ByteBuf) { - writeSpinCount -= doWriteMultiple(in); - } else if (msgCount == 0) { - // Wrote all messages. - clearFlag(Native.EPOLLOUT); - // Return here so we not set the EPOLLOUT flag. - return; - } else { // msgCount == 1 - writeSpinCount -= doWriteSingle(in); - } - - // We do not break the loop here even if the outbound buffer was flushed completely, - // because a user might have triggered another write and flush when we notify his or her - // listeners. - } while (writeSpinCount > 0); - - if (writeSpinCount == 0) { - // We used our writeSpin quantum, and should try to write again later. - eventLoop().execute(flushTask); - } else { - // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up - // when it can accept more data. - setFlag(Native.EPOLLOUT); - } - } - - /** - * Attempt to write a single object. - * @param in the collection which contains objects to write. - * @return The value that should be decremented from the write quantum which starts at - * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows: - * <ul> - * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content) - * is encountered</li> - * <li>1 - if a single call to write data was made to the OS</li> - * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but - * no data was accepted</li> - * </ul> - * @throws Exception If an I/O error occurs. - */ - protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception { - // The outbound buffer contains only one message or it contains a file region. - Object msg = in.current(); - if (msg instanceof ByteBuf) { - return writeBytes(in, (ByteBuf) msg); - } else if (msg instanceof DefaultFileRegion) { - return writeDefaultFileRegion(in, (DefaultFileRegion) msg); - } else if (msg instanceof FileRegion) { - return writeFileRegion(in, (FileRegion) msg); - } else if (msg instanceof SpliceOutTask) { - if (!((SpliceOutTask) msg).spliceOut()) { - return WRITE_STATUS_SNDBUF_FULL; - } - in.remove(); - return 1; - } else { - // Should never reach here. - throw new Error(); - } - } - - /** - * Attempt to write multiple {@link ByteBuf} objects. - * @param in the collection which contains objects to write. - * @return The value that should be decremented from the write quantum which starts at - * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows: - * <ul> - * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content) - * is encountered</li> - * <li>1 - if a single call to write data was made to the OS</li> - * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but - * no data was accepted</li> - * </ul> - * @throws Exception If an I/O error occurs. - */ - private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception { - final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite(); - if (PlatformDependent.hasUnsafe()) { - IovArray array = ((RawSocketEventLoop) eventLoop()).cleanArray(); - array.maxBytes(maxBytesPerGatheringWrite); - in.forEachFlushedMessage(array); - - if (array.count() >= 1) { - // TODO: Handle the case where cnt == 1 specially. - return writeBytesMultiple(in, array); - } - } else { - ByteBuffer[] buffers = in.nioBuffers(); - int cnt = in.nioBufferCount(); - if (cnt >= 1) { - // TODO: Handle the case where cnt == 1 specially. - return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite); - } - } - // cnt == 0, which means the outbound buffer contained empty buffers only. - in.removeBytes(0); - return 0; - } - - @Override - protected Object filterOutboundMessage(Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf; - } - - if (msg instanceof FileRegion || msg instanceof SpliceOutTask) { - return msg; - } - - throw new UnsupportedOperationException( - "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); - } - - @UnstableApi - @Override - protected final void doShutdownOutput() throws Exception { - socket.shutdown(false, true); - } - - private void shutdownInput0(final ChannelPromise promise) { - try { - socket.shutdown(true, false); - promise.setSuccess(); - } catch (Throwable cause) { - promise.setFailure(cause); - } - } - - @Override - public boolean isOutputShutdown() { - return socket.isOutputShutdown(); - } - - @Override - public boolean isInputShutdown() { - return socket.isInputShutdown(); - } - - @Override - public boolean isShutdown() { - return socket.isShutdown(); - } - - @Override - public ChannelFuture shutdownOutput() { - return shutdownOutput(newPromise()); - } - - @Override - public ChannelFuture shutdownOutput(final ChannelPromise promise) { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - ((AbstractChannel.AbstractUnsafe) unsafe()).shutdownOutput(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - ((AbstractChannel.AbstractUnsafe) unsafe()).shutdownOutput(promise); - } - }); - } - - return promise; - } - - @Override - public ChannelFuture shutdownInput() { - return shutdownInput(newPromise()); - } - - @Override - public ChannelFuture shutdownInput(final ChannelPromise promise) { - Executor closeExecutor = ((RawSocketStreamUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { - @Override - public void run() { - shutdownInput0(promise); - } - }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdownInput0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownInput0(promise); - } - }); - } - } - return promise; - } - - @Override - public ChannelFuture shutdown() { - return shutdown(newPromise()); - } - - @Override - public ChannelFuture shutdown(final ChannelPromise promise) { - ChannelFuture shutdownOutputFuture = shutdownOutput(); - if (shutdownOutputFuture.isDone()) { - shutdownOutputDone(shutdownOutputFuture, promise); - } else { - shutdownOutputFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception { - shutdownOutputDone(shutdownOutputFuture, promise); - } - }); - } - return promise; - } - - private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) { - ChannelFuture shutdownInputFuture = shutdownInput(); - if (shutdownInputFuture.isDone()) { - shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); - } else { - shutdownInputFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception { - shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); - } - }); - } - } - - private static void shutdownDone(ChannelFuture shutdownOutputFuture, - ChannelFuture shutdownInputFuture, - ChannelPromise promise) { - Throwable shutdownOutputCause = shutdownOutputFuture.cause(); - Throwable shutdownInputCause = shutdownInputFuture.cause(); - if (shutdownOutputCause != null) { - if (shutdownInputCause != null) { - logger.debug("Exception suppressed because a previous exception occurred.", - shutdownInputCause); - } - promise.setFailure(shutdownOutputCause); - } else if (shutdownInputCause != null) { - promise.setFailure(shutdownInputCause); - } else { - promise.setSuccess(); - } - } - - @Override - protected void doClose() throws Exception { - try { - // Calling super.doClose() first so spliceTo(...) will fail on next call. - super.doClose(); - } finally { - safeClosePipe(pipeIn); - safeClosePipe(pipeOut); - clearSpliceQueue(); - } - } - - private void clearSpliceQueue() { - if (spliceQueue == null) { - return; - } - for (;;) { - SpliceInTask task = spliceQueue.poll(); - if (task == null) { - break; - } - task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION); - } - } - - private static void safeClosePipe(FileDescriptor fd) { - if (fd != null) { - try { - fd.close(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn("Error while closing a pipe", e); - } - } - } - } - - class RawSocketStreamUnsafe extends AbstractRawSocketUnsafe { - // Overridden here just to be able to access this method from AbstractRawSocketStreamChannel - @Override - protected Executor prepareToClose() { - return super.prepareToClose(); - } - - private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close, - RawSocketRecvByteAllocatorHandle allocHandle) { - if (byteBuf != null) { - if (byteBuf.isReadable()) { - readPending = false; - pipeline.fireChannelRead(byteBuf); - } else { - byteBuf.release(); - } - } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(cause); - if (close || cause instanceof IOException) { - shutdownInput(false); - } - } - - @Override - RawSocketRecvByteAllocatorHandle newRawSocketHandle(RecvByteBufAllocator.ExtendedHandle handle) { - return new RawSocketRecvByteAllocatorStreamingHandle(handle); - } - - @Override - void RawSocketInReady() { - final ChannelConfig config = config(); - if (shouldBreakRawSocketInReady(config)) { - clearRawSocketIn0(); - return; - } - final RawSocketRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); - allocHandle.edgeTriggered(isFlagSet(Native.RawSocketET)); - - final ChannelPipeline pipeline = pipeline(); - final ByteBufAllocator allocator = config.getAllocator(); - allocHandle.reset(config); - RawSocketInBefore(); - - ByteBuf byteBuf = null; - boolean close = false; - try { - do { - if (spliceQueue != null) { - SpliceInTask spliceTask = spliceQueue.peek(); - if (spliceTask != null) { - if (spliceTask.spliceIn(allocHandle)) { - // We need to check if it is still active as if not we removed all SpliceTasks in - // doClose(...) - if (isActive()) { - spliceQueue.remove(); - } - continue; - } else { - break; - } - } - } - - // we use a direct buffer here as the native implementations only be able - // to handle direct buffers. - byteBuf = allocHandle.allocate(allocator); - allocHandle.lastBytesRead(doReadBytes(byteBuf)); - if (allocHandle.lastBytesRead() <= 0) { - // nothing was read, release the buffer. - byteBuf.release(); - byteBuf = null; - close = allocHandle.lastBytesRead() < 0; - if (close) { - // There is nothing left to read as we received an EOF. - readPending = false; - } - break; - } - allocHandle.incMessagesRead(1); - readPending = false; - pipeline.fireChannelRead(byteBuf); - byteBuf = null; - - if (shouldBreakRawSocketInReady(config)) { - // We need to do this for two reasons: - // - // - If the input was shutdown in between (which may be the case when the user did it in the - // fireChannelRead(...) method we should not try to read again to not produce any - // miss-leading exceptions. - // - // - If the user closes the channel we need to ensure we not try to read from it again as - // the filedescriptor may be re-used already by the OS if the system is handling a lot of - // concurrent connections and so needs a lot of filedescriptors. If not do this we risk - // reading data from a filedescriptor that belongs to another socket then the socket that - // was "wrapped" by this Channel implementation. - break; - } - } while (allocHandle.continueReading()); - - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - - if (close) { - shutdownInput(false); - } - } catch (Throwable t) { - handleReadException(pipeline, byteBuf, t, close, allocHandle); - } finally { - RawSocketInFinally(config); - } - } - } - - private void addToSpliceQueue(final SpliceInTask task) { - EventLoop eventLoop = eventLoop(); - if (eventLoop.inEventLoop()) { - addToSpliceQueue0(task); - } else { - eventLoop.execute(new Runnable() { - @Override - public void run() { - addToSpliceQueue0(task); - } - }); - } - } - - private void addToSpliceQueue0(SpliceInTask task) { - if (spliceQueue == null) { - spliceQueue = PlatformDependent.newMpscQueue(); - } - spliceQueue.add(task); - } - - protected abstract class SpliceInTask { - final ChannelPromise promise; - int len; - - protected SpliceInTask(int len, ChannelPromise promise) { - this.promise = promise; - this.len = len; - } - - abstract boolean spliceIn(RecvByteBufAllocator.Handle handle); - - protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException { - // calculate the maximum amount of data we are allowed to splice - int length = Math.min(handle.guess(), len); - int splicedIn = 0; - for (;;) { - // Splicing until there is nothing left to splice. - int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length); - if (localSplicedIn == 0) { - break; - } - splicedIn += localSplicedIn; - length -= localSplicedIn; - } - - return splicedIn; - } - } - - // Let it directly implement channelFutureListener as well to reduce object creation. - private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener { - private final AbstractRawSocketStreamChannel ch; - - SpliceInChannelTask(AbstractRawSocketStreamChannel ch, int len, ChannelPromise promise) { - super(len, promise); - this.ch = ch; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - promise.setFailure(future.cause()); - } - } - - @Override - public boolean spliceIn(RecvByteBufAllocator.Handle handle) { - assert ch.eventLoop().inEventLoop(); - if (len == 0) { - promise.setSuccess(); - return true; - } - try { - // We create the pipe on the target channel as this will allow us to just handle pending writes - // later in a correct fashion without get into any ordering issues when spliceTo(...) is called - // on multiple Channels pointing to one target Channel. - FileDescriptor pipeOut = ch.pipeOut; - if (pipeOut == null) { - // Create a new pipe as non was created before. - FileDescriptor[] pipe = pipe(); - ch.pipeIn = pipe[0]; - pipeOut = ch.pipeOut = pipe[1]; - } - - int splicedIn = spliceIn(pipeOut, handle); - if (splicedIn > 0) { - // Integer.MAX_VALUE is a special value which will result in splice forever. - if (len != Integer.MAX_VALUE) { - len -= splicedIn; - } - - // Depending on if we are done with splicing inbound data we set the right promise for the - // outbound splicing. - final ChannelPromise splicePromise; - if (len == 0) { - splicePromise = promise; - } else { - splicePromise = ch.newPromise().addListener(this); - } - - boolean autoRead = config().isAutoRead(); - - // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this - // case. - ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise); - ch.unsafe().flush(); - if (autoRead && !splicePromise.isDone()) { - // Write was not done which means the target channel was not writable. In this case we need to - // disable reading until we are done with splicing to the target channel because: - // - // - The user may want to to trigger another splice operation once the splicing was complete. - config().setAutoRead(false); - } - } - - return len == 0; - } catch (Throwable cause) { - promise.setFailure(cause); - return true; - } - } - } - - private final class SpliceOutTask { - private final AbstractRawSocketStreamChannel ch; - private final boolean autoRead; - private int len; - - SpliceOutTask(AbstractRawSocketStreamChannel ch, int len, boolean autoRead) { - this.ch = ch; - this.len = len; - this.autoRead = autoRead; - } - - public boolean spliceOut() throws Exception { - assert ch.eventLoop().inEventLoop(); - try { - int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len); - len -= splicedOut; - if (len == 0) { - if (autoRead) { - // AutoRead was used and we spliced everything so start reading again - config().setAutoRead(true); - } - return true; - } - return false; - } catch (IOException e) { - if (autoRead) { - // AutoRead was used and we spliced everything so start reading again - config().setAutoRead(true); - } - throw e; - } - } - } - - private final class SpliceFdTask extends SpliceInTask { - private final FileDescriptor fd; - private final ChannelPromise promise; - private final int offset; - - SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) { - super(len, promise); - this.fd = fd; - this.promise = promise; - this.offset = offset; - } - - @Override - public boolean spliceIn(RecvByteBufAllocator.Handle handle) { - assert eventLoop().inEventLoop(); - if (len == 0) { - promise.setSuccess(); - return true; - } - - try { - FileDescriptor[] pipe = pipe(); - FileDescriptor pipeIn = pipe[0]; - FileDescriptor pipeOut = pipe[1]; - try { - int splicedIn = spliceIn(pipeOut, handle); - if (splicedIn > 0) { - // Integer.MAX_VALUE is a special value which will result in splice forever. - if (len != Integer.MAX_VALUE) { - len -= splicedIn; - } - do { - int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn); - splicedIn -= splicedOut; - } while (splicedIn > 0); - if (len == 0) { - promise.setSuccess(); - return true; - } - } - return false; - } finally { - safeClosePipe(pipeIn); - safeClosePipe(pipeOut); - } - } catch (Throwable cause) { - promise.setFailure(cause); - return true; - } - } - } - - private final class RawSocketSocketWritableByteChannel extends SocketWritableByteChannel { - RawSocketSocketWritableByteChannel() { - super(socket); - } - - @Override - protected ByteBufAllocator alloc() { - return AbstractRawSocketStreamChannel.this.alloc(); - } - } -} diff --git a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java deleted file mode 100644 index edf9ead..0000000 --- a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.plc4x.java.utils.rawsockets.netty; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.SocketChannel; -import io.netty.util.concurrent.GlobalEventExecutor; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.Executor; - -public class RawSocketChannel extends AbstractRawSocketStreamChannel implements SocketChannel { - - private final RawSocketChannelConfig config; - - private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList(); - - public RawSocketChannel() { - super(newSocketStream(), false); - config = new RawSocketChannelConfig(this); - } - - public RawSocketChannel(int fd) { - super(fd); - config = new RawSocketChannelConfig(this); - } - - RawSocketChannel(LinuxSocket fd, boolean active) { - super(fd, active); - config = new RawSocketChannelConfig(this); - } - - RawSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) { - super(parent, fd, remoteAddress); - config = new RawSocketChannelConfig(this); - - if (parent instanceof RawSocketChannel) { - tcpMd5SigAddresses = ((RawSocketChannel) parent).tcpMd5SigAddresses(); - } - } - - /** - * Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>. - */ - public RawSocketTcpInfo tcpInfo() { - return tcpInfo(new RawSocketTcpInfo()); - } - - /** - * Updates and returns the {@code TCP_INFO} for the current socket. - * See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>. - */ - public RawSocketTcpInfo tcpInfo(RawSocketTcpInfo info) { - try { - socket.getTcpInfo(info); - return info; - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public InetSocketAddress remoteAddress() { - return (InetSocketAddress) super.remoteAddress(); - } - - @Override - public InetSocketAddress localAddress() { - return (InetSocketAddress) super.localAddress(); - } - - @Override - public RawSocketChannelConfig config() { - return config; - } - - @Override - public ServerSocketChannel parent() { - return (ServerSocketChannel) super.parent(); - } - - @Override - protected AbstractEpollUnsafe newUnsafe() { - return new EpollSocketChannelUnsafe(); - } - - private final class EpollSocketChannelUnsafe extends RawSocketStreamUnsafe { - @Override - protected Executor prepareToClose() { - try { - // Check isOpen() first as otherwise it will throw a RuntimeException - // when call getSoLinger() as the fd is not valid anymore. - if (isOpen() && config().getSoLinger() > 0) { - // We need to cancel this key of the channel so we may not end up in a eventloop spin - // because we try to read or write until the actual close happens which may be later due - // SO_LINGER handling. - // See https://github.com/netty/netty/issues/4449 - ((RawSocketEventLoop) eventLoop()).remove(RawSocketChannel.this); - return GlobalEventExecutor.INSTANCE; - } - } catch (Throwable ignore) { - // Ignore the error as the underlying channel may be closed in the meantime and so - // getSoLinger() may produce an exception. In this case we just return null. - // See https://github.com/netty/netty/issues/4449 - } - return null; - } - } - - void setTcpMd5Sig(Map<InetAddress, byte[]> keys) throws IOException { - tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys); - } -} diff --git a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java b/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java deleted file mode 100644 index 8f77972..0000000 --- a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.plc4x.java.utils.rawsockets.netty; - -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.*; - -import java.io.IOException; -import java.util.Map; - -import static io.netty.channel.unix.Limits.SSIZE_MAX; - -public class RawSocketChannelConfig extends DefaultChannelConfig { - final AbstractRawSocketChannel channel; - private volatile long maxBytesPerGatheringWrite = SSIZE_MAX; - - RawSocketChannelConfig(AbstractRawSocketChannel channel) { - super(channel); - this.channel = channel; - } - - @Override - public Map<ChannelOption<?>, Object> getOptions() { - return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE); - } - - @SuppressWarnings("unchecked") - @Override - public <T> T getOption(ChannelOption<T> option) { - if (option == EpollChannelOption.EPOLL_MODE) { - return (T) getEpollMode(); - } - return super.getOption(option); - } - - @Override - public <T> boolean setOption(ChannelOption<T> option, T value) { - validate(option, value); - if (option == EpollChannelOption.EPOLL_MODE) { - setEpollMode((EpollMode) value); - } else { - return super.setOption(option, value); - } - return true; - } - - @Override - public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { - super.setConnectTimeoutMillis(connectTimeoutMillis); - return this; - } - - @Override - @Deprecated - public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { - super.setMaxMessagesPerRead(maxMessagesPerRead); - return this; - } - - @Override - public EpollChannelConfig setWriteSpinCount(int writeSpinCount) { - super.setWriteSpinCount(writeSpinCount); - return this; - } - - @Override - public EpollChannelConfig setAllocator(ByteBufAllocator allocator) { - super.setAllocator(allocator); - return this; - } - - @Override - public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { - if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) { - throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " + - RecvByteBufAllocator.ExtendedHandle.class); - } - super.setRecvByteBufAllocator(allocator); - return this; - } - - @Override - public EpollChannelConfig setAutoRead(boolean autoRead) { - super.setAutoRead(autoRead); - return this; - } - - @Override - @Deprecated - public EpollChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { - super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); - return this; - } - - @Override - @Deprecated - public EpollChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { - super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); - return this; - } - - @Override - public EpollChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { - super.setWriteBufferWaterMark(writeBufferWaterMark); - return this; - } - - @Override - public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { - super.setMessageSizeEstimator(estimator); - return this; - } - - /** - * Return the {@link EpollMode} used. Default is - * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or - * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use - * {@link EpollMode#LEVEL_TRIGGERED}. - */ - public EpollMode getEpollMode() { - return channel.isFlagSet(Native.EPOLLET) - ? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED; - } - - /** - * Set the {@link EpollMode} used. Default is - * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or - * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use - * {@link EpollMode#LEVEL_TRIGGERED}. - * - * <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong> - */ - public EpollChannelConfig setEpollMode(EpollMode mode) { - if (mode == null) { - throw new NullPointerException("mode"); - } - try { - switch (mode) { - case EDGE_TRIGGERED: - checkChannelNotRegistered(); - channel.setFlag(Native.EPOLLET); - break; - case LEVEL_TRIGGERED: - checkChannelNotRegistered(); - channel.clearFlag(Native.EPOLLET); - break; - default: - throw new Error(); - } - } catch (IOException e) { - throw new ChannelException(e); - } - return this; - } - - private void checkChannelNotRegistered() { - if (channel.isRegistered()) { - throw new IllegalStateException("EpollMode can only be changed before channel is registered"); - } - } - - @Override - protected final void autoReadCleared() { - channel.clearEpollIn(); - } - - final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) { - this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite; - } - - final long getMaxBytesPerGatheringWrite() { - return maxBytesPerGatheringWrite; - } -} diff --git a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java b/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java deleted file mode 100644 index cba1ad8..0000000 --- a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java +++ /dev/null @@ -1,186 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.plc4x.java.utils.rawsockets.netty; - -import com.savarese.rocksaw.net.RawSocket; -import io.netty.buffer.ByteBuf; -import io.netty.channel.*; -import io.netty.channel.nio.AbstractNioByteChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; -import org.apache.plc4x.java.api.exceptions.PlcIoException; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectableChannel; - -/** - * Netty channel implementation that uses RockSaw to create a raw socket connection to implement - * IP-socket based protocols not based on TCP or UDP. - * - * NOTE: This class is currently a WIP (Work in progress) it should only be used with great care. - */ -public class RawSocketChannelSav extends AbstractNioByteChannel { - - private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class); - - // The protocol number is defined in the IP protocol and indicates the type of protocol the payload - // the IP packet uses. This number is assigned by the IESG. A full list of the registered protocol - // numbers can be found here: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml - private int protocolNumber; - - private RawSocket socket; - private InetSocketAddress localAddress; - private InetSocketAddress remoteAddress; - - /** - * Initializes a raw socket that is able to communicate with raw IPv4 and IPv6 sockets, hereby - * allowing to implement protocols below TCP and UDP. - * - * For a list of public known protocol numbers see: - * https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml - * - * @param parent - * @param ch - * @param protocolNumber protocol number identifying the protocol. - * @throws PlcIoException - */ - public RawSocketChannelSav(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException { - super(parent, ch); - - this.protocolNumber = protocolNumber; - - try { - socket = new RawSocket(); - socket.setIPHeaderInclude(true); - } catch (IOException e) { - throw new PlcIoException("Error setting up raw socket", e); - } - } - - /** - * Opens a connection to the given remote address. - * - * @param remoteAddress - * @param localAddress - * @return - * @throws Exception - */ - @Override - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - if(!(remoteAddress instanceof InetSocketAddress) || !(localAddress instanceof InetSocketAddress)) { - throw new PlcIoException("Both remoteAddress and localAddress must be of type InetSocketAddress"); - } - - try { - this.localAddress = (InetSocketAddress) localAddress; - this.remoteAddress = (InetSocketAddress) remoteAddress; - - socket.open(RawSocket.PF_INET, protocolNumber); - - return socket.isOpen(); - } catch (IllegalStateException | IOException e) { - return false; - } - } - - @Override - protected void doFinishConnect() throws Exception { - - } - - /** - * Opens a listening socket. - * - * @param localAddress - * @throws Exception - */ - @Override - protected void doBind(SocketAddress localAddress) throws Exception { - if(socket.isOpen()) { - throw new PlcIoException("Raw socket already opened."); - } - if(localAddress instanceof InetSocketAddress) { - this.localAddress = (InetSocketAddress) localAddress; - socket.bind(this.localAddress.getAddress()); - } else { - throw new PlcIoException("Unsupported type of local address. Only InetSocketAddress supported."); - } - } - - /** - * Closes the connection. - * - * @throws Exception - */ - @Override - protected void doDisconnect() throws Exception { - if(socket.isOpen()) { - socket.close(); - } - } - - @Override - protected ChannelFuture shutdownInput() { - return null; - } - - @Override - protected int doReadBytes(ByteBuf buf) throws Exception { - byte[] byteBuf = new byte[1024]; - int readBytes = socket.read(byteBuf); - buf.writeBytes(byteBuf, 0, readBytes); - return readBytes; - } - - @Override - protected int doWriteBytes(ByteBuf buf) throws Exception { - byte[] readableBytes = new byte[buf.readableBytes()]; - buf.readBytes(readableBytes); - socket.write(remoteAddress.getAddress(), readableBytes); - return readableBytes.length; - } - - @Override - protected long doWriteFileRegion(FileRegion region) throws Exception { - throw new UnsupportedOperationException("doWriteFileRegion not implemented"); - } - - @Override - protected SocketAddress localAddress0() { - return localAddress; - } - - @Override - protected SocketAddress remoteAddress0() { - return remoteAddress; - } - - @Override - public ChannelConfig config() { - return null; - } - - @Override - public boolean isActive() { - return socket.isOpen(); - } -} diff --git a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java b/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java deleted file mode 100644 index 03e024d..0000000 --- a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.plc4x.java.utils.rawsockets.netty; - -import io.netty.util.internal.PlatformDependent; - -/** - * This is an internal datastructure which can be directly passed to epoll_wait to reduce the overhead. - * - * typedef union epoll_data { - * void *ptr; - * int fd; - * uint32_t u32; - * uint64_t u64; - * } epoll_data_t; - * - * struct epoll_event { - * uint32_t events; // Epoll events - * epoll_data_t data; // User data variable - * }; - * - * We use {@code fd} if the {@code epoll_data union} to store the actual file descriptor of an - * {@link AbstractRawSocketChannel} and so be able to map it later. - */ -final class RawSocketEventArray { - // Size of the epoll_event struct - private static final int EPOLL_EVENT_SIZE = Native.sizeofEpollEvent(); - // The offsiet of the data union in the epoll_event struct - private static final int EPOLL_DATA_OFFSET = Native.offsetofEpollData(); - - private long memoryAddress; - private int length; - - RawSocketEventArray(int length) { - if (length < 1) { - throw new IllegalArgumentException("length must be >= 1 but was " + length); - } - this.length = length; - memoryAddress = allocate(length); - } - - private static long allocate(int length) { - return PlatformDependent.allocateMemory(length * EPOLL_EVENT_SIZE); - } - - /** - * Return the {@code memoryAddress} which points to the start of this {@link RawSocketEventArray}. - */ - long memoryAddress() { - return memoryAddress; - } - - /** - * Return the length of the {@link RawSocketEventArray} which represent the maximum number of {@code epoll_events} - * that can be stored in it. - */ - int length() { - return length; - } - - /** - * Increase the storage of this {@link RawSocketEventArray}. - */ - void increase() { - // double the size - length <<= 1; - free(); - memoryAddress = allocate(length); - } - - /** - * Free this {@link RawSocketEventArray}. Any usage after calling this method may segfault the JVM! - */ - void free() { - PlatformDependent.freeMemory(memoryAddress); - } - - /** - * Return the events for the {@code epoll_event} on this index. - */ - int events(int index) { - return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE); - } - - /** - * Return the file descriptor for the {@code epoll_event} on this index. - */ - int fd(int index) { - return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE + EPOLL_DATA_OFFSET); - } -} diff --git a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java b/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java deleted file mode 100644 index ac13e50..0000000 --- a/plc4j/utils/raw-sockets/src/main/java-sav/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.plc4x.java.utils.rawsockets.netty; - -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.SelectStrategy; -import io.netty.channel.SingleThreadEventLoop; -import io.netty.util.IntSupplier; -import io.netty.util.collection.IntObjectHashMap; -import io.netty.util.collection.IntObjectMap; -import io.netty.util.concurrent.RejectedExecutionHandler; -import io.netty.util.internal.ObjectUtil; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Queue; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -import static java.lang.Math.min; - -/** - * {@link EventLoop} which uses epoll under the covers. Only works on Linux! - */ -final class RawSocketEventLoop extends SingleThreadEventLoop { - private static final InternalLogger logger = InternalLoggerFactory.getInstance(RawSocketEventLoop.class); - private static final AtomicIntegerFieldUpdater<RawSocketEventLoop> WAKEN_UP_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(RawSocketEventLoop.class, "wakenUp"); - - static { - // Ensure JNI is initialized by the time this class is loaded by this time! - // We use unix-common methods in this class which are backed by JNI methods. - Epoll.ensureAvailability(); - } - - private final FileDescriptor epollFd; - private final FileDescriptor eventFd; - private final FileDescriptor timerFd; - private final IntObjectMap<AbstractRawSocketChannel> channels = new IntObjectHashMap<AbstractRawSocketChannel>(4096); - private final boolean allowGrowing; - private final RawSocketEventArray events; - private final IovArray iovArray = new IovArray(); - private final SelectStrategy selectStrategy; - private final IntSupplier selectNowSupplier = new IntSupplier() { - @Override - public int get() throws Exception { - return epollWaitNow(); - } - }; - private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() { - @Override - public Integer call() throws Exception { - return RawSocketEventLoop.super.pendingTasks(); - } - }; - private volatile int wakenUp; - private volatile int ioRatio = 50; - - RawSocketEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, - SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { - super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); - selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); - if (maxEvents == 0) { - allowGrowing = true; - events = new RawSocketEventArray(4096); - } else { - allowGrowing = false; - events = new RawSocketEventArray(maxEvents); - } - boolean success = false; - FileDescriptor epollFd = null; - FileDescriptor eventFd = null; - FileDescriptor timerFd = null; - try { - this.epollFd = epollFd = Native.newEpollCreate(); - this.eventFd = eventFd = Native.newEventFd(); - try { - Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN); - } catch (IOException e) { - throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e); - } - this.timerFd = timerFd = Native.newTimerFd(); - try { - Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET); - } catch (IOException e) { - throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e); - } - success = true; - } finally { - if (!success) { - if (epollFd != null) { - try { - epollFd.close(); - } catch (Exception e) { - // ignore - } - } - if (eventFd != null) { - try { - eventFd.close(); - } catch (Exception e) { - // ignore - } - } - if (timerFd != null) { - try { - timerFd.close(); - } catch (Exception e) { - // ignore - } - } - } - } - } - - /** - * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}. - */ - IovArray cleanArray() { - iovArray.clear(); - return iovArray; - } - - @Override - protected void wakeup(boolean inEventLoop) { - if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { - // write to the evfd which will then wake-up epoll_wait(...) - Native.eventFdWrite(eventFd.intValue(), 1L); - } - } - - /** - * Register the given epoll with this {@link EventLoop}. - */ - void add(AbstractRawSocketChannel ch) throws IOException { - assert inEventLoop(); - int fd = ch.socket.intValue(); - Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags); - channels.put(fd, ch); - } - - /** - * The flags of the given epoll was modified so update the registration - */ - void modify(AbstractRawSocketChannel ch) throws IOException { - assert inEventLoop(); - Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags); - } - - /** - * Deregister the given epoll from this {@link EventLoop}. - */ - void remove(AbstractRawSocketChannel ch) throws IOException { - assert inEventLoop(); - - if (ch.isOpen()) { - int fd = ch.socket.intValue(); - if (channels.remove(fd) != null) { - // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically - // removed once the file-descriptor is closed. - Native.epollCtlDel(epollFd.intValue(), ch.fd().intValue()); - } - } - } - - @Override - protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { - // This event loop never calls takeTask() - return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() - : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); - } - - @Override - public int pendingTasks() { - // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as - // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer). - // See https://github.com/netty/netty/issues/5297 - if (inEventLoop()) { - return super.pendingTasks(); - } else { - return submit(pendingTasksCallable).syncUninterruptibly().getNow(); - } - } - /** - * Returns the percentage of the desired amount of time spent for I/O in the event loop. - */ - public int getIoRatio() { - return ioRatio; - } - - /** - * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is - * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. - */ - public void setIoRatio(int ioRatio) { - if (ioRatio <= 0 || ioRatio > 100) { - throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); - } - this.ioRatio = ioRatio; - } - - private int epollWait(boolean oldWakeup) throws IOException { - // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. - // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended - // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed - // in pipeline. - if (oldWakeup && hasTasks()) { - return epollWaitNow(); - } - - long totalDelay = delayNanos(System.nanoTime()); - int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE); - return Native.epollWait(epollFd, events, timerFd, delaySeconds, - (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE)); - } - - private int epollWaitNow() throws IOException { - return Native.epollWait(epollFd, events, timerFd, 0, 0); - } - - @Override - protected void run() { - for (;;) { - try { - int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); - switch (strategy) { - case SelectStrategy.CONTINUE: - continue; - case SelectStrategy.SELECT: - strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp == 1) { - Native.eventFdWrite(eventFd.intValue(), 1L); - } - // fallthrough - default: - } - - final int ioRatio = this.ioRatio; - if (ioRatio == 100) { - try { - if (strategy > 0) { - processReady(events, strategy); - } - } finally { - // Ensure we always run tasks. - runAllTasks(); - } - } else { - final long ioStartTime = System.nanoTime(); - - try { - if (strategy > 0) { - processReady(events, strategy); - } - } finally { - // Ensure we always run tasks. - final long ioTime = System.nanoTime() - ioStartTime; - runAllTasks(ioTime * (100 - ioRatio) / ioRatio); - } - } - if (allowGrowing && strategy == events.length()) { - //increase the size of the array as we needed the whole space for the events - events.increase(); - } - } catch (Throwable t) { - handleLoopException(t); - } - // Always handle shutdown even if the loop processing threw an exception. - try { - if (isShuttingDown()) { - closeAll(); - if (confirmShutdown()) { - break; - } - } - } catch (Throwable t) { - handleLoopException(t); - } - } - } - - private static void handleLoopException(Throwable t) { - logger.warn("Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - - private void closeAll() { - try { - epollWaitNow(); - } catch (IOException ignore) { - // ignore on close - } - // Using the intermediate collection to prevent ConcurrentModificationException. - // In the `close()` method, the channel is deleted from `channels` map. - Collection<AbstractRawSocketChannel> array = new ArrayList<AbstractRawSocketChannel>(channels.size()); - - for (AbstractRawSocketChannel channel: channels.values()) { - array.add(channel); - } - - for (AbstractRawSocketChannel ch: array) { - ch.unsafe().close(ch.unsafe().voidPromise()); - } - } - - private void processReady(RawSocketEventArray events, int ready) { - for (int i = 0; i < ready; i ++) { - final int fd = events.fd(i); - if (fd == eventFd.intValue()) { - // consume wakeup event. - Native.eventFdRead(fd); - } else if (fd == timerFd.intValue()) { - // consume wakeup event, necessary because the timer is added with ET mode. - Native.timerFdRead(fd); - } else { - final long ev = events.events(i); - - AbstractRawSocketChannel ch = channels.get(fd); - if (ch != null) { - // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100% - // sure about it! - // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the - // past. - AbstractRawSocketUnsafe unsafe = (AbstractRawSocketUnsafe) ch.unsafe(); - - // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try - // to read from the file descriptor. - // See https://github.com/netty/netty/issues/3785 - // - // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused. - // In either case epollOutReady() will do the correct thing (finish connecting, or fail - // the connection). - // See https://github.com/netty/netty/issues/3848 - if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) { - // Force flush of data as the epoll is writable again - unsafe.epollOutReady(); - } - - // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input. - // See https://github.com/netty/netty/issues/4317. - // - // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will - // try to read from the underlying file descriptor and so notify the user about the error. - if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) { - // The Channel is still open and there is something to read. Do it now. - unsafe.epollInReady(); - } - - // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case - // we may close the channel directly or try to read more data depending on the state of the - // Channel and als depending on the AbstractEpollChannel subtype. - if ((ev & Native.EPOLLRDHUP) != 0) { - unsafe.epollRdHupReady(); - } - } else { - // We received an event for an fd which we not use anymore. Remove it from the epoll_event set. - try { - Native.epollCtlDel(epollFd.intValue(), fd); - } catch (IOException ignore) { - // This can happen but is nothing we need to worry about as we only try to delete - // the fd from the epoll set as we not found it in our mappings. So this call to - // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was - // deleted before or the file descriptor was closed before. - } - } - } - } - } - - @Override - protected void cleanup() { - try { - try { - epollFd.close(); - } catch (IOException e) { - logger.warn("Failed to close the epoll fd.", e); - } - try { - eventFd.close(); - } catch (IOException e) { - logger.warn("Failed to close the event fd.", e); - } - try { - timerFd.close(); - } catch (IOException e) { - logger.warn("Failed to close the timer fd.", e); - } - } finally { - // release native memory - iovArray.release(); - events.free(); - } - } -} diff --git a/plc4j/utils/raw-sockets/src/remote-resources/META-INF/NOTICE b/plc4j/utils/raw-sockets/src/remote-resources/META-INF/NOTICE new file mode 100644 index 0000000..ae40e6e --- /dev/null +++ b/plc4j/utils/raw-sockets/src/remote-resources/META-INF/NOTICE @@ -0,0 +1,7 @@ +=============================================================== +The Files: +src/main/java/org/apache/plc4x/java/utils/rawsockets/RawSocketListener.java +src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java +src/main/java/org/apache/plc4x/java/utils/rawsockets/RawSocketException.java +src/main/java/org/apache/plc4x/java/utils/rawsockets/RawSocketListener.java +Are copyrighted by the The Netty Project which is distributed under the Apache 2.0 license. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 77c80a3..85fcfc2 100644 --- a/pom.xml +++ b/pom.xml @@ -176,6 +176,11 @@ </dependency> <dependency> <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>${netty.version}</version> </dependency>