Repository: asterixdb Updated Branches: refs/heads/master f184a1e7b -> 595a0f3e8
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java index e6c28fa..2c7e82e 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java @@ -52,10 +52,12 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobInfo; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.api.topology.ClusterTopology; import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.ipc.api.RPCInterface; +import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory; import org.apache.hyracks.util.ExitUtil; import org.apache.hyracks.util.InterruptibleAction; import org.apache.logging.log4j.Level; @@ -102,11 +104,12 @@ public final class HyracksConnection implements IHyracksClientConnection { * host name. * @throws Exception */ - public HyracksConnection(String ccHost, int ccPort) throws Exception { + public HyracksConnection(String ccHost, int ccPort, ISocketChannelFactory socketChannelFactory) throws Exception { this.ccHost = ccHost; this.ccPort = ccPort; RPCInterface rpci = new RPCInterface(); - ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer()); + ipc = new IPCSystem(new InetSocketAddress(0), socketChannelFactory, rpci, + new JavaSerializationBasedPayloadSerializerDeserializer()); ipc.start(); hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)), rpci); @@ -115,6 +118,10 @@ public final class HyracksConnection implements IHyracksClientConnection { uninterruptibleExecutor.execute(new UninterrubtileHandlerWatcher()); } + public HyracksConnection(String ccHost, int ccPort) throws Exception { + this(ccHost, ccPort, PlainSocketChannelFactory.INSTANCE); + } + @Override public JobStatus getJobStatus(JobId jobId) throws Exception { return hci.getJobStatus(jobId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index 9ef506e..205ecfe 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -36,8 +36,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; +import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.util.ExitUtil; import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.Level; @@ -71,8 +73,12 @@ public class IPCConnectionManager { private volatile boolean stopped; - IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException { + private final ISocketChannelFactory socketChannelFactory; + + IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress, ISocketChannelFactory socketChannelFactory) + throws IOException { this.system = system; + this.socketChannelFactory = socketChannelFactory; this.serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.configureBlocking(false); @@ -209,7 +215,8 @@ public class IPCConnectionManager { SelectionKey key = i.next(); i.remove(); final SelectableChannel sc = key.channel(); - if (key.isReadable()) { + // do not attempt to read until handle is set (e.g. after handshake is completed) + if (key.isReadable() && key.attachment() != null) { read(key); } else if (key.isWritable()) { write(key); @@ -229,8 +236,13 @@ public class IPCConnectionManager { try { connected = channel.finishConnect(); if (connected) { - connectableKey.interestOps(SelectionKey.OP_READ); - connectionEstablished(handle); + SelectionKey channelKey = channel.register(selector, SelectionKey.OP_READ); + final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(channel); + if (clientChannel.requiresHandshake()) { + asyncHandshake(clientChannel, handle, channelKey); + } else { + connectionEstablished(handle, channelKey, clientChannel); + } } } catch (IOException e) { LOGGER.warn("Exception finishing connect", e); @@ -248,11 +260,13 @@ public class IPCConnectionManager { try { channel = serverSocketChannel.accept(); register(channel); + final ISocketChannel serverChannel = socketChannelFactory.createServerChannel(channel); channelKey = channel.register(selector, SelectionKey.OP_READ); - IPCHandle handle = new IPCHandle(system, null); - handle.setKey(channelKey); - channelKey.attach(handle); - handle.setState(HandleState.CONNECT_RECEIVED); + if (serverChannel.requiresHandshake()) { + asyncHandshake(serverChannel, null, channelKey); + } else { + connectionReceived(serverChannel, channelKey); + } } catch (IOException e) { LOGGER.error("Failed to accept channel ", e); close(channelKey, channel); @@ -268,12 +282,17 @@ public class IPCConnectionManager { register(channel); if (channel.connect(handle.getRemoteAddress())) { channelKey = channel.register(selector, SelectionKey.OP_READ); - connectionEstablished(handle); + final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(channel); + if (clientChannel.requiresHandshake()) { + asyncHandshake(clientChannel, handle, channelKey); + } else { + connectionEstablished(handle, channelKey, clientChannel); + } } else { channelKey = channel.register(selector, SelectionKey.OP_CONNECT); + handle.setKey(channelKey); + channelKey.attach(handle); } - handle.setKey(channelKey); - channelKey.attach(handle); } catch (IOException e) { LOGGER.error("Failed to accept channel ", e); close(channelKey, channel); @@ -283,10 +302,13 @@ public class IPCConnectionManager { workingPendingConnections.clear(); } - private void connectionEstablished(IPCHandle handle) { + private void connectionEstablished(IPCHandle handle, SelectionKey channelKey, ISocketChannel channel) { + handle.setSocketChannel(channel); handle.setState(HandleState.CONNECT_SENT); + handle.setKey(channelKey); registerHandle(handle); IPCConnectionManager.this.write(createInitialReqMessage(handle)); + channelKey.attach(handle); } private void sendPendingMessages() { @@ -367,7 +389,7 @@ public class IPCConnectionManager { IPCHandle handle = (IPCHandle) readableKey.attachment(); ByteBuffer readBuffer = handle.getInBuffer(); try { - int len = channel.read(readBuffer); + int len = handle.getSocketChannel().read(readBuffer); if (len < 0) { close(readableKey, channel); return; @@ -386,15 +408,16 @@ public class IPCConnectionManager { private void write(SelectionKey writableKey) { SocketChannel channel = (SocketChannel) writableKey.channel(); IPCHandle handle = (IPCHandle) writableKey.attachment(); + final ISocketChannel socketChannel = handle.getSocketChannel(); ByteBuffer writeBuffer = handle.getOutBuffer(); try { - int len = channel.write(writeBuffer); + int len = socketChannel.write(writeBuffer); if (len < 0) { close(writableKey, channel); return; } system.getPerformanceCounters().addMessageBytesSent(len); - if (!writeBuffer.hasRemaining()) { + if (!writeBuffer.hasRemaining() && !socketChannel.isPendingWrite()) { writableKey.interestOps(writableKey.interestOps() & ~SelectionKey.OP_WRITE); } if (handle.full()) { @@ -445,5 +468,31 @@ public class IPCConnectionManager { target.addAll(source); source.clear(); } + + private void asyncHandshake(ISocketChannel socketChannel, IPCHandle handle, SelectionKey channelKey) { + CompletableFuture.supplyAsync(socketChannel::handshake).exceptionally(ex -> false).thenAccept( + handshakeSuccess -> handleHandshakeCompletion(handshakeSuccess, socketChannel, handle, channelKey)); + } + + private void handleHandshakeCompletion(Boolean handshakeSuccess, ISocketChannel socketChannel, IPCHandle handle, + SelectionKey channelKey) { + if (handshakeSuccess) { + if (handle == null) { + connectionReceived(socketChannel, channelKey); + } else { + connectionEstablished(handle, channelKey, socketChannel); + } + } else { + close(channelKey, socketChannel.getSocketChannel()); + } + } + + private void connectionReceived(ISocketChannel channel, SelectionKey channelKey) { + final IPCHandle handle = new IPCHandle(system, null); + handle.setState(HandleState.CONNECT_RECEIVED); + handle.setSocketChannel(channel); + handle.setKey(channelKey); + channelKey.attach(handle); + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java index 09c7c97..5d92960 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.exceptions.IPCException; @@ -44,6 +45,8 @@ final class IPCHandle implements IIPCHandle { private boolean full; + private ISocketChannel socketChannel; + IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) { this.system = system; this.remoteAddress = remoteAddress; @@ -100,6 +103,14 @@ final class IPCHandle implements IIPCHandle { this.key = key; } + public ISocketChannel getSocketChannel() { + return socketChannel; + } + + public void setSocketChannel(ISocketChannel socketChannel) { + this.socketChannel = socketChannel; + } + public synchronized boolean isConnected() { return state == HandleState.CONNECTED; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java index b7dcf05..8d90ba3 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.ipc.api.IIPCEventListener; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; @@ -45,9 +46,9 @@ public class IPCSystem { private final IPCPerformanceCounters perfCounters; - public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde) - throws IOException { - cMgr = new IPCConnectionManager(this, socketAddress); + public IPCSystem(InetSocketAddress socketAddress, ISocketChannelFactory socketChannelFactory, IIPCI ipci, + IPayloadSerializerDeserializer serde) throws IOException { + cMgr = new IPCConnectionManager(this, socketAddress, socketChannelFactory); this.ipci = ipci; this.serde = serde; midFactory = new AtomicLong(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java new file mode 100644 index 0000000..7f02830 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.security; + +import java.io.File; +import java.security.KeyStore; + +import org.apache.hyracks.api.network.INetworkSecurityConfig; + +public class NetworkSecurityConfig implements INetworkSecurityConfig { + + private final boolean sslEnabled; + private final File keyStoreFile; + private final File trustStoreFile; + private final String keyStorePassword; + private final KeyStore keyStore; + + private NetworkSecurityConfig(boolean sslEnabled, String keyStoreFile, String keyStorePassword, + String trustStoreFile, KeyStore keyStore) { + this.sslEnabled = sslEnabled; + this.keyStoreFile = keyStoreFile != null ? new File(keyStoreFile) : null; + this.keyStorePassword = keyStorePassword; + this.trustStoreFile = trustStoreFile != null ? new File(trustStoreFile) : null; + this.keyStore = keyStore; + } + + public static NetworkSecurityConfig of(boolean sslEnabled, String keyStoreFile, String keyStorePassword, + String trustStoreFile) { + return new NetworkSecurityConfig(sslEnabled, keyStoreFile, keyStorePassword, trustStoreFile, null); + } + + public static NetworkSecurityConfig of(boolean sslEnabled, KeyStore keyStore, String keyStorePassword, + String trustStoreFile) { + return new NetworkSecurityConfig(sslEnabled, null, keyStorePassword, trustStoreFile, keyStore); + } + + public boolean isSslEnabled() { + return sslEnabled; + } + + public File getKeyStoreFile() { + return keyStoreFile; + } + + public String getKeyStorePassword() { + return keyStorePassword; + } + + public KeyStore getKeyStore() { + return keyStore; + } + + public File getTrustStoreFile() { + return trustStoreFile; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java new file mode 100644 index 0000000..ed25f41 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.security; + +import java.io.FileInputStream; +import java.security.KeyStore; +import java.security.SecureRandom; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.hyracks.api.network.INetworkSecurityConfig; +import org.apache.hyracks.api.network.INetworkSecurityManager; +import org.apache.hyracks.api.network.ISocketChannelFactory; +import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory; +import org.apache.hyracks.ipc.sockets.SslSocketChannelFactory; + +public class NetworkSecurityManager implements INetworkSecurityManager { + + private volatile INetworkSecurityConfig config; + private final ISocketChannelFactory sslSocketFactory; + private static final String TSL_VERSION = "TLSv1.2"; + + public NetworkSecurityManager(INetworkSecurityConfig config) { + this.config = config; + sslSocketFactory = new SslSocketChannelFactory(this); + } + + @Override + public SSLContext newSSLContext() { + try { + final char[] password = getKeyStorePassword(); + KeyStore engineKeyStore = config.getKeyStore(); + if (engineKeyStore == null) { + engineKeyStore = loadKeyStoreFromFile(password); + } + final String defaultAlgorithm = KeyManagerFactory.getDefaultAlgorithm(); + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(defaultAlgorithm); + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(defaultAlgorithm); + keyManagerFactory.init(engineKeyStore, "".toCharArray()); + final KeyStore trustStore = loadTrustStoreFromFile(password); + trustManagerFactory.init(trustStore); + SSLContext ctx = SSLContext.getInstance(TSL_VERSION); + ctx.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom()); + return ctx; + } catch (Exception ex) { + throw new IllegalStateException("Failed to create SSLEngine", ex); + } + } + + @Override + public SSLEngine newSSLEngine() { + try { + SSLContext ctx = newSSLContext(); + return ctx.createSSLEngine(); + } catch (Exception ex) { + throw new IllegalStateException("Failed to create SSLEngine", ex); + } + } + + public ISocketChannelFactory getSocketChannelFactory() { + if (config.isSslEnabled()) { + return sslSocketFactory; + } + return PlainSocketChannelFactory.INSTANCE; + } + + @Override + public void setConfiguration(INetworkSecurityConfig config) { + this.config = config; + } + + private KeyStore loadKeyStoreFromFile(char[] password) { + try { + final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(new FileInputStream(config.getKeyStoreFile()), password); + return ks; + } catch (Exception e) { + throw new IllegalStateException("failed to load key store", e); + } + } + + private KeyStore loadTrustStoreFromFile(char[] password) { + try { + final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(new FileInputStream(config.getTrustStoreFile()), password); + return ks; + } catch (Exception e) { + throw new IllegalStateException("failed to load trust store", e); + } + } + + private char[] getKeyStorePassword() { + final String pass = config.getKeyStorePassword(); + return pass == null || pass.isEmpty() ? null : pass.toCharArray(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java new file mode 100644 index 0000000..cae04a1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.sockets; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.hyracks.api.network.ISocketChannel; + +public class PlainSocketChannel implements ISocketChannel { + + private final SocketChannel socketChannel; + + public PlainSocketChannel(SocketChannel socketChannel) { + this.socketChannel = socketChannel; + } + + @Override + public boolean requiresHandshake() { + return false; + } + + @Override + public boolean handshake() { + return true; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return socketChannel.read(dst); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return socketChannel.write(src); + } + + @Override + public void close() throws IOException { + socketChannel.close(); + } + + @Override + public SocketChannel getSocketChannel() { + return socketChannel; + } + + @Override + public boolean isPendingRead() { + return false; + } + + @Override + public boolean isPendingWrite() { + return false; + } + + @Override + public boolean completeWrite() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java new file mode 100644 index 0000000..e9b310f --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.sockets; + +import java.nio.channels.SocketChannel; + +import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.api.network.ISocketChannelFactory; + +public class PlainSocketChannelFactory implements ISocketChannelFactory { + + public static final PlainSocketChannelFactory INSTANCE = new PlainSocketChannelFactory(); + + @Override + public ISocketChannel createServerChannel(SocketChannel socketChannel) { + return new PlainSocketChannel(socketChannel); + } + + @Override + public ISocketChannel createClientChannel(SocketChannel socketChannel) { + return new PlainSocketChannel(socketChannel); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java new file mode 100644 index 0000000..af32a70 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.sockets; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; + +import org.apache.hyracks.util.NetworkUtil; + +public class SslHandshake { + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ByteBuffer handshakeOutData; + private final SocketChannel socketChannel; + private final SSLEngine engine; + private SSLEngineResult.HandshakeStatus handshakeStatus; + private ByteBuffer handshakeInData; + private ByteBuffer outEncryptedData; + private ByteBuffer inEncryptedData; + + public SslHandshake(SslSocketChannel sslSocketChannel) { + socketChannel = sslSocketChannel.getSocketChannel(); + engine = sslSocketChannel.getSslEngine(); + final int pocketBufferSize = engine.getSession().getPacketBufferSize(); + inEncryptedData = ByteBuffer.allocate(pocketBufferSize); + outEncryptedData = ByteBuffer.allocate(pocketBufferSize); + // increase app buffer size to reduce possibility of overflow + final int appBufferSize = engine.getSession().getApplicationBufferSize() + 50; + handshakeOutData = ByteBuffer.allocate(appBufferSize); + handshakeInData = ByteBuffer.allocate(appBufferSize); + } + + public boolean handshake() throws IOException { + handshakeStatus = engine.getHandshakeStatus(); + while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED + && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) { + switch (handshakeStatus) { + case NEED_UNWRAP: + if (!unwrap()) { + return false; + } + break; + case NEED_WRAP: + wrap(); + break; + case NEED_TASK: + Runnable task; + while ((task = engine.getDelegatedTask()) != null) { + executor.execute(task); + } + handshakeStatus = engine.getHandshakeStatus(); + break; + default: + throw new IllegalStateException("Invalid SSL handshake status: " + handshakeStatus); + } + } + return true; + } + + private void wrap() throws IOException { + outEncryptedData.clear(); + SSLEngineResult result; + try { + result = engine.wrap(handshakeOutData, outEncryptedData); + handshakeStatus = result.getHandshakeStatus(); + } catch (SSLException sslException) { + engine.closeOutbound(); + handshakeStatus = engine.getHandshakeStatus(); + throw sslException; + } + switch (result.getStatus()) { + case OK: + outEncryptedData.flip(); + while (outEncryptedData.hasRemaining()) { + socketChannel.write(outEncryptedData); + } + break; + case BUFFER_OVERFLOW: + outEncryptedData = NetworkUtil.enlargeSslPacketBuffer(engine, outEncryptedData); + break; + case CLOSED: + outEncryptedData.flip(); + while (outEncryptedData.hasRemaining()) { + socketChannel.write(outEncryptedData); + } + inEncryptedData.clear(); + handshakeStatus = engine.getHandshakeStatus(); + break; + case BUFFER_UNDERFLOW: + default: + throw new IllegalStateException("Invalid SSL status " + result.getStatus()); + } + } + + private boolean unwrap() throws IOException { + final int read = socketChannel.read(inEncryptedData); + if (read < 0) { + if (engine.isInboundDone() && engine.isOutboundDone()) { + return false; + } + engine.closeInbound(); + // close output to put engine in WRAP status to attempt graceful ssl session end + engine.closeOutbound(); + return false; + } + inEncryptedData.flip(); + SSLEngineResult result; + try { + result = engine.unwrap(inEncryptedData, handshakeInData); + inEncryptedData.compact(); + handshakeStatus = result.getHandshakeStatus(); + } catch (SSLException sslException) { + engine.closeOutbound(); + handshakeStatus = engine.getHandshakeStatus(); + throw sslException; + } + switch (result.getStatus()) { + case OK: + break; + case BUFFER_OVERFLOW: + handshakeInData = NetworkUtil.enlargeSslApplicationBuffer(engine, handshakeInData); + break; + case BUFFER_UNDERFLOW: + inEncryptedData = handleBufferUnderflow(engine, inEncryptedData); + break; + case CLOSED: + if (engine.isOutboundDone()) { + return false; + } else { + engine.closeOutbound(); + handshakeStatus = engine.getHandshakeStatus(); + break; + } + default: + throw new IllegalStateException("Invalid SSL status " + result.getStatus()); + } + return true; + } + + private ByteBuffer handleBufferUnderflow(SSLEngine engine, ByteBuffer buffer) { + if (buffer.capacity() >= engine.getSession().getPacketBufferSize()) { + return buffer; + } else { + final ByteBuffer replaceBuffer = NetworkUtil.enlargeSslPacketBuffer(engine, buffer); + buffer.flip(); + replaceBuffer.put(buffer); + return replaceBuffer; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java new file mode 100644 index 0000000..73475b0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.sockets; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLSession; + +import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.util.NetworkUtil; +import org.apache.hyracks.util.StorageUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class SslSocketChannel implements ISocketChannel { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final int DEFAULT_APP_BUFFER_SIZE = + StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.MEGABYTE); + private final SocketChannel socketChannel; + private final SSLEngine engine; + private ByteBuffer outEncryptedData; + private ByteBuffer inAppData; + private ByteBuffer inEncryptedData; + private boolean partialRecord = false; + private boolean cachedData = false; + private boolean pendingWrite = false; + + public SslSocketChannel(SocketChannel socketChannel, SSLEngine engine) { + this.socketChannel = socketChannel; + this.engine = engine; + inAppData = ByteBuffer.allocate(DEFAULT_APP_BUFFER_SIZE); + inAppData.limit(0); + final SSLSession sslSession = engine.getSession(); + inEncryptedData = ByteBuffer.allocate(sslSession.getPacketBufferSize()); + outEncryptedData = ByteBuffer.allocate(sslSession.getPacketBufferSize()); + outEncryptedData.limit(0); + } + + @Override + public synchronized boolean handshake() { + try { + LOGGER.debug("starting SSL handshake {}", this); + engine.beginHandshake(); + final SslHandshake sslHandshake = new SslHandshake(this); + final boolean success = sslHandshake.handshake(); + if (success) { + LOGGER.debug("SSL handshake successful {}", this); + } + return success; + } catch (Exception e) { + LOGGER.error("handshake failed {}", this, e); + throw new IllegalStateException(e); + } + } + + @Override + public boolean requiresHandshake() { + return true; + } + + @Override + public synchronized int read(ByteBuffer buffer) throws IOException { + int transfereeBytes = 0; + if (cachedData) { + transfereeBytes += transferTo(inAppData, buffer); + } + if (buffer.hasRemaining()) { + if (!partialRecord) { + inEncryptedData.clear(); + } + final int bytesRead = socketChannel.read(inEncryptedData); + if (bytesRead > 0) { + partialRecord = false; + inEncryptedData.flip(); + inAppData.clear(); + if (decrypt() > 0) { + inAppData.flip(); + transfereeBytes += transferTo(inAppData, buffer); + } else { + inAppData.limit(0); + } + } else if (bytesRead < 0) { + handleEndOfStreamQuietly(); + return -1; + } + } + cachedData = inAppData.hasRemaining(); + return transfereeBytes; + } + + private int decrypt() throws IOException { + int decryptedBytes = 0; + while (inEncryptedData.hasRemaining() && !partialRecord) { + SSLEngineResult result = engine.unwrap(inEncryptedData, inAppData); + switch (result.getStatus()) { + case OK: + decryptedBytes += result.bytesProduced(); + partialRecord = false; + break; + case BUFFER_OVERFLOW: + inAppData = NetworkUtil.enlargeSslApplicationBuffer(engine, inAppData); + break; + case BUFFER_UNDERFLOW: + handleReadUnderflow(); + break; + case CLOSED: + close(); + return -1; + default: + throw new IllegalStateException("Invalid SSL result status: " + result.getStatus()); + } + } + return decryptedBytes; + } + + public synchronized int write(ByteBuffer src) throws IOException { + if (pendingWrite && !completeWrite()) { + return 0; + } + int encryptedBytes = 0; + while (src.hasRemaining()) { + // chunk src to encrypted ssl records of pocket size + outEncryptedData.clear(); + final SSLEngineResult result = engine.wrap(src, outEncryptedData); + switch (result.getStatus()) { + case OK: + outEncryptedData.flip(); + encryptedBytes += result.bytesConsumed(); + while (outEncryptedData.hasRemaining()) { + final int written = socketChannel.write(outEncryptedData); + if (written == 0) { + pendingWrite = true; + return encryptedBytes; + } + } + break; + case BUFFER_OVERFLOW: + outEncryptedData = NetworkUtil.enlargeSslPacketBuffer(engine, outEncryptedData); + break; + case CLOSED: + close(); + return -1; + case BUFFER_UNDERFLOW: + default: + throw new IllegalStateException("Invalid SSL result status: " + result.getStatus()); + } + } + pendingWrite = false; + return encryptedBytes; + } + + @Override + public synchronized boolean completeWrite() throws IOException { + while (outEncryptedData.hasRemaining()) { + final int written = socketChannel.write(outEncryptedData); + if (written == 0) { + return false; + } + } + pendingWrite = false; + return true; + } + + @Override + public synchronized void close() throws IOException { + engine.closeOutbound(); + new SslHandshake(this).handshake(); + socketChannel.close(); + } + + @Override + public SocketChannel getSocketChannel() { + return socketChannel; + } + + @Override + public synchronized boolean isPendingRead() { + return cachedData; + } + + @Override + public synchronized boolean isPendingWrite() { + return pendingWrite; + } + + public SSLEngine getSslEngine() { + return engine; + } + + @Override + public String toString() { + return getConnectionInfo(); + } + + private void handleReadUnderflow() { + if (engine.getSession().getPacketBufferSize() > inEncryptedData.capacity()) { + inEncryptedData = NetworkUtil.enlargeSslPacketBuffer(engine, inEncryptedData); + } else { + inEncryptedData.compact(); + } + partialRecord = true; + } + + private void handleEndOfStreamQuietly() { + try { + engine.closeInbound(); + close(); + } catch (Exception e) { + LOGGER.warn("failed to close socket gracefully", e); + } + } + + private String getConnectionInfo() { + try { + return getSocketChannel().getLocalAddress() + " -> " + getSocketChannel().getRemoteAddress(); + } catch (IOException e) { + LOGGER.warn("failed to get connection info", e); + return ""; + } + } + + private static int transferTo(ByteBuffer src, ByteBuffer dst) { + final int maxTransfer = Math.min(dst.remaining(), src.remaining()); + if (maxTransfer > 0) { + dst.put(src.array(), src.arrayOffset() + src.position(), maxTransfer); + src.position(src.position() + maxTransfer); + } + return maxTransfer; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java new file mode 100644 index 0000000..2a2fb62 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.sockets; + +import java.nio.channels.SocketChannel; + +import javax.net.ssl.SSLEngine; + +import org.apache.hyracks.api.network.INetworkSecurityManager; +import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.api.network.ISocketChannelFactory; + +public class SslSocketChannelFactory implements ISocketChannelFactory { + + private final INetworkSecurityManager networkSecurityManager; + + public SslSocketChannelFactory(INetworkSecurityManager networkSecurityManager) { + this.networkSecurityManager = networkSecurityManager; + } + + @Override + public ISocketChannel createServerChannel(SocketChannel socketChannel) { + final SSLEngine sslEngine = networkSecurityManager.newSSLEngine(); + sslEngine.setUseClientMode(false); + return new SslSocketChannel(socketChannel, sslEngine); + } + + @Override + public ISocketChannel createClientChannel(SocketChannel socketChannel) { + final SSLEngine sslEngine = networkSecurityManager.newSSLEngine(); + sslEngine.setUseClientMode(true); + return new SslSocketChannel(socketChannel, sslEngine); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java index 70a0e18..00bd761 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java @@ -29,6 +29,7 @@ import org.apache.hyracks.ipc.api.RPCInterface; import org.apache.hyracks.ipc.exceptions.IPCException; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; +import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory; import org.junit.Assert; import org.junit.Test; @@ -83,12 +84,12 @@ public class IPCTest { }); } }; - return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, + return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), PlainSocketChannelFactory.INSTANCE, ipci, new JavaSerializationBasedPayloadSerializerDeserializer()); } private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException { - return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci, + return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), PlainSocketChannelFactory.INSTANCE, rpci, new JavaSerializationBasedPayloadSerializerDeserializer()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java index c6b76fc..d10be8e 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java @@ -24,10 +24,13 @@ import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; +import javax.net.ssl.SSLEngine; + import org.apache.http.HttpHost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.conn.util.InetAddressUtils; @@ -107,4 +110,24 @@ public class NetworkUtil { return hostname.length() > 0 && hostname.charAt(0) == '[' ? hostname.substring(1, hostname.length() - 1) : hostname; } + + public static ByteBuffer enlargeSslPacketBuffer(SSLEngine engine, ByteBuffer buffer) { + return enlargeSslBuffer(buffer, engine.getSession().getPacketBufferSize()); + } + + public static ByteBuffer enlargeSslApplicationBuffer(SSLEngine engine, ByteBuffer buffer) { + return enlargeSslBuffer(buffer, engine.getSession().getApplicationBufferSize()); + } + + public static ByteBuffer enlargeSslBuffer(ByteBuffer src, int sessionProposedCapacity) { + final ByteBuffer enlargedBuffer; + if (sessionProposedCapacity > src.capacity()) { + enlargedBuffer = ByteBuffer.allocate(sessionProposedCapacity); + } else { + enlargedBuffer = ByteBuffer.allocate(src.capacity() * 2); + } + src.flip(); + enlargedBuffer.put(src); + return enlargedBuffer; + } }
