http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnector.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnector.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnector.java new file mode 100644 index 0000000..569ae20 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnector.java @@ -0,0 +1,1266 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.netty; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.IOException; +import java.net.ConnectException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.security.AccessController; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.http.ClientCookieEncoder; +import io.netty.handler.codec.http.Cookie; +import io.netty.handler.codec.http.CookieDecoder; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpRequestEncoder; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseDecoder; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.AttributeKey; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import org.apache.activemq6.api.config.HornetQDefaultConfiguration; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.client.impl.ClientSessionFactoryImpl; +import org.apache.activemq6.core.protocol.core.impl.HornetQClientProtocolManager; +import org.apache.activemq6.core.remoting.impl.ssl.SSLSupport; +import org.apache.activemq6.core.server.HornetQComponent; +import org.apache.activemq6.spi.core.remoting.AbstractConnector; +import org.apache.activemq6.spi.core.remoting.BufferHandler; +import org.apache.activemq6.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq6.spi.core.remoting.Connection; +import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq6.utils.ConfigurationHelper; +import org.apache.activemq6.utils.FutureLatch; + +import static org.apache.activemq6.utils.Base64.encodeBytes; + +/** + * A NettyConnector + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Trustin Lee</a> + * @author <a href="mailto:[email protected]">Norman Maurer</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + */ +public class NettyConnector extends AbstractConnector +{ + // Constants ----------------------------------------------------- + public static final String JAVAX_KEYSTORE_PATH_PROP_NAME = "javax.net.ssl.keyStore"; + public static final String JAVAX_KEYSTORE_PASSWORD_PROP_NAME = "javax.net.ssl.keyStorePassword"; + public static final String JAVAX_TRUSTSTORE_PATH_PROP_NAME = "javax.net.ssl.trustStore"; + public static final String JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME = "javax.net.ssl.trustStorePassword"; + public static final String HORNETQ_KEYSTORE_PROVIDER_PROP_NAME = "org.apache.activemq6.ssl.keyStoreProvider"; + public static final String HORNETQ_KEYSTORE_PATH_PROP_NAME = "org.apache.activemq6.ssl.keyStore"; + public static final String HORNETQ_KEYSTORE_PASSWORD_PROP_NAME = "org.apache.activemq6.ssl.keyStorePassword"; + public static final String HORNETQ_TRUSTSTORE_PROVIDER_PROP_NAME = "org.apache.activemq6.ssl.trustStoreProvider"; + public static final String HORNETQ_TRUSTSTORE_PATH_PROP_NAME = "org.apache.activemq6.ssl.trustStore"; + public static final String HORNETQ_TRUSTSTORE_PASSWORD_PROP_NAME = "org.apache.activemq6.ssl.trustStorePassword"; + + // Constants for HTTP upgrade + // These constants are exposed publicly as they are used on the server-side to fetch + // headers from the HTTP request, compute some values and fill the HTTP response + public static final String MAGIC_NUMBER = "CF70DEB8-70F9-4FBA-8B4F-DFC3E723B4CD"; + public static final String SEC_HORNETQ_REMOTING_KEY = "Sec-HornetQRemoting-Key"; + public static final String SEC_HORNETQ_REMOTING_ACCEPT = "Sec-HornetQRemoting-Accept"; + public static final String HORNETQ_REMOTING = "hornetq-remoting"; + + private static final AttributeKey<String> REMOTING_KEY = AttributeKey.valueOf(SEC_HORNETQ_REMOTING_KEY); + + // Default Configuration + public static final Map<String, Object> DEFAULT_CONFIG; + + static + { + // Disable resource leak detection for performance reasons by default + ResourceLeakDetector.setEnabled(false); + + // Set default Configuration + Map<String, Object> config = new HashMap<String , Object>(); + config.put(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST); + config.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT); + DEFAULT_CONFIG = Collections.unmodifiableMap(config); + } + + // Attributes ---------------------------------------------------- + + private Class<? extends Channel> channelClazz; + + private Bootstrap bootstrap; + + private ChannelGroup channelGroup; + + private final BufferHandler handler; + + private final ConnectionLifeCycleListener listener; + + private final boolean sslEnabled; + + private final boolean httpEnabled; + + private final long httpMaxClientIdleTime; + + private final long httpClientIdleScanPeriod; + + private final boolean httpRequiresSessionId; + + // if true, after the connection, the connector will send + // a HTTP GET request (+ Upgrade: hornetq-remoting) that + // will be handled by the server's http server. + private final boolean httpUpgradeEnabled; + + private final boolean useServlet; + + private final String host; + + private final int port; + + private final String localAddress; + + private final int localPort; + + private final String keyStoreProvider; + + private final String keyStorePath; + + private final String keyStorePassword; + + private final String trustStoreProvider; + + private final String trustStorePath; + + private final String trustStorePassword; + + private final String enabledCipherSuites; + + private final String enabledProtocols; + + private final boolean tcpNoDelay; + + private final int tcpSendBufferSize; + + private final int tcpReceiveBufferSize; + + private final long batchDelay; + + private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>(); + + private final String servletPath; + + private final int nioRemotingThreads; + + private final boolean useNioGlobalWorkerPool; + + private final ScheduledExecutorService scheduledThreadPool; + + private final Executor closeExecutor; + + private BatchFlusher flusher; + + private ScheduledFuture<?> batchFlusherFuture; + + private EventLoopGroup group; + + private int connectTimeoutMillis; + + private final ClientProtocolManager protocolManager; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + public NettyConnector(final Map<String, Object> configuration, + final BufferHandler handler, + final ConnectionLifeCycleListener listener, + final Executor closeExecutor, + final Executor threadPool, + final ScheduledExecutorService scheduledThreadPool) + { + this(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, new HornetQClientProtocolManager()); + } + + + public NettyConnector(final Map<String, Object> configuration, + final BufferHandler handler, + final ConnectionLifeCycleListener listener, + final Executor closeExecutor, + final Executor threadPool, + final ScheduledExecutorService scheduledThreadPool, + final ClientProtocolManager protocolManager) + { + super(configuration); + + this.protocolManager = protocolManager; + + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.nullListener(); + } + + if (handler == null) + { + throw HornetQClientMessageBundle.BUNDLE.nullHandler(); + } + + this.listener = listener; + + this.handler = handler; + + sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, + TransportConstants.DEFAULT_SSL_ENABLED, + configuration); + httpEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, + TransportConstants.DEFAULT_HTTP_ENABLED, + configuration); + servletPath = ConfigurationHelper.getStringProperty(TransportConstants.SERVLET_PATH, + TransportConstants.DEFAULT_SERVLET_PATH, + configuration); + if (httpEnabled) + { + httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, + TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, + configuration); + httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, + TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, + configuration); + httpRequiresSessionId = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_REQUIRES_SESSION_ID, + TransportConstants.DEFAULT_HTTP_REQUIRES_SESSION_ID, + configuration); + } + else + { + httpMaxClientIdleTime = 0; + httpClientIdleScanPeriod = -1; + httpRequiresSessionId = false; + } + + httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, + TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, + configuration); + + nioRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, + -1, + configuration); + + useNioGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME, + TransportConstants.DEFAULT_USE_NIO_GLOBAL_WORKER_POOL, + configuration); + + useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, + TransportConstants.DEFAULT_USE_SERVLET, + configuration); + host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, + TransportConstants.DEFAULT_HOST, + configuration); + port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, + TransportConstants.DEFAULT_PORT, + configuration); + localAddress = ConfigurationHelper.getStringProperty(TransportConstants.LOCAL_ADDRESS_PROP_NAME, + TransportConstants.DEFAULT_LOCAL_ADDRESS, + configuration); + + localPort = ConfigurationHelper.getIntProperty(TransportConstants.LOCAL_PORT_PROP_NAME, + TransportConstants.DEFAULT_LOCAL_PORT, + configuration); + if (sslEnabled) + { + keyStoreProvider = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME, + TransportConstants.DEFAULT_KEYSTORE_PROVIDER, + configuration); + + keyStorePath = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME, + TransportConstants.DEFAULT_KEYSTORE_PATH, + configuration); + + keyStorePassword = ConfigurationHelper.getPasswordProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, + TransportConstants.DEFAULT_KEYSTORE_PASSWORD, + configuration, + HornetQDefaultConfiguration.getPropMaskPassword(), + HornetQDefaultConfiguration.getPropMaskPassword()); + + trustStoreProvider = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME, + TransportConstants.DEFAULT_TRUSTSTORE_PROVIDER, + configuration); + + trustStorePath = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, + TransportConstants.DEFAULT_TRUSTSTORE_PATH, + configuration); + + trustStorePassword = ConfigurationHelper.getPasswordProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, + TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD, + configuration, + HornetQDefaultConfiguration.getPropMaskPassword(), + HornetQDefaultConfiguration.getPropMaskPassword()); + + enabledCipherSuites = ConfigurationHelper.getStringProperty(TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME, + TransportConstants.DEFAULT_ENABLED_CIPHER_SUITES, + configuration); + + enabledProtocols = ConfigurationHelper.getStringProperty(TransportConstants.ENABLED_PROTOCOLS_PROP_NAME, + TransportConstants.DEFAULT_ENABLED_PROTOCOLS, + configuration); + } + else + { + keyStoreProvider = TransportConstants.DEFAULT_KEYSTORE_PROVIDER; + keyStorePath = TransportConstants.DEFAULT_KEYSTORE_PATH; + keyStorePassword = TransportConstants.DEFAULT_KEYSTORE_PASSWORD; + trustStoreProvider = TransportConstants.DEFAULT_TRUSTSTORE_PROVIDER; + trustStorePath = TransportConstants.DEFAULT_TRUSTSTORE_PATH; + trustStorePassword = TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD; + enabledCipherSuites = TransportConstants.DEFAULT_ENABLED_CIPHER_SUITES; + enabledProtocols = TransportConstants.DEFAULT_ENABLED_PROTOCOLS; + } + + tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, + TransportConstants.DEFAULT_TCP_NODELAY, + configuration); + tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, + TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, + configuration); + tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, + TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, + configuration); + + batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, + TransportConstants.DEFAULT_BATCH_DELAY, + configuration); + + connectTimeoutMillis = ConfigurationHelper.getIntProperty(TransportConstants.NETTY_CONNECT_TIMEOUT, + TransportConstants.DEFAULT_NETTY_CONNECT_TIMEOUT, + configuration); + this.closeExecutor = closeExecutor; + this.scheduledThreadPool = scheduledThreadPool; + } + + @Override + public String toString() + { + return "NettyConnector [host=" + host + + ", port=" + + port + + ", httpEnabled=" + + httpEnabled + + ", httpUpgradeEnabled=" + + httpUpgradeEnabled + + ", useServlet=" + + useServlet + + ", servletPath=" + + servletPath + + ", sslEnabled=" + + sslEnabled + + ", useNio=" + + true + + "]"; + } + + public synchronized void start() + { + if (channelClazz != null) + { + return; + } + + int threadsToUse; + + if (nioRemotingThreads == -1) + { + // Default to number of cores * 3 + + threadsToUse = Runtime.getRuntime().availableProcessors() * 3; + } + else + { + threadsToUse = this.nioRemotingThreads; + } + + + if (useNioGlobalWorkerPool) + { + channelClazz = NioSocketChannel.class; + group = SharedNioEventLoopGroup.getInstance(threadsToUse); + } + else + { + channelClazz = NioSocketChannel.class; + group = new NioEventLoopGroup(threadsToUse); + } + // if we are a servlet wrap the socketChannelFactory + + bootstrap = new Bootstrap(); + bootstrap.channel(channelClazz); + bootstrap.group(group); + + bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay); + + if (connectTimeoutMillis != -1) + { + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis); + } + if (tcpReceiveBufferSize != -1) + { + bootstrap.option(ChannelOption.SO_RCVBUF, tcpReceiveBufferSize); + } + if (tcpSendBufferSize != -1) + { + bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize); + } + bootstrap.option(ChannelOption.SO_KEEPALIVE, true); + bootstrap.option(ChannelOption.SO_REUSEADDR, true); + bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false)); + channelGroup = new DefaultChannelGroup("hornetq-connector", GlobalEventExecutor.INSTANCE); + + final SSLContext context; + if (sslEnabled) + { + try + { + // HORNETQ-680 - override the server-side config if client-side system properties are set + String realKeyStorePath = keyStorePath; + String realKeyStoreProvider = keyStoreProvider; + String realKeyStorePassword = keyStorePassword; + if (System.getProperty(JAVAX_KEYSTORE_PATH_PROP_NAME) != null) + { + realKeyStorePath = System.getProperty(JAVAX_KEYSTORE_PATH_PROP_NAME); + } + if (System.getProperty(JAVAX_KEYSTORE_PASSWORD_PROP_NAME) != null) + { + realKeyStorePassword = System.getProperty(JAVAX_KEYSTORE_PASSWORD_PROP_NAME); + } + + if (System.getProperty(HORNETQ_KEYSTORE_PROVIDER_PROP_NAME) != null) + { + realKeyStoreProvider = System.getProperty(HORNETQ_KEYSTORE_PROVIDER_PROP_NAME); + } + if (System.getProperty(HORNETQ_KEYSTORE_PATH_PROP_NAME) != null) + { + realKeyStorePath = System.getProperty(HORNETQ_KEYSTORE_PATH_PROP_NAME); + } + if (System.getProperty(HORNETQ_KEYSTORE_PASSWORD_PROP_NAME) != null) + { + realKeyStorePassword = System.getProperty(HORNETQ_KEYSTORE_PASSWORD_PROP_NAME); + } + + String realTrustStorePath = trustStorePath; + String realTrustStoreProvider = trustStoreProvider; + String realTrustStorePassword = trustStorePassword; + if (System.getProperty(JAVAX_TRUSTSTORE_PATH_PROP_NAME) != null) + { + realTrustStorePath = System.getProperty(JAVAX_TRUSTSTORE_PATH_PROP_NAME); + } + if (System.getProperty(JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME) != null) + { + realTrustStorePassword = System.getProperty(JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME); + } + + if (System.getProperty(HORNETQ_TRUSTSTORE_PROVIDER_PROP_NAME) != null) + { + realTrustStoreProvider = System.getProperty(HORNETQ_TRUSTSTORE_PROVIDER_PROP_NAME); + } + if (System.getProperty(HORNETQ_TRUSTSTORE_PATH_PROP_NAME) != null) + { + realTrustStorePath = System.getProperty(HORNETQ_TRUSTSTORE_PATH_PROP_NAME); + } + if (System.getProperty(HORNETQ_TRUSTSTORE_PASSWORD_PROP_NAME) != null) + { + realTrustStorePassword = System.getProperty(HORNETQ_TRUSTSTORE_PASSWORD_PROP_NAME); + } + context = SSLSupport.createContext(realKeyStoreProvider, realKeyStorePath, realKeyStorePassword, realTrustStoreProvider, realTrustStorePath, realTrustStorePassword); + } + catch (Exception e) + { + close(); + IllegalStateException ise = new IllegalStateException("Unable to create NettyConnector for " + host + ":" + port); + ise.initCause(e); + throw ise; + } + } + else + { + context = null; // Unused + } + + if (context != null && useServlet) + { + // TODO: Fix me + //bootstrap.setOption("sslContext", context); + } + + bootstrap.handler(new ChannelInitializer<Channel>() + { + public void initChannel(Channel channel) throws Exception + { + final ChannelPipeline pipeline = channel.pipeline(); + if (sslEnabled && !useServlet) + { + SSLEngine engine = context.createSSLEngine(); + + engine.setUseClientMode(true); + + engine.setWantClientAuth(true); + + // setting the enabled cipher suites resets the enabled protocols so we need + // to save the enabled protocols so that after the customer cipher suite is enabled + // we can reset the enabled protocols if a customer protocol isn't specified + String[] originalProtocols = engine.getEnabledProtocols(); + + if (enabledCipherSuites != null) + { + try + { + engine.setEnabledCipherSuites(SSLSupport.parseCommaSeparatedListIntoArray(enabledCipherSuites)); + } + catch (IllegalArgumentException e) + { + HornetQClientLogger.LOGGER.invalidCipherSuite(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedCipherSuites())); + throw e; + } + } + + if (enabledProtocols != null) + { + try + { + engine.setEnabledProtocols(SSLSupport.parseCommaSeparatedListIntoArray(enabledProtocols)); + } + catch (IllegalArgumentException e) + { + HornetQClientLogger.LOGGER.invalidProtocol(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedProtocols())); + throw e; + } + } + else + { + engine.setEnabledProtocols(originalProtocols); + } + + SslHandler handler = new SslHandler(engine); + + pipeline.addLast(handler); + } + + if (httpEnabled) + { + pipeline.addLast(new HttpRequestEncoder()); + + pipeline.addLast(new HttpResponseDecoder()); + + pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); + + pipeline.addLast(new HttpHandler()); + } + + if (httpUpgradeEnabled) + { + // prepare to handle a HTTP 101 response to upgrade the protocol. + final HttpClientCodec httpClientCodec = new HttpClientCodec(); + pipeline.addLast(httpClientCodec); + pipeline.addLast("http-upgrade", new HttpUpgradeHandler(pipeline, httpClientCodec)); + } + + protocolManager.addChannelHandlers(pipeline); + + pipeline.addLast(new HornetQClientChannelHandler(channelGroup, handler, new Listener())); + } + }); + + if (batchDelay > 0) + { + flusher = new BatchFlusher(); + + batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS); + } + + HornetQClientLogger.LOGGER.debug("Started Netty Connector version " + TransportConstants.NETTY_VERSION); + } + + public synchronized void close() + { + if (channelClazz == null) + { + return; + } + + if (batchFlusherFuture != null) + { + batchFlusherFuture.cancel(false); + + flusher.cancel(); + + flusher = null; + + batchFlusherFuture = null; + } + + bootstrap = null; + channelGroup.close().awaitUninterruptibly(); + + // Shutdown the EventLoopGroup if no new task was added for 100ms or if + // 3000ms elapsed. + group.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS); + + channelClazz = null; + + for (Connection connection : connections.values()) + { + listener.connectionDestroyed(connection.getID()); + } + + connections.clear(); + } + + public boolean isStarted() + { + return channelClazz != null; + } + + public Connection createConnection() + { + if (channelClazz == null) + { + return null; + } + + // HORNETQ-907 - strip off IPv6 scope-id (if necessary) + SocketAddress remoteDestination = new InetSocketAddress(host, port); + InetAddress inetAddress = ((InetSocketAddress) remoteDestination).getAddress(); + if (inetAddress instanceof Inet6Address) + { + Inet6Address inet6Address = (Inet6Address) inetAddress; + if (inet6Address.getScopeId() != 0) + { + try + { + remoteDestination = new InetSocketAddress(InetAddress.getByAddress(inet6Address.getAddress()), ((InetSocketAddress) remoteDestination).getPort()); + } + catch (UnknownHostException e) + { + throw new IllegalArgumentException(e.getMessage()); + } + } + } + + HornetQClientLogger.LOGGER.debug("Remote destination: " + remoteDestination); + + ChannelFuture future; + //port 0 does not work so only use local address if set + if (localPort != 0) + { + SocketAddress localDestination; + if (localAddress != null) + { + localDestination = new InetSocketAddress(localAddress, localPort); + } + else + { + localDestination = new InetSocketAddress(localPort); + } + future = bootstrap.connect(remoteDestination, localDestination); + } + else + { + future = bootstrap.connect(remoteDestination); + } + + future.awaitUninterruptibly(); + + if (future.isSuccess()) + { + final Channel ch = future.channel(); + SslHandler sslHandler = ch.pipeline().get(SslHandler.class); + if (sslHandler != null) + { + Future<Channel> handshakeFuture = sslHandler.handshakeFuture(); + if (handshakeFuture.awaitUninterruptibly(30000)) + { + if (handshakeFuture.isSuccess()) + { + ChannelPipeline channelPipeline = ch.pipeline(); + HornetQChannelHandler channelHandler = channelPipeline.get(HornetQChannelHandler.class); + channelHandler.active = true; + } + else + { + ch.close().awaitUninterruptibly(); + HornetQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause()); + return null; + } + } + else + { + //handshakeFuture.setFailure(new SSLException("Handshake was not completed in 30 seconds")); + ch.close().awaitUninterruptibly(); + return null; + } + + } + if (httpUpgradeEnabled) + { + // Send a HTTP GET + Upgrade request that will be handled by the http-upgrade handler. + try + { + //get this first incase it removes itself + HttpUpgradeHandler httpUpgradeHandler = (HttpUpgradeHandler) ch.pipeline().get("http-upgrade"); + URI uri = new URI("http", null, host, port, null, null, null); + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath()); + request.headers().set(HttpHeaders.Names.HOST, host); + request.headers().set(HttpHeaders.Names.UPGRADE, HORNETQ_REMOTING); + request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE); + + final String endpoint = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, + null, + configuration); + if (endpoint != null) + { + request.headers().set(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, endpoint); + } + + // Get 16 bit nonce and base 64 encode it + byte[] nonce = randomBytes(16); + String key = base64(nonce); + request.headers().set(SEC_HORNETQ_REMOTING_KEY, key); + ch.attr(REMOTING_KEY).set(key); + + HornetQClientLogger.LOGGER.debugf("Sending HTTP request %s", request); + + // Send the HTTP request. + ch.writeAndFlush(request); + + if (!httpUpgradeHandler.awaitHandshake()) + { + return null; + } + } + catch (URISyntaxException e) + { + HornetQClientLogger.LOGGER.errorCreatingNettyConnection(e); + return null; + } + } + else + { + ChannelPipeline channelPipeline = ch.pipeline(); + HornetQChannelHandler channelHandler = channelPipeline.get(HornetQChannelHandler.class); + channelHandler.active = true; + } + + // No acceptor on a client connection + Listener connectionListener = new Listener(); + NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false); + connectionListener.connectionCreated(null, conn, protocolManager.getName()); + return conn; + } + else + { + Throwable t = future.cause(); + + if (t != null && !(t instanceof ConnectException)) + { + HornetQClientLogger.LOGGER.errorCreatingNettyConnection(future.cause()); + } + + return null; + } + } + + // Public -------------------------------------------------------- + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + + private static final class HornetQClientChannelHandler extends HornetQChannelHandler + { + HornetQClientChannelHandler(final ChannelGroup group, + final BufferHandler handler, + final ConnectionLifeCycleListener listener) + { + super(group, handler, listener); + } + } + + private static class HttpUpgradeHandler extends SimpleChannelInboundHandler<HttpObject> + { + private final ChannelPipeline pipeline; + private final HttpClientCodec httpClientCodec; + private final CountDownLatch latch = new CountDownLatch(1); + private boolean handshakeComplete = false; + + public HttpUpgradeHandler(ChannelPipeline pipeline, HttpClientCodec httpClientCodec) + { + this.pipeline = pipeline; + this.httpClientCodec = httpClientCodec; + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception + { + if (msg instanceof HttpResponse) + { + HttpResponse response = (HttpResponse) msg; + if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() + && response.headers().get(HttpHeaders.Names.UPGRADE).equals(HORNETQ_REMOTING)) + { + String accept = response.headers().get(SEC_HORNETQ_REMOTING_ACCEPT); + String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); + + if (expectedResponse.equals(accept)) + { + // remove the http handlers and flag the hornetq channel handler as active + pipeline.remove(httpClientCodec); + pipeline.remove(this); + handshakeComplete = true; + HornetQChannelHandler channelHandler = pipeline.get(HornetQChannelHandler.class); + channelHandler.active = true; + } + else + { + HornetQClientLogger.LOGGER.httpHandshakeFailed(accept, expectedResponse); + ctx.close(); + } + } + else if (response.getStatus().code() == HttpResponseStatus.FORBIDDEN.code()) + { + HornetQClientLogger.LOGGER.httpUpgradeNotSupportedByRemoteAcceptor(); + ctx.close(); + } + latch.countDown(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception + { + HornetQClientLogger.LOGGER.errorCreatingNettyConnection(cause); + ctx.close(); + } + + public boolean awaitHandshake() + { + try + { + if (!latch.await(30000, TimeUnit.MILLISECONDS)) + { + return false; + } + } + catch (InterruptedException e) + { + return false; + } + return handshakeComplete; + } + } + + class HttpHandler extends ChannelDuplexHandler + { + private Channel channel; + + private long lastSendTime = 0; + + private boolean waitingGet = false; + + private HttpIdleTimer task; + + private final String url; + + private final FutureLatch handShakeFuture = new FutureLatch(); + + private boolean active = false; + + private boolean handshaking = false; + + private String cookie; + + public HttpHandler() throws Exception + { + url = new URI("http", null, host, port, servletPath, null, null).toString(); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception + { + super.channelActive(ctx); + channel = ctx.channel(); + if (httpClientIdleScanPeriod > 0) + { + task = new HttpIdleTimer(); + java.util.concurrent.Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, + httpClientIdleScanPeriod, + httpClientIdleScanPeriod, + TimeUnit.MILLISECONDS); + task.setFuture(future); + } + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception + { + if (task != null) + { + task.close(); + } + + super.channelInactive(ctx); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception + { + FullHttpResponse response = (FullHttpResponse) msg; + if (httpRequiresSessionId && !active) + { + Set<Cookie> cookieMap = CookieDecoder.decode(response.headers().get(HttpHeaders.Names.SET_COOKIE)); + for (Cookie cookie : cookieMap) + { + if (cookie.getName().equals("JSESSIONID")) + { + this.cookie = ClientCookieEncoder.encode(cookie); + } + } + active = true; + handShakeFuture.run(); + } + waitingGet = false; + ctx.fireChannelRead(response.content()); + } + + @Override + public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception + { + if (msg instanceof ByteBuf) + { + if (httpRequiresSessionId && !active) + { + if (handshaking) + { + handshaking = true; + } + else + { + if (!handShakeFuture.await(5000)) + { + throw new RuntimeException("Handshake failed after timeout"); + } + } + } + + ByteBuf buf = (ByteBuf) msg; + FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url, buf); + httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host); + if (cookie != null) + { + httpRequest.headers().add(HttpHeaders.Names.COOKIE, cookie); + } + httpRequest.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes())); + ctx.write(httpRequest, promise); + lastSendTime = System.currentTimeMillis(); + } + else + { + ctx.write(msg, promise); + lastSendTime = System.currentTimeMillis(); + } + } + + private class HttpIdleTimer implements Runnable + { + private boolean closed = false; + + private java.util.concurrent.Future<?> future; + + public synchronized void run() + { + if (closed) + { + return; + } + + if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime) + { + FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host); + waitingGet = true; + channel.writeAndFlush(httpRequest); + } + } + + public synchronized void setFuture(final java.util.concurrent.Future<?> future) + { + this.future = future; + } + + public void close() + { + if (future != null) + { + future.cancel(false); + } + + closed = true; + } + } + } + + private class Listener implements ConnectionLifeCycleListener + { + public void connectionCreated(final HornetQComponent component, final Connection connection, final String protocol) + { + if (connections.putIfAbsent(connection.getID(), connection) != null) + { + throw HornetQClientMessageBundle.BUNDLE.connectionExists(connection.getID()); + } + } + + public void connectionDestroyed(final Object connectionID) + { + if (connections.remove(connectionID) != null) + { + // Execute on different thread to avoid deadlocks + closeExecutor.execute(new Runnable() + { + public void run() + { + listener.connectionDestroyed(connectionID); + } + }); + } + } + + public void connectionException(final Object connectionID, final HornetQException me) + { + // Execute on different thread to avoid deadlocks + closeExecutor.execute(new Runnable() + { + public void run() + { + listener.connectionException(connectionID, me); + } + }); + } + + public void connectionReadyForWrites(Object connectionID, boolean ready) + { + } + + + } + + private class BatchFlusher implements Runnable + { + private boolean cancelled; + + public synchronized void run() + { + if (!cancelled) + { + for (Connection connection : connections.values()) + { + connection.checkFlushBatchBuffer(); + } + } + } + + public synchronized void cancel() + { + cancelled = true; + } + } + + public boolean isEquivalent(Map<String, Object> configuration) + { + //here we only check host and port because these two parameters + //is sufficient to determine the target host + String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, + TransportConstants.DEFAULT_HOST, + configuration); + Integer port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, + TransportConstants.DEFAULT_PORT, + configuration); + + if (!port.equals(this.port)) return false; + + if (host.equals(this.host)) return true; + + //The host may be an alias. We need to compare raw IP address. + boolean result = false; + try + { + InetAddress inetAddr1 = InetAddress.getByName(host); + InetAddress inetAddr2 = InetAddress.getByName(this.host); + String ip1 = inetAddr1.getHostAddress(); + String ip2 = inetAddr2.getHostAddress(); + HornetQClientLogger.LOGGER.debug(this + " host 1: " + host + " ip address: " + ip1 + " host 2: " + this.host + " ip address: " + ip2); + + result = ip1.equals(ip2); + } + catch (UnknownHostException e) + { + HornetQClientLogger.LOGGER.error("Cannot resolve host", e); + } + + return result; + } + + public void finalize() throws Throwable + { + close(); + super.finalize(); + } + + //for test purpose only + public Bootstrap getBootStrap() + { + return bootstrap; + } + + public static void clearThreadPools() + { + SharedNioEventLoopGroup.forceShutdown(); + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return ClientSessionFactoryImpl.class.getClassLoader(); + } + }); + + } + + private static String base64(byte[] data) + { + ByteBuf encodedData = Unpooled.wrappedBuffer(data); + ByteBuf encoded = Base64.encode(encodedData); + String encodedString = encoded.toString(StandardCharsets.UTF_8); + encoded.release(); + return encodedString; + } + + /** + * Creates an arbitrary number of random bytes + * + * @param size the number of random bytes to create + * @return An array of random bytes + */ + private static byte[] randomBytes(int size) + { + byte[] bytes = new byte[size]; + + for (int index = 0; index < size; index++) + { + bytes[index] = (byte) randomNumber(0, 255); + } + + return bytes; + } + + private static int randomNumber(int minimum, int maximum) + { + return (int) (Math.random() * maximum + minimum); + } + + public static String createExpectedResponse(final String magicNumber, final String secretKey) throws IOException + { + try + { + final String concat = secretKey + magicNumber; + final MessageDigest digest = MessageDigest.getInstance("SHA1"); + + digest.update(concat.getBytes(StandardCharsets.UTF_8)); + final byte[] bytes = digest.digest(); + return encodeBytes(bytes); + } + catch (NoSuchAlgorithmException e) + { + throw new IOException(e); + } + } +} +
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnectorFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnectorFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnectorFactory.java new file mode 100644 index 0000000..e2c31b6 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnectorFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.netty; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.activemq6.spi.core.remoting.BufferHandler; +import org.apache.activemq6.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq6.spi.core.remoting.Connector; +import org.apache.activemq6.spi.core.remoting.ConnectorFactory; + +/** + * A NettyConnectorFactory + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + */ +public class NettyConnectorFactory implements ConnectorFactory +{ + public Connector createConnector(final Map<String, Object> configuration, + final BufferHandler handler, + final ConnectionLifeCycleListener listener, + final Executor closeExecutor, + final Executor threadPool, + final ScheduledExecutorService scheduledThreadPool, + final ClientProtocolManager protocolManager) + { + return new NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool); + } + + public Set<String> getAllowableProperties() + { + return TransportConstants.ALLOWABLE_CONNECTOR_KEYS; + } + + @Override + public boolean isReliable() + { + return false; + } + + @Override + public Map<String, Object> getDefaults() + { + return NettyConnector.DEFAULT_CONFIG; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/PartialPooledByteBufAllocator.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/PartialPooledByteBufAllocator.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/PartialPooledByteBufAllocator.java new file mode 100644 index 0000000..6c481c6 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/PartialPooledByteBufAllocator.java @@ -0,0 +1,152 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; + + +/** + * A {@link ByteBufAllocator} which is partial pooled. Which means only direct {@link ByteBuf}s are pooled. The rest + * is unpooled. + * + * @author <a href="mailto:[email protected]">Norman Maurer</a> + */ +public class PartialPooledByteBufAllocator implements ByteBufAllocator +{ + private static final ByteBufAllocator POOLED = new PooledByteBufAllocator(false); + private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false); + + public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator(); + + private PartialPooledByteBufAllocator() + { + } + + @Override + public ByteBuf buffer() + { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf buffer(int initialCapacity) + { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) + { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf ioBuffer() + { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) + { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) + { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer() + { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) + { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) + { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf directBuffer() + { + return POOLED.directBuffer(); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) + { + return POOLED.directBuffer(initialCapacity); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) + { + return POOLED.directBuffer(initialCapacity, maxCapacity); + } + + @Override + public CompositeByteBuf compositeBuffer() + { + return UNPOOLED.compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) + { + return UNPOOLED.compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() + { + return UNPOOLED.compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) + { + return UNPOOLED.compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() + { + return POOLED.compositeDirectBuffer(); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) + { + return POOLED.compositeDirectBuffer(); + } + + @Override + public boolean isDirectBufferPooled() + { + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/SharedNioEventLoopGroup.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/SharedNioEventLoopGroup.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/SharedNioEventLoopGroup.java new file mode 100644 index 0000000..f7ed135 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/SharedNioEventLoopGroup.java @@ -0,0 +1,150 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.netty; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; +import org.apache.activemq6.core.client.impl.ClientSessionFactoryImpl; +import org.apache.activemq6.utils.HornetQThreadFactory; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author <a href="[email protected]">Norman Maurer</a> + */ +public class SharedNioEventLoopGroup extends NioEventLoopGroup +{ + private static SharedNioEventLoopGroup instance; + + private final AtomicReference<ScheduledFuture<?>> shutdown = new AtomicReference<ScheduledFuture<?>>(); + private final AtomicLong nioChannelFactoryCount = new AtomicLong(); + private final Promise<?> terminationPromise = ImmediateEventExecutor.INSTANCE.newPromise(); + + private SharedNioEventLoopGroup(int numThreads, ThreadFactory factory) + { + super(numThreads, factory); + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return ClientSessionFactoryImpl.class.getClassLoader(); + } + }); + } + + public static synchronized void forceShutdown() + { + if (instance != null) + { + instance.shutdown(); + instance.nioChannelFactoryCount.set(0); + instance = null; + } + } + + public static synchronized SharedNioEventLoopGroup getInstance(int numThreads) + { + if (instance != null) + { + ScheduledFuture f = instance.shutdown.getAndSet(null); + if (f != null) + { + f.cancel(false); + } + } + else + { + instance = new SharedNioEventLoopGroup(numThreads, new HornetQThreadFactory("HornetQ-client-netty-threads", true, getThisClassLoader())); + } + instance.nioChannelFactoryCount.incrementAndGet(); + return instance; + } + + @Override + public Future<?> terminationFuture() + { + return terminationPromise; + } + + @Override + public Future<?> shutdownGracefully() + { + return shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS); + } + + @Override + public Future<?> shutdownGracefully(final long l, final long l2, final TimeUnit timeUnit) + { + if (nioChannelFactoryCount.decrementAndGet() == 0) + { + shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable() + { + @Override + public void run() + { + synchronized (SharedNioEventLoopGroup.class) + { + if (shutdown.get() != null) + { + Future<?> future = SharedNioEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit); + future.addListener(new FutureListener<Object>() + { + @Override + public void operationComplete(Future future) throws Exception + { + if (future.isSuccess()) + { + terminationPromise.setSuccess(null); + } + else + { + terminationPromise.setFailure(future.cause()); + } + } + }); + instance = null; + } + } + } + + }, 10, 10, TimeUnit.SECONDS)); + } + return terminationPromise; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/TransportConstants.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/TransportConstants.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/TransportConstants.java new file mode 100644 index 0000000..990e99e --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/TransportConstants.java @@ -0,0 +1,281 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.netty; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import io.netty.util.Version; +import org.apache.activemq6.api.config.HornetQDefaultConfiguration; + +/** + * A TransportConstants + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class TransportConstants +{ + public static final String SSL_ENABLED_PROP_NAME = "ssl-enabled"; + + public static final String HTTP_ENABLED_PROP_NAME = "http-enabled"; + + public static final String HTTP_CLIENT_IDLE_PROP_NAME = "http-client-idle-time"; + + public static final String HTTP_CLIENT_IDLE_SCAN_PERIOD = "http-client-idle-scan-period"; + + public static final String HTTP_RESPONSE_TIME_PROP_NAME = "http-response-time"; + + public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "http-server-scan-period"; + + public static final String HTTP_REQUIRES_SESSION_ID = "http-requires-session-id"; + + public static final String HTTP_UPGRADE_ENABLED_PROP_NAME = "http-upgrade-enabled"; + + public static final String HTTP_UPGRADE_ENDPOINT_PROP_NAME = "http-upgrade-endpoint"; + + public static final String USE_SERVLET_PROP_NAME = "use-servlet"; + + public static final String SERVLET_PATH = "servlet-path"; + + public static final String USE_NIO_PROP_NAME = "use-nio"; + + public static final String USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME = "use-nio-global-worker-pool"; + + public static final String USE_INVM_PROP_NAME = "use-invm"; + + public static final String PROTOCOL_PROP_NAME = "protocol"; + + public static final String PROTOCOLS_PROP_NAME = "protocols"; + + public static final String HOST_PROP_NAME = "host"; + + public static final String PORT_PROP_NAME = "port"; + + public static final String LOCAL_ADDRESS_PROP_NAME = "local-address"; + + public static final String LOCAL_PORT_PROP_NAME = "local-port"; + + public static final String KEYSTORE_PROVIDER_PROP_NAME = "key-store-provider"; + + public static final String KEYSTORE_PATH_PROP_NAME = "key-store-path"; + + public static final String KEYSTORE_PASSWORD_PROP_NAME = "key-store-password"; + + public static final String TRUSTSTORE_PROVIDER_PROP_NAME = "trust-store-provider"; + + public static final String TRUSTSTORE_PATH_PROP_NAME = "trust-store-path"; + + public static final String TRUSTSTORE_PASSWORD_PROP_NAME = "trust-store-password"; + + public static final String ENABLED_CIPHER_SUITES_PROP_NAME = "enabled-cipher-suites"; + + public static final String ENABLED_PROTOCOLS_PROP_NAME = "enabled-protocols"; + + public static final String NEED_CLIENT_AUTH_PROP_NAME = "need-client-auth"; + + public static final String BACKLOG_PROP_NAME = "backlog"; + + public static final String NETTY_VERSION; + + /** + * Disable Nagle's algorithm.<br> + * Valid for (client) Sockets. + * + * @see <a + * href="http://design.jboss.org/jbossorg/branding/Javadocs/doc/api/org/jboss/netty/channel/socket/SocketChannelConfig.html#setTcpNoDelay%28boolean%29"> + * Netty note on this option</a> + * @see <a href="http://docs.oracle.com/javase/6/docs/technotes/guides/net/socketOpt.html">Oracle + * doc on tcpNoDelay</a> + */ + public static final String TCP_NODELAY_PROPNAME = "tcp-no-delay"; + + public static final String TCP_SENDBUFFER_SIZE_PROPNAME = "tcp-send-buffer-size"; + + public static final String TCP_RECEIVEBUFFER_SIZE_PROPNAME = "tcp-receive-buffer-size"; + + public static final String NIO_REMOTING_THREADS_PROPNAME = "nio-remoting-threads"; + + public static final String BATCH_DELAY = "batch-delay"; + + public static final String DIRECT_DELIVER = "direct-deliver"; + + public static final String CLUSTER_CONNECTION = "cluster-connection"; + + public static final String STOMP_CONSUMERS_CREDIT = "stomp-consumer-credits"; + + public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K + + public static final boolean DEFAULT_SSL_ENABLED = false; + + public static final boolean DEFAULT_USE_NIO_GLOBAL_WORKER_POOL = true; + + public static final boolean DEFAULT_USE_INVM = false; + + public static final boolean DEFAULT_USE_SERVLET = false; + + public static final String DEFAULT_HOST = "localhost"; + + public static final int DEFAULT_PORT = 5445; + + public static final String DEFAULT_LOCAL_ADDRESS = null; + + public static final int DEFAULT_LOCAL_PORT = 0; + + public static final int DEFAULT_STOMP_PORT = 61613; + + public static final String DEFAULT_KEYSTORE_PROVIDER = "JKS"; + + public static final String DEFAULT_KEYSTORE_PATH = null; + + public static final String DEFAULT_KEYSTORE_PASSWORD = null; + + public static final String DEFAULT_TRUSTSTORE_PROVIDER = "JKS"; + + public static final String DEFAULT_TRUSTSTORE_PATH = null; + + public static final String DEFAULT_TRUSTSTORE_PASSWORD = null; + + public static final String DEFAULT_ENABLED_CIPHER_SUITES = null; + + public static final String DEFAULT_ENABLED_PROTOCOLS = null; + + public static final boolean DEFAULT_NEED_CLIENT_AUTH = false; + + public static final boolean DEFAULT_TCP_NODELAY = true; + + public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768; + + public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768; + + public static final boolean DEFAULT_HTTP_ENABLED = false; + + public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500; + + public static final long DEFAULT_HTTP_CLIENT_SCAN_PERIOD = 500; + + public static final long DEFAULT_HTTP_RESPONSE_TIME = 10000; + + public static final long DEFAULT_HTTP_SERVER_SCAN_PERIOD = 5000; + + public static final boolean DEFAULT_HTTP_REQUIRES_SESSION_ID = false; + + public static final boolean DEFAULT_HTTP_UPGRADE_ENABLED = false; + + public static final String DEFAULT_SERVLET_PATH = "/messaging/HornetQServlet"; + + public static final long DEFAULT_BATCH_DELAY = 0; + + public static final boolean DEFAULT_DIRECT_DELIVER = true; + + public static final Set<String> ALLOWABLE_CONNECTOR_KEYS; + + public static final Set<String> ALLOWABLE_ACCEPTOR_KEYS; + + public static final String CONNECTION_TTL = "connection-ttl"; + + public static final String STOMP_ENABLE_MESSAGE_ID = "stomp-enable-message-id"; + + public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size"; + + public static final String NETTY_CONNECT_TIMEOUT = "connect-timeout-millis"; + + public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1; + + static + { + Set<String> allowableAcceptorKeys = new HashSet<String>(); + allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.PROTOCOLS_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.HOST_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.PORT_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PATH_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.TRUSTSTORE_PATH_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.ENABLED_PROTOCOLS_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY); + allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER); + allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION); + allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT); + allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE); + allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL); + allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID); + allowableAcceptorKeys.add(HornetQDefaultConfiguration.getPropMaskPassword()); + allowableAcceptorKeys.add(HornetQDefaultConfiguration.getPropPasswordCodec()); + + ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys); + + Set<String> allowableConnectorKeys = new HashSet<String>(); + allowableConnectorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.HTTP_ENABLED_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD); + allowableConnectorKeys.add(TransportConstants.HTTP_REQUIRES_SESSION_ID); + allowableConnectorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_SERVLET_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.SERVLET_PATH); + allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.LOCAL_ADDRESS_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.LOCAL_PORT_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.KEYSTORE_PATH_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.TRUSTSTORE_PATH_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.ENABLED_PROTOCOLS_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME); + allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); + allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); + allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); + allowableConnectorKeys.add(TransportConstants.BATCH_DELAY); + allowableConnectorKeys.add(HornetQDefaultConfiguration.getPropMaskPassword()); + allowableConnectorKeys.add(HornetQDefaultConfiguration.getPropPasswordCodec()); + allowableConnectorKeys.add(TransportConstants.NETTY_CONNECT_TIMEOUT); + + ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys); + + String version; + Version v = Version.identify().get("netty-transport"); + if (v == null) + { + version = "unknown"; + } + else + { + version = v.artifactVersion(); + } + NETTY_VERSION = version; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/ssl/SSLSupport.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/ssl/SSLSupport.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/ssl/SSLSupport.java new file mode 100644 index 0000000..f2036af --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/ssl/SSLSupport.java @@ -0,0 +1,188 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.ssl; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.AccessController; +import java.security.KeyStore; +import java.security.PrivilegedAction; +import java.security.SecureRandom; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.activemq6.utils.ClassloadingUtil; + +/** + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * @author Justin Bertram + * + * Please note, this class supports PKCS#11 keystores, but there are no specific tests in the HornetQ test-suite to + * validate/verify this works because this requires a functioning PKCS#11 provider which is not available by default + * (see java.security.Security#getProviders()). The main thing to keep in mind is that PKCS#11 keystores will have a + * null keystore path. + */ +public class SSLSupport +{ + // Public -------------------------------------------------------- + + public static SSLContext createContext(final String keystoreProvider, final String keystorePath, final String keystorePassword, + final String trustStoreProvider, final String trustStorePath, final String trustStorePassword) throws Exception + { + SSLContext context = SSLContext.getInstance("TLS"); + KeyManager[] keyManagers = SSLSupport.loadKeyManagers(keystoreProvider, keystorePath, keystorePassword); + TrustManager[] trustManagers = SSLSupport.loadTrustManager(trustStoreProvider, trustStorePath, trustStorePassword); + context.init(keyManagers, trustManagers, new SecureRandom()); + return context; + } + + public static String[] parseCommaSeparatedListIntoArray(String suites) + { + String[] cipherSuites = suites.split(","); + for (int i = 0; i < cipherSuites.length; i++) + { + cipherSuites[i] = cipherSuites[i].trim(); + } + return cipherSuites; + } + + public static String parseArrayIntoCommandSeparatedList(String[] suites) + { + StringBuilder supportedSuites = new StringBuilder(); + + for (int i = 0; i < suites.length; i++) + { + supportedSuites.append(suites[i]); + supportedSuites.append(", "); + } + + // trim the last 2 characters (i.e. unnecessary comma and space) + return supportedSuites.delete(supportedSuites.length() - 2, supportedSuites.length()).toString(); + } + + // Private ------------------------------------------------------- + + private static TrustManager[] loadTrustManager(final String trustStoreProvider, + final String trustStorePath, + final String trustStorePassword) throws Exception + { + if (trustStorePath == null && (trustStoreProvider == null || (trustStoreProvider != null && !"PKCS11".equals(trustStoreProvider.toUpperCase())))) + { + return null; + } + else + { + TrustManagerFactory trustMgrFactory; + KeyStore trustStore = SSLSupport.loadKeystore(trustStoreProvider, trustStorePath, trustStorePassword); + trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustMgrFactory.init(trustStore); + return trustMgrFactory.getTrustManagers(); + } + } + + private static KeyStore loadKeystore(final String keystoreProvider, final String keystorePath, final String keystorePassword) throws Exception + { + KeyStore ks = KeyStore.getInstance(keystoreProvider); + InputStream in = null; + try + { + if (keystorePath != null) + { + URL keystoreURL = SSLSupport.validateStoreURL(keystorePath); + in = keystoreURL.openStream(); + } + ks.load(in, keystorePassword.toCharArray()); + } + finally + { + if (in != null) + { + try + { + in.close(); + } + catch (IOException ignored) + { + } + } + } + return ks; + } + + private static KeyManager[] loadKeyManagers(final String keyStoreProvider, final String keystorePath, final String keystorePassword) throws Exception + { + if (keystorePath == null && (keyStoreProvider == null || (keyStoreProvider != null && !"PKCS11".equals(keyStoreProvider.toUpperCase())))) + { + return null; + } + else + { + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + KeyStore ks = SSLSupport.loadKeystore(keyStoreProvider, keystorePath, keystorePassword); + kmf.init(ks, keystorePassword.toCharArray()); + + return kmf.getKeyManagers(); + } + } + + private static URL validateStoreURL(final String storePath) throws Exception + { + assert storePath != null; + + // First see if this is a URL + try + { + return new URL(storePath); + } + catch (MalformedURLException e) + { + File file = new File(storePath); + if (file.exists() == true && file.isFile()) + { + return file.toURI().toURL(); + } + else + { + URL url = findResource(storePath); + if (url != null) + { + return url; + } + } + } + + throw new Exception("Failed to find a store at " + storePath); + } + + /** This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a + * utility class, as it would be a door to load anything you like in a safe VM. + * For that reason any class trying to do a privileged block should do with the AccessController directly. + */ + private static URL findResource(final String resourceName) + { + return AccessController.doPrivileged(new PrivilegedAction<URL>() + { + public URL run() + { + return ClassloadingUtil.findResource(resourceName); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/package-info.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/package-info.java new file mode 100644 index 0000000..457daae --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * Remoting API. + * <br> + * This package defines the API used by HornetQ for remoting. + */ +package org.apache.activemq6.core.remoting; + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/HornetQPrincipal.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/HornetQPrincipal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/HornetQPrincipal.java new file mode 100644 index 0000000..94f6752 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/HornetQPrincipal.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.security; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * 1/30/12 + */ +public class HornetQPrincipal +{ + private final String userName; + + private final String password; + + public HornetQPrincipal(String userName, String password) + { + this.userName = userName; + this.password = password; + } + + public String getUserName() + { + return userName; + } + + public String getPassword() + { + return password; + } +}
