[SSHD-817] Netty nio provider
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/598c991f Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/598c991f Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/598c991f Branch: refs/heads/master Commit: 598c991fe4cc609c43f972fe025775fcf734933b Parents: 5c1c8a9 Author: Guillaume Nodet <gno...@apache.org> Authored: Tue Apr 17 22:16:55 2018 +0200 Committer: Guillaume Nodet <gno...@apache.org> Committed: Thu Apr 19 08:43:30 2018 +0200 ---------------------------------------------------------------------- pom.xml | 1 + sshd-netty/pom.xml | 214 +++++++++++++++++++ .../org/apache/sshd/netty/NettyIoAcceptor.java | 174 +++++++++++++++ .../org/apache/sshd/netty/NettyIoConnector.java | 125 +++++++++++ .../org/apache/sshd/netty/NettyIoService.java | 55 +++++ .../sshd/netty/NettyIoServiceFactory.java | 75 +++++++ .../netty/NettyIoServiceFactoryFactory.java | 49 +++++ .../org/apache/sshd/netty/NettyIoSession.java | 211 ++++++++++++++++++ .../org/apache/sshd/netty/NettySupport.java | 45 ++++ ...pache.sshd.common.io.IoServiceFactoryFactory | 20 ++ 10 files changed, 969 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e3f9353..1d75f92 100644 --- a/pom.xml +++ b/pom.xml @@ -1007,6 +1007,7 @@ <module>sshd-core</module> <module>sshd-sftp</module> <module>sshd-mina</module> + <module>sshd-netty</module> <module>sshd-ldap</module> <module>sshd-git</module> <module>sshd-contrib</module> http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/pom.xml ---------------------------------------------------------------------- diff --git a/sshd-netty/pom.xml b/sshd-netty/pom.xml new file mode 100644 index 0000000..fdb0f3e --- /dev/null +++ b/sshd-netty/pom.xml @@ -0,0 +1,214 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + + <!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.sshd</groupId> + <artifactId>sshd</artifactId> + <version>1.7.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>sshd-netty</artifactId> + <name>Apache Mina SSHD :: Netty</name> + <packaging>jar</packaging> + <inceptionYear>2008</inceptionYear> + + <properties> + <projectRoot>${project.basedir}/..</projectRoot> + <netty.version>4.1.1.Final</netty.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.sshd</groupId> + <artifactId>sshd-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + <version>${netty.version}</version> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.sshd</groupId> + <artifactId>sshd-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.sshd</groupId> + <artifactId>sshd-sftp</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>net.i2p.crypto</groupId> + <artifactId>eddsa</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.jcraft</groupId> + <artifactId>jsch</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.jcraft</groupId> + <artifactId>jzlib</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ch.ethz.ganymed</groupId> + <artifactId>ganymed-ssh2</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.servicemix.bundles</groupId> + <artifactId>org.apache.servicemix.bundles.not-yet-commons-ssl</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <testSourceDirectory>${build.directory}/test-sources</testSourceDirectory> + <testResources> + <testResource> + <directory>${build.directory}/test-resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-test-resources</id> + <phase>generate-test-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${build.directory}/test-resources</outputDirectory> + <resources> + <resource> + <directory>${projectRoot}/sshd-core/src/test/resources</directory> + </resource> + </resources> + </configuration> + </execution> + <execution> + <id>copy-test-sources</id> + <phase>generate-test-sources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${build.directory}/test-sources</outputDirectory> + <resources> + <resource> + <directory>${projectRoot}/sshd-core/src/test/java</directory> + <excludes> + <exclude>**/ProxyTest.java</exclude> + <exclude>**/PortForwardingTest.java</exclude> + <exclude>**/PortForwardingLoadTest.java</exclude> + </excludes> + </resource> + <resource> + <directory>${projectRoot}/sshd-sftp/src/test/java</directory> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <reportsDirectory>${project.build.directory}/surefire-reports-netty</reportsDirectory> + <systemProperties> + <org.apache.sshd.common.io.IoServiceFactoryFactory>org.apache.sshd.netty.NettyIoServiceFactoryFactory</org.apache.sshd.common.io.IoServiceFactoryFactory> + </systemProperties> + <excludes> + <!-- These tests use NIO explicitly --> + <exclude>**/*LoadTest.java</exclude> + <exclude>**/ProxyTest.java</exclude> + <exclude>**/Nio2ServiceTest.java</exclude> + <!-- TODO need some more research as to why this fails on MINA but not on NIO2 --> + <exclude>**/ClientDeadlockTest.java</exclude> + <exclude>**/ApacheServer*Test.java</exclude> + <exclude>**/ClientTest.java</exclude> + <exclude>**/SpaceAvailableExtensionImplTest.java</exclude> + </excludes> + <!-- No need to re-run core tests that do not involve session creation --> + <excludedGroups>org.apache.sshd.util.test.NoIoTestCase</excludedGroups> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + <additionalparam>-Xdoclint:none</additionalparam> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java ---------------------------------------------------------------------- diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java new file mode 100644 index 0000000..8d88cdf --- /dev/null +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.netty; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.future.DefaultCloseFuture; +import org.apache.sshd.common.io.IoAcceptor; +import org.apache.sshd.common.io.IoHandler; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.concurrent.GlobalEventExecutor; + +/** + * The Netty based IoAcceptor implementation. + * + * @author <a href="mailto:jul...@julienviet.com">Julien Viet</a> + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class NettyIoAcceptor extends NettyIoService implements IoAcceptor { + + protected final ServerBootstrap bootstrap = new ServerBootstrap(); + protected final DefaultCloseFuture closeFuture = new DefaultCloseFuture(toString(), lock); + protected final Map<SocketAddress, Channel> boundAddresses = new HashMap<>(); + protected final IoHandler handler; + + public NettyIoAcceptor(NettyIoServiceFactory factory, IoHandler handler) { + this.factory = factory; + this.handler = handler; + channelGroup = new DefaultChannelGroup("sshd-acceptor-channels", GlobalEventExecutor.INSTANCE); + bootstrap.group(factory.eventLoopGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new NettyIoSession(NettyIoAcceptor.this, handler).adapter); + } + }); + } + + @Override + public void bind(Collection<? extends SocketAddress> addresses) throws IOException { + for (SocketAddress address : addresses) { + bind(address); + } + } + + @Override + public void bind(SocketAddress address) throws IOException { + InetSocketAddress inetAddress = (InetSocketAddress) address; + ChannelFuture f = bootstrap.bind(inetAddress); + Channel channel = f.channel(); + channelGroup.add(channel); + try { + f.sync(); + SocketAddress bound = channel.localAddress(); + boundAddresses.put(bound, channel); + channel.closeFuture().addListener(fut -> boundAddresses.remove(bound)); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void unbind(Collection<? extends SocketAddress> addresses) { + CountDownLatch latch = new CountDownLatch(addresses.size()); + for (SocketAddress address : addresses) { + Channel channel = boundAddresses.get(address); + if (channel != null) { + ChannelFuture fut; + if (channel.isOpen()) { + fut = channel.close(); + } else { + fut = channel.closeFuture(); + } + fut.addListener(f -> latch.countDown()); + } else { + latch.countDown(); + } + } + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void unbind(SocketAddress address) { + Channel channel = boundAddresses.get(address); + if (channel != null) { + ChannelFuture fut; + if (channel.isOpen()) { + fut = channel.close(); + } else { + fut = channel.closeFuture(); + } + try { + fut.sync(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public void unbind() { + Collection<SocketAddress> addresses = getBoundAddresses(); + if (log.isDebugEnabled()) { + log.debug("Unbinding {}", addresses); + } + + unbind(addresses); + } + + @Override + public Set<SocketAddress> getBoundAddresses() { + return new HashSet<>(boundAddresses.keySet()); + } + + @Override + protected CloseFuture doCloseGracefully() { + channelGroup.close().addListener(fut -> closeFuture.setClosed()); + return closeFuture; + } + + @Override + protected void doCloseImmediately() { + doCloseGracefully(); + super.doCloseImmediately(); + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java ---------------------------------------------------------------------- diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java new file mode 100644 index 0000000..ebced0d --- /dev/null +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.netty; + +import java.net.SocketAddress; + +import org.apache.sshd.common.future.DefaultSshFuture; +import org.apache.sshd.common.io.IoConnectFuture; +import org.apache.sshd.common.io.IoConnector; +import org.apache.sshd.common.io.IoHandler; +import org.apache.sshd.common.io.IoSession; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.concurrent.GlobalEventExecutor; + +/** + * The Netty based IoConnector implementation. + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class NettyIoConnector extends NettyIoService implements IoConnector { + + protected final Bootstrap bootstrap = new Bootstrap(); + protected final IoHandler handler; + + public NettyIoConnector(NettyIoServiceFactory factory, IoHandler handler) { + this.factory = factory; + this.handler = handler; + channelGroup = new DefaultChannelGroup("sshd-connector-channels", GlobalEventExecutor.INSTANCE); + bootstrap.group(factory.eventLoopGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + NettyIoSession session = new NettyIoSession(NettyIoConnector.this, handler); + ChannelPipeline p = ch.pipeline(); + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(session.adapter); + } + }); + } + + @Override + public IoConnectFuture connect(SocketAddress address) { + boolean debugEnabled = log.isDebugEnabled(); + if (debugEnabled) { + log.debug("Connecting to {}", address); + } + + IoConnectFuture future = new DefaultIoConnectFuture(address, null); + ChannelFuture chf = bootstrap.connect(address); + Channel channel = chf.channel(); + channel.attr(CONNECT_FUTURE_KEY).set(future); + chf.addListener(cf -> { + Throwable t = chf.cause(); + if (t != null) { + future.setException(t); + } else if (chf.isCancelled()) { + future.cancel(); + } + }); + return future; + } + + public static class DefaultIoConnectFuture extends DefaultSshFuture<IoConnectFuture> implements IoConnectFuture { + public DefaultIoConnectFuture(Object id, Object lock) { + super(id, lock); + } + + @Override + public IoSession getSession() { + Object v = getValue(); + return v instanceof IoSession ? (IoSession) v : null; + } + + @Override + public Throwable getException() { + Object v = getValue(); + return v instanceof Throwable ? (Throwable) v : null; + } + + @Override + public boolean isConnected() { + return getValue() instanceof IoSession; + } + + @Override + public void setSession(IoSession session) { + setValue(session); + } + + @Override + public void setException(Throwable exception) { + setValue(exception); + } + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java ---------------------------------------------------------------------- diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java new file mode 100644 index 0000000..9bd3ca3 --- /dev/null +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.netty; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.sshd.common.io.IoConnectFuture; +import org.apache.sshd.common.io.IoService; +import org.apache.sshd.common.io.IoSession; +import org.apache.sshd.common.util.closeable.AbstractCloseable; + +import io.netty.channel.group.ChannelGroup; +import io.netty.util.AttributeKey; + +/** + * @author <a href="mailto:jul...@julienviet.com">Julien Viet</a> + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class NettyIoService extends AbstractCloseable implements IoService { + + public static final AttributeKey<IoConnectFuture> CONNECT_FUTURE_KEY = AttributeKey.valueOf(IoConnectFuture.class.getName()); + + protected final AtomicLong sessionSeq = new AtomicLong(); + protected final Map<Long, IoSession> sessions = new ConcurrentHashMap<>(); + protected NettyIoServiceFactory factory; + protected ChannelGroup channelGroup; + + public NettyIoService() { + super(); + } + + @Override + public Map<Long, IoSession> getManagedSessions() { + return sessions; + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoServiceFactory.java ---------------------------------------------------------------------- diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoServiceFactory.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoServiceFactory.java new file mode 100644 index 0000000..2bc3f97 --- /dev/null +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoServiceFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.netty; + +import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.io.IoAcceptor; +import org.apache.sshd.common.io.IoConnector; +import org.apache.sshd.common.io.IoHandler; +import org.apache.sshd.common.io.IoServiceFactory; +import org.apache.sshd.common.util.closeable.AbstractCloseable; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +/** + * @author <a href="mailto:jul...@julienviet.com">Julien Viet</a> + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class NettyIoServiceFactory extends AbstractCloseable implements IoServiceFactory { + + protected final EventLoopGroup eventLoopGroup; + protected final boolean closeEventLoopGroup; + + public NettyIoServiceFactory() { + this(null); + } + + public NettyIoServiceFactory(EventLoopGroup group) { + this.eventLoopGroup = group != null ? group : new NioEventLoopGroup(); + this.closeEventLoopGroup = group == null; + } + + @Override + public IoConnector createConnector(IoHandler handler) { + return new NettyIoConnector(this, handler); + } + + @Override + public IoAcceptor createAcceptor(IoHandler handler) { + return new NettyIoAcceptor(this, handler); + } + + @Override + protected CloseFuture doCloseGracefully() { + if (closeEventLoopGroup) { + eventLoopGroup.shutdownGracefully().addListener(fut -> closeFuture.setClosed()); + } else { + closeFuture.setClosed(); + } + return closeFuture; + } + + @Override + protected void doCloseImmediately() { + doCloseGracefully(); + super.doCloseImmediately(); + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoServiceFactoryFactory.java ---------------------------------------------------------------------- diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoServiceFactoryFactory.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoServiceFactoryFactory.java new file mode 100644 index 0000000..bead4aa --- /dev/null +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoServiceFactoryFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.netty; + +import org.apache.sshd.common.FactoryManager; +import org.apache.sshd.common.io.IoServiceFactory; +import org.apache.sshd.common.io.IoServiceFactoryFactory; + +import io.netty.channel.EventLoopGroup; + +/** + * @author <a href="mailto:jul...@julienviet.com">Julien Viet</a> + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class NettyIoServiceFactoryFactory implements IoServiceFactoryFactory { + + protected final EventLoopGroup eventLoopGroup; + + public NettyIoServiceFactoryFactory() { + this(null); + } + + public NettyIoServiceFactoryFactory(EventLoopGroup eventLoopGroup) { + this.eventLoopGroup = eventLoopGroup; + } + + @Override + public IoServiceFactory create(FactoryManager manager) { + return new NettyIoServiceFactory(eventLoopGroup); + } + +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java ---------------------------------------------------------------------- diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java new file mode 100644 index 0000000..57aecc6 --- /dev/null +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.netty; + +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; + +import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.future.DefaultCloseFuture; +import org.apache.sshd.common.io.AbstractIoWriteFuture; +import org.apache.sshd.common.io.IoConnectFuture; +import org.apache.sshd.common.io.IoHandler; +import org.apache.sshd.common.io.IoService; +import org.apache.sshd.common.io.IoSession; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.closeable.AbstractCloseable; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +/** + * The Netty based IoSession implementation. + * + * @author <a href="mailto:jul...@julienviet.com">Julien Viet</a> + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class NettyIoSession extends AbstractCloseable implements IoSession { + + protected final Map<Object, Object> attributes = new HashMap<>(); + protected final NettyIoService service; + protected final IoHandler handler; + protected final DefaultCloseFuture closeFuture = new DefaultCloseFuture(toString(), lock); + protected final long id; + protected ChannelHandlerContext context; + protected SocketAddress remoteAddr; + protected ChannelFuture prev; + protected final ChannelInboundHandlerAdapter adapter = new Adapter(); + + public NettyIoSession(NettyIoService service, IoHandler handler) { + this.service = service; + this.handler = handler; + this.id = service.sessionSeq.incrementAndGet(); + } + + @Override + public long getId() { + return id; + } + + @Override + public Object getAttribute(Object key) { + return attributes.get(key); + } + + @Override + public Object setAttribute(Object key, Object value) { + return attributes.put(key, value); + } + + @Override + public Object setAttributeIfAbsent(Object key, Object value) { + return attributes.putIfAbsent(key, value); + } + + @Override + public Object removeAttribute(Object key) { + return attributes.remove(key); + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddr; + } + + @Override + public SocketAddress getLocalAddress() { + return context.channel().localAddress(); + } + + @Override + public IoWriteFuture writePacket(Buffer buffer) { + ByteBuf buf = Unpooled.buffer(buffer.available()); + buf.writeBytes(buffer.array(), buffer.rpos(), buffer.available()); + DefaultIoWriteFuture msg = new DefaultIoWriteFuture(getRemoteAddress(), null); + ChannelPromise next = context.newPromise(); + prev.addListener(whatever -> { + if (context != null) { + context.writeAndFlush(buf, next); + } + }); + prev = next; + next.addListener(fut -> { + if (fut.isSuccess()) { + msg.setValue(Boolean.TRUE); + } else { + msg.setValue(fut.cause()); + } + }); + return msg; + } + + @Override + public IoService getService() { + return service; + } + + @Override + protected CloseFuture doCloseGracefully() { + context.writeAndFlush(Unpooled.EMPTY_BUFFER). + addListener(ChannelFutureListener.CLOSE). + addListener(fut -> { + closeFuture.setClosed(); + }); + return closeFuture; + } + + @Override + protected void doCloseImmediately() { + context.close(); + super.doCloseImmediately(); + } + + protected void channelActive(ChannelHandlerContext ctx) throws Exception { + context = ctx; + service.channelGroup.add(ctx.channel()); + service.sessions.put(id, NettyIoSession.this); + prev = context.newPromise().setSuccess(); + remoteAddr = context.channel().remoteAddress(); + handler.sessionCreated(NettyIoSession.this); + IoConnectFuture future = ctx.channel().attr(NettyIoService.CONNECT_FUTURE_KEY).get(); + if (future != null) { + future.setSession(NettyIoSession.this); + } + } + + protected void channelInactive(ChannelHandlerContext ctx) throws Exception { + service.sessions.remove(id); + handler.sessionClosed(NettyIoSession.this); + context = null; + } + + protected void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf buf = (ByteBuf) msg; + handler.messageReceived(NettyIoSession.this, NettySupport.asReadable(buf)); + } + + protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + handler.exceptionCaught(NettyIoSession.this, cause); + } + + protected static class DefaultIoWriteFuture extends AbstractIoWriteFuture { + + public DefaultIoWriteFuture(Object id, Object lock) { + super(id, lock); + } + } + + /** + * Simple netty adapter to use as a bridge. + */ + protected class Adapter extends ChannelInboundHandlerAdapter { + + public Adapter() { + super(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + NettyIoSession.this.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + NettyIoSession.this.channelInactive(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + NettyIoSession.this.channelRead(ctx, msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + NettyIoSession.this.exceptionCaught(ctx, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/src/main/java/org/apache/sshd/netty/NettySupport.java ---------------------------------------------------------------------- diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettySupport.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettySupport.java new file mode 100644 index 0000000..9f9de89 --- /dev/null +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettySupport.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.netty; + +import org.apache.sshd.common.util.Readable; + +import io.netty.buffer.ByteBuf; + +public final class NettySupport { + + private NettySupport() { + throw new UnsupportedOperationException("No instance allowed"); + } + + public static Readable asReadable(final ByteBuf buffer) { + return new Readable() { + @Override + public int available() { + return buffer.readableBytes(); + } + + @Override + public void getRawBytes(byte[] data, int offset, int len) { + buffer.getBytes(0, data, offset, len); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/598c991f/sshd-netty/src/main/resources/META-INF/services/org.apache.sshd.common.io.IoServiceFactoryFactory ---------------------------------------------------------------------- diff --git a/sshd-netty/src/main/resources/META-INF/services/org.apache.sshd.common.io.IoServiceFactoryFactory b/sshd-netty/src/main/resources/META-INF/services/org.apache.sshd.common.io.IoServiceFactoryFactory new file mode 100644 index 0000000..be71649 --- /dev/null +++ b/sshd-netty/src/main/resources/META-INF/services/org.apache.sshd.common.io.IoServiceFactoryFactory @@ -0,0 +1,20 @@ +## +## Licensed to the Apache Software Foundation (ASF) under one +## or more contributor license agreements. See the NOTICE file +## distributed with this work for additional information +## regarding copyright ownership. The ASF licenses this file +## to you under the Apache License, Version 2.0 (the +## "License"); you may not use this file except in compliance +## with the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, +## software distributed under the License is distributed on an +## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +## KIND, either express or implied. See the License for the +## specific language governing permissions and limitations +## under the License. +## + +org.apache.sshd.netty.NettyIoServiceFactoryFactory