Repository: asterixdb
Updated Branches:
  refs/heads/master f184a1e7b -> 595a0f3e8


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index e6c28fa..2c7e82e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -52,10 +52,12 @@ import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.ipc.api.RPCInterface;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.InterruptibleAction;
 import org.apache.logging.log4j.Level;
@@ -102,11 +104,12 @@ public final class HyracksConnection implements 
IHyracksClientConnection {
      *            host name.
      * @throws Exception
      */
-    public HyracksConnection(String ccHost, int ccPort) throws Exception {
+    public HyracksConnection(String ccHost, int ccPort, ISocketChannelFactory 
socketChannelFactory) throws Exception {
         this.ccHost = ccHost;
         this.ccPort = ccPort;
         RPCInterface rpci = new RPCInterface();
-        ipc = new IPCSystem(new InetSocketAddress(0), rpci, new 
JavaSerializationBasedPayloadSerializerDeserializer());
+        ipc = new IPCSystem(new InetSocketAddress(0), socketChannelFactory, 
rpci,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
         hci = new 
HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new 
InetSocketAddress(ccHost, ccPort)),
                 rpci);
@@ -115,6 +118,10 @@ public final class HyracksConnection implements 
IHyracksClientConnection {
         uninterruptibleExecutor.execute(new UninterrubtileHandlerWatcher());
     }
 
+    public HyracksConnection(String ccHost, int ccPort) throws Exception {
+        this(ccHost, ccPort, PlainSocketChannelFactory.INSTANCE);
+    }
+
     @Override
     public JobStatus getJobStatus(JobId jobId) throws Exception {
         return hci.getJobStatus(jobId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 9ef506e..205ecfe 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -36,8 +36,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.Level;
@@ -71,8 +73,12 @@ public class IPCConnectionManager {
 
     private volatile boolean stopped;
 
-    IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) 
throws IOException {
+    private final ISocketChannelFactory socketChannelFactory;
+
+    IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress, 
ISocketChannelFactory socketChannelFactory)
+            throws IOException {
         this.system = system;
+        this.socketChannelFactory = socketChannelFactory;
         this.serverSocketChannel = ServerSocketChannel.open();
         serverSocketChannel.socket().setReuseAddress(true);
         serverSocketChannel.configureBlocking(false);
@@ -209,7 +215,8 @@ public class IPCConnectionManager {
                 SelectionKey key = i.next();
                 i.remove();
                 final SelectableChannel sc = key.channel();
-                if (key.isReadable()) {
+                // do not attempt to read until handle is set (e.g. after 
handshake is completed)
+                if (key.isReadable() && key.attachment() != null) {
                     read(key);
                 } else if (key.isWritable()) {
                     write(key);
@@ -229,8 +236,13 @@ public class IPCConnectionManager {
             try {
                 connected = channel.finishConnect();
                 if (connected) {
-                    connectableKey.interestOps(SelectionKey.OP_READ);
-                    connectionEstablished(handle);
+                    SelectionKey channelKey = channel.register(selector, 
SelectionKey.OP_READ);
+                    final ISocketChannel clientChannel = 
socketChannelFactory.createClientChannel(channel);
+                    if (clientChannel.requiresHandshake()) {
+                        asyncHandshake(clientChannel, handle, channelKey);
+                    } else {
+                        connectionEstablished(handle, channelKey, 
clientChannel);
+                    }
                 }
             } catch (IOException e) {
                 LOGGER.warn("Exception finishing connect", e);
@@ -248,11 +260,13 @@ public class IPCConnectionManager {
             try {
                 channel = serverSocketChannel.accept();
                 register(channel);
+                final ISocketChannel serverChannel = 
socketChannelFactory.createServerChannel(channel);
                 channelKey = channel.register(selector, SelectionKey.OP_READ);
-                IPCHandle handle = new IPCHandle(system, null);
-                handle.setKey(channelKey);
-                channelKey.attach(handle);
-                handle.setState(HandleState.CONNECT_RECEIVED);
+                if (serverChannel.requiresHandshake()) {
+                    asyncHandshake(serverChannel, null, channelKey);
+                } else {
+                    connectionReceived(serverChannel, channelKey);
+                }
             } catch (IOException e) {
                 LOGGER.error("Failed to accept channel ", e);
                 close(channelKey, channel);
@@ -268,12 +282,17 @@ public class IPCConnectionManager {
                     register(channel);
                     if (channel.connect(handle.getRemoteAddress())) {
                         channelKey = channel.register(selector, 
SelectionKey.OP_READ);
-                        connectionEstablished(handle);
+                        final ISocketChannel clientChannel = 
socketChannelFactory.createClientChannel(channel);
+                        if (clientChannel.requiresHandshake()) {
+                            asyncHandshake(clientChannel, handle, channelKey);
+                        } else {
+                            connectionEstablished(handle, channelKey, 
clientChannel);
+                        }
                     } else {
                         channelKey = channel.register(selector, 
SelectionKey.OP_CONNECT);
+                        handle.setKey(channelKey);
+                        channelKey.attach(handle);
                     }
-                    handle.setKey(channelKey);
-                    channelKey.attach(handle);
                 } catch (IOException e) {
                     LOGGER.error("Failed to accept channel ", e);
                     close(channelKey, channel);
@@ -283,10 +302,13 @@ public class IPCConnectionManager {
             workingPendingConnections.clear();
         }
 
-        private void connectionEstablished(IPCHandle handle) {
+        private void connectionEstablished(IPCHandle handle, SelectionKey 
channelKey, ISocketChannel channel) {
+            handle.setSocketChannel(channel);
             handle.setState(HandleState.CONNECT_SENT);
+            handle.setKey(channelKey);
             registerHandle(handle);
             IPCConnectionManager.this.write(createInitialReqMessage(handle));
+            channelKey.attach(handle);
         }
 
         private void sendPendingMessages() {
@@ -367,7 +389,7 @@ public class IPCConnectionManager {
             IPCHandle handle = (IPCHandle) readableKey.attachment();
             ByteBuffer readBuffer = handle.getInBuffer();
             try {
-                int len = channel.read(readBuffer);
+                int len = handle.getSocketChannel().read(readBuffer);
                 if (len < 0) {
                     close(readableKey, channel);
                     return;
@@ -386,15 +408,16 @@ public class IPCConnectionManager {
         private void write(SelectionKey writableKey) {
             SocketChannel channel = (SocketChannel) writableKey.channel();
             IPCHandle handle = (IPCHandle) writableKey.attachment();
+            final ISocketChannel socketChannel = handle.getSocketChannel();
             ByteBuffer writeBuffer = handle.getOutBuffer();
             try {
-                int len = channel.write(writeBuffer);
+                int len = socketChannel.write(writeBuffer);
                 if (len < 0) {
                     close(writableKey, channel);
                     return;
                 }
                 system.getPerformanceCounters().addMessageBytesSent(len);
-                if (!writeBuffer.hasRemaining()) {
+                if (!writeBuffer.hasRemaining() && 
!socketChannel.isPendingWrite()) {
                     writableKey.interestOps(writableKey.interestOps() & 
~SelectionKey.OP_WRITE);
                 }
                 if (handle.full()) {
@@ -445,5 +468,31 @@ public class IPCConnectionManager {
             target.addAll(source);
             source.clear();
         }
+
+        private void asyncHandshake(ISocketChannel socketChannel, IPCHandle 
handle, SelectionKey channelKey) {
+            
CompletableFuture.supplyAsync(socketChannel::handshake).exceptionally(ex -> 
false).thenAccept(
+                    handshakeSuccess -> 
handleHandshakeCompletion(handshakeSuccess, socketChannel, handle, channelKey));
+        }
+
+        private void handleHandshakeCompletion(Boolean handshakeSuccess, 
ISocketChannel socketChannel, IPCHandle handle,
+                SelectionKey channelKey) {
+            if (handshakeSuccess) {
+                if (handle == null) {
+                    connectionReceived(socketChannel, channelKey);
+                } else {
+                    connectionEstablished(handle, channelKey, socketChannel);
+                }
+            } else {
+                close(channelKey, socketChannel.getSocketChannel());
+            }
+        }
+
+        private void connectionReceived(ISocketChannel channel, SelectionKey 
channelKey) {
+            final IPCHandle handle = new IPCHandle(system, null);
+            handle.setState(HandleState.CONNECT_RECEIVED);
+            handle.setSocketChannel(channel);
+            handle.setKey(channelKey);
+            channelKey.attach(handle);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 09c7c97..5d92960 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 
@@ -44,6 +45,8 @@ final class IPCHandle implements IIPCHandle {
 
     private boolean full;
 
+    private ISocketChannel socketChannel;
+
     IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
         this.system = system;
         this.remoteAddress = remoteAddress;
@@ -100,6 +103,14 @@ final class IPCHandle implements IIPCHandle {
         this.key = key;
     }
 
+    public ISocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    public void setSocketChannel(ISocketChannel socketChannel) {
+        this.socketChannel = socketChannel;
+    }
+
     public synchronized boolean isConnected() {
         return state == HandleState.CONNECTED;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index b7dcf05..8d90ba3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
@@ -45,9 +46,9 @@ public class IPCSystem {
 
     private final IPCPerformanceCounters perfCounters;
 
-    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, 
IPayloadSerializerDeserializer serde)
-            throws IOException {
-        cMgr = new IPCConnectionManager(this, socketAddress);
+    public IPCSystem(InetSocketAddress socketAddress, ISocketChannelFactory 
socketChannelFactory, IIPCI ipci,
+            IPayloadSerializerDeserializer serde) throws IOException {
+        cMgr = new IPCConnectionManager(this, socketAddress, 
socketChannelFactory);
         this.ipci = ipci;
         this.serde = serde;
         midFactory = new AtomicLong();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
new file mode 100644
index 0000000..7f02830
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.security;
+
+import java.io.File;
+import java.security.KeyStore;
+
+import org.apache.hyracks.api.network.INetworkSecurityConfig;
+
+public class NetworkSecurityConfig implements INetworkSecurityConfig {
+
+    private final boolean sslEnabled;
+    private final File keyStoreFile;
+    private final File trustStoreFile;
+    private final String keyStorePassword;
+    private final KeyStore keyStore;
+
+    private NetworkSecurityConfig(boolean sslEnabled, String keyStoreFile, 
String keyStorePassword,
+            String trustStoreFile, KeyStore keyStore) {
+        this.sslEnabled = sslEnabled;
+        this.keyStoreFile = keyStoreFile != null ? new File(keyStoreFile) : 
null;
+        this.keyStorePassword = keyStorePassword;
+        this.trustStoreFile = trustStoreFile != null ? new 
File(trustStoreFile) : null;
+        this.keyStore = keyStore;
+    }
+
+    public static NetworkSecurityConfig of(boolean sslEnabled, String 
keyStoreFile, String keyStorePassword,
+            String trustStoreFile) {
+        return new NetworkSecurityConfig(sslEnabled, keyStoreFile, 
keyStorePassword, trustStoreFile, null);
+    }
+
+    public static NetworkSecurityConfig of(boolean sslEnabled, KeyStore 
keyStore, String keyStorePassword,
+            String trustStoreFile) {
+        return new NetworkSecurityConfig(sslEnabled, null, keyStorePassword, 
trustStoreFile, keyStore);
+    }
+
+    public boolean isSslEnabled() {
+        return sslEnabled;
+    }
+
+    public File getKeyStoreFile() {
+        return keyStoreFile;
+    }
+
+    public String getKeyStorePassword() {
+        return keyStorePassword;
+    }
+
+    public KeyStore getKeyStore() {
+        return keyStore;
+    }
+
+    public File getTrustStoreFile() {
+        return trustStoreFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
new file mode 100644
index 0000000..ed25f41
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.security;
+
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.hyracks.api.network.INetworkSecurityConfig;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
+import org.apache.hyracks.ipc.sockets.SslSocketChannelFactory;
+
+public class NetworkSecurityManager implements INetworkSecurityManager {
+
+    private volatile INetworkSecurityConfig config;
+    private final ISocketChannelFactory sslSocketFactory;
+    private static final String TSL_VERSION = "TLSv1.2";
+
+    public NetworkSecurityManager(INetworkSecurityConfig config) {
+        this.config = config;
+        sslSocketFactory = new SslSocketChannelFactory(this);
+    }
+
+    @Override
+    public SSLContext newSSLContext() {
+        try {
+            final char[] password = getKeyStorePassword();
+            KeyStore engineKeyStore = config.getKeyStore();
+            if (engineKeyStore == null) {
+                engineKeyStore = loadKeyStoreFromFile(password);
+            }
+            final String defaultAlgorithm = 
KeyManagerFactory.getDefaultAlgorithm();
+            KeyManagerFactory keyManagerFactory = 
KeyManagerFactory.getInstance(defaultAlgorithm);
+            TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(defaultAlgorithm);
+            keyManagerFactory.init(engineKeyStore, "".toCharArray());
+            final KeyStore trustStore = loadTrustStoreFromFile(password);
+            trustManagerFactory.init(trustStore);
+            SSLContext ctx = SSLContext.getInstance(TSL_VERSION);
+            ctx.init(keyManagerFactory.getKeyManagers(), 
trustManagerFactory.getTrustManagers(), new SecureRandom());
+            return ctx;
+        } catch (Exception ex) {
+            throw new IllegalStateException("Failed to create SSLEngine", ex);
+        }
+    }
+
+    @Override
+    public SSLEngine newSSLEngine() {
+        try {
+            SSLContext ctx = newSSLContext();
+            return ctx.createSSLEngine();
+        } catch (Exception ex) {
+            throw new IllegalStateException("Failed to create SSLEngine", ex);
+        }
+    }
+
+    public ISocketChannelFactory getSocketChannelFactory() {
+        if (config.isSslEnabled()) {
+            return sslSocketFactory;
+        }
+        return PlainSocketChannelFactory.INSTANCE;
+    }
+
+    @Override
+    public void setConfiguration(INetworkSecurityConfig config) {
+        this.config = config;
+    }
+
+    private KeyStore loadKeyStoreFromFile(char[] password) {
+        try {
+            final KeyStore ks = 
KeyStore.getInstance(KeyStore.getDefaultType());
+            ks.load(new FileInputStream(config.getKeyStoreFile()), password);
+            return ks;
+        } catch (Exception e) {
+            throw new IllegalStateException("failed to load key store", e);
+        }
+    }
+
+    private KeyStore loadTrustStoreFromFile(char[] password) {
+        try {
+            final KeyStore ks = 
KeyStore.getInstance(KeyStore.getDefaultType());
+            ks.load(new FileInputStream(config.getTrustStoreFile()), password);
+            return ks;
+        } catch (Exception e) {
+            throw new IllegalStateException("failed to load trust store", e);
+        }
+    }
+
+    private char[] getKeyStorePassword() {
+        final String pass = config.getKeyStorePassword();
+        return pass == null || pass.isEmpty() ? null : pass.toCharArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java
new file mode 100644
index 0000000..cae04a1
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.network.ISocketChannel;
+
+public class PlainSocketChannel implements ISocketChannel {
+
+    private final SocketChannel socketChannel;
+
+    public PlainSocketChannel(SocketChannel socketChannel) {
+        this.socketChannel = socketChannel;
+    }
+
+    @Override
+    public boolean requiresHandshake() {
+        return false;
+    }
+
+    @Override
+    public boolean handshake() {
+        return true;
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        return socketChannel.read(dst);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        return socketChannel.write(src);
+    }
+
+    @Override
+    public void close() throws IOException {
+        socketChannel.close();
+    }
+
+    @Override
+    public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override
+    public boolean isPendingRead() {
+        return false;
+    }
+
+    @Override
+    public boolean isPendingWrite() {
+        return false;
+    }
+
+    @Override
+    public boolean completeWrite() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java
new file mode 100644
index 0000000..e9b310f
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+
+public class PlainSocketChannelFactory implements ISocketChannelFactory {
+
+    public static final PlainSocketChannelFactory INSTANCE = new 
PlainSocketChannelFactory();
+
+    @Override
+    public ISocketChannel createServerChannel(SocketChannel socketChannel) {
+        return new PlainSocketChannel(socketChannel);
+    }
+
+    @Override
+    public ISocketChannel createClientChannel(SocketChannel socketChannel) {
+        return new PlainSocketChannel(socketChannel);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java
new file mode 100644
index 0000000..af32a70
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+
+import org.apache.hyracks.util.NetworkUtil;
+
+public class SslHandshake {
+
+    private final ExecutorService executor = 
Executors.newSingleThreadExecutor();
+    private final ByteBuffer handshakeOutData;
+    private final SocketChannel socketChannel;
+    private final SSLEngine engine;
+    private SSLEngineResult.HandshakeStatus handshakeStatus;
+    private ByteBuffer handshakeInData;
+    private ByteBuffer outEncryptedData;
+    private ByteBuffer inEncryptedData;
+
+    public SslHandshake(SslSocketChannel sslSocketChannel) {
+        socketChannel = sslSocketChannel.getSocketChannel();
+        engine = sslSocketChannel.getSslEngine();
+        final int pocketBufferSize = engine.getSession().getPacketBufferSize();
+        inEncryptedData = ByteBuffer.allocate(pocketBufferSize);
+        outEncryptedData = ByteBuffer.allocate(pocketBufferSize);
+        // increase app buffer size to reduce possibility of overflow
+        final int appBufferSize = 
engine.getSession().getApplicationBufferSize() + 50;
+        handshakeOutData = ByteBuffer.allocate(appBufferSize);
+        handshakeInData = ByteBuffer.allocate(appBufferSize);
+    }
+
+    public boolean handshake() throws IOException {
+        handshakeStatus = engine.getHandshakeStatus();
+        while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
+                && handshakeStatus != 
SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+            switch (handshakeStatus) {
+                case NEED_UNWRAP:
+                    if (!unwrap()) {
+                        return false;
+                    }
+                    break;
+                case NEED_WRAP:
+                    wrap();
+                    break;
+                case NEED_TASK:
+                    Runnable task;
+                    while ((task = engine.getDelegatedTask()) != null) {
+                        executor.execute(task);
+                    }
+                    handshakeStatus = engine.getHandshakeStatus();
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid SSL handshake 
status: " + handshakeStatus);
+            }
+        }
+        return true;
+    }
+
+    private void wrap() throws IOException {
+        outEncryptedData.clear();
+        SSLEngineResult result;
+        try {
+            result = engine.wrap(handshakeOutData, outEncryptedData);
+            handshakeStatus = result.getHandshakeStatus();
+        } catch (SSLException sslException) {
+            engine.closeOutbound();
+            handshakeStatus = engine.getHandshakeStatus();
+            throw sslException;
+        }
+        switch (result.getStatus()) {
+            case OK:
+                outEncryptedData.flip();
+                while (outEncryptedData.hasRemaining()) {
+                    socketChannel.write(outEncryptedData);
+                }
+                break;
+            case BUFFER_OVERFLOW:
+                outEncryptedData = NetworkUtil.enlargeSslPacketBuffer(engine, 
outEncryptedData);
+                break;
+            case CLOSED:
+                outEncryptedData.flip();
+                while (outEncryptedData.hasRemaining()) {
+                    socketChannel.write(outEncryptedData);
+                }
+                inEncryptedData.clear();
+                handshakeStatus = engine.getHandshakeStatus();
+                break;
+            case BUFFER_UNDERFLOW:
+            default:
+                throw new IllegalStateException("Invalid SSL status " + 
result.getStatus());
+        }
+    }
+
+    private boolean unwrap() throws IOException {
+        final int read = socketChannel.read(inEncryptedData);
+        if (read < 0) {
+            if (engine.isInboundDone() && engine.isOutboundDone()) {
+                return false;
+            }
+            engine.closeInbound();
+            // close output to put engine in WRAP status to attempt graceful 
ssl session end
+            engine.closeOutbound();
+            return false;
+        }
+        inEncryptedData.flip();
+        SSLEngineResult result;
+        try {
+            result = engine.unwrap(inEncryptedData, handshakeInData);
+            inEncryptedData.compact();
+            handshakeStatus = result.getHandshakeStatus();
+        } catch (SSLException sslException) {
+            engine.closeOutbound();
+            handshakeStatus = engine.getHandshakeStatus();
+            throw sslException;
+        }
+        switch (result.getStatus()) {
+            case OK:
+                break;
+            case BUFFER_OVERFLOW:
+                handshakeInData = 
NetworkUtil.enlargeSslApplicationBuffer(engine, handshakeInData);
+                break;
+            case BUFFER_UNDERFLOW:
+                inEncryptedData = handleBufferUnderflow(engine, 
inEncryptedData);
+                break;
+            case CLOSED:
+                if (engine.isOutboundDone()) {
+                    return false;
+                } else {
+                    engine.closeOutbound();
+                    handshakeStatus = engine.getHandshakeStatus();
+                    break;
+                }
+            default:
+                throw new IllegalStateException("Invalid SSL status " + 
result.getStatus());
+        }
+        return true;
+    }
+
+    private ByteBuffer handleBufferUnderflow(SSLEngine engine, ByteBuffer 
buffer) {
+        if (buffer.capacity() >= engine.getSession().getPacketBufferSize()) {
+            return buffer;
+        } else {
+            final ByteBuffer replaceBuffer = 
NetworkUtil.enlargeSslPacketBuffer(engine, buffer);
+            buffer.flip();
+            replaceBuffer.put(buffer);
+            return replaceBuffer;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
new file mode 100644
index 0000000..73475b0
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLSession;
+
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.util.NetworkUtil;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class SslSocketChannel implements ISocketChannel {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int DEFAULT_APP_BUFFER_SIZE =
+            StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.MEGABYTE);
+    private final SocketChannel socketChannel;
+    private final SSLEngine engine;
+    private ByteBuffer outEncryptedData;
+    private ByteBuffer inAppData;
+    private ByteBuffer inEncryptedData;
+    private boolean partialRecord = false;
+    private boolean cachedData = false;
+    private boolean pendingWrite = false;
+
+    public SslSocketChannel(SocketChannel socketChannel, SSLEngine engine) {
+        this.socketChannel = socketChannel;
+        this.engine = engine;
+        inAppData = ByteBuffer.allocate(DEFAULT_APP_BUFFER_SIZE);
+        inAppData.limit(0);
+        final SSLSession sslSession = engine.getSession();
+        inEncryptedData = 
ByteBuffer.allocate(sslSession.getPacketBufferSize());
+        outEncryptedData = 
ByteBuffer.allocate(sslSession.getPacketBufferSize());
+        outEncryptedData.limit(0);
+    }
+
+    @Override
+    public synchronized boolean handshake() {
+        try {
+            LOGGER.debug("starting SSL handshake {}", this);
+            engine.beginHandshake();
+            final SslHandshake sslHandshake = new SslHandshake(this);
+            final boolean success = sslHandshake.handshake();
+            if (success) {
+                LOGGER.debug("SSL handshake successful {}", this);
+            }
+            return success;
+        } catch (Exception e) {
+            LOGGER.error("handshake failed {}", this, e);
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public boolean requiresHandshake() {
+        return true;
+    }
+
+    @Override
+    public synchronized int read(ByteBuffer buffer) throws IOException {
+        int transfereeBytes = 0;
+        if (cachedData) {
+            transfereeBytes += transferTo(inAppData, buffer);
+        }
+        if (buffer.hasRemaining()) {
+            if (!partialRecord) {
+                inEncryptedData.clear();
+            }
+            final int bytesRead = socketChannel.read(inEncryptedData);
+            if (bytesRead > 0) {
+                partialRecord = false;
+                inEncryptedData.flip();
+                inAppData.clear();
+                if (decrypt() > 0) {
+                    inAppData.flip();
+                    transfereeBytes += transferTo(inAppData, buffer);
+                } else {
+                    inAppData.limit(0);
+                }
+            } else if (bytesRead < 0) {
+                handleEndOfStreamQuietly();
+                return -1;
+            }
+        }
+        cachedData = inAppData.hasRemaining();
+        return transfereeBytes;
+    }
+
+    private int decrypt() throws IOException {
+        int decryptedBytes = 0;
+        while (inEncryptedData.hasRemaining() && !partialRecord) {
+            SSLEngineResult result = engine.unwrap(inEncryptedData, inAppData);
+            switch (result.getStatus()) {
+                case OK:
+                    decryptedBytes += result.bytesProduced();
+                    partialRecord = false;
+                    break;
+                case BUFFER_OVERFLOW:
+                    inAppData = 
NetworkUtil.enlargeSslApplicationBuffer(engine, inAppData);
+                    break;
+                case BUFFER_UNDERFLOW:
+                    handleReadUnderflow();
+                    break;
+                case CLOSED:
+                    close();
+                    return -1;
+                default:
+                    throw new IllegalStateException("Invalid SSL result 
status: " + result.getStatus());
+            }
+        }
+        return decryptedBytes;
+    }
+
+    public synchronized int write(ByteBuffer src) throws IOException {
+        if (pendingWrite && !completeWrite()) {
+            return 0;
+        }
+        int encryptedBytes = 0;
+        while (src.hasRemaining()) {
+            // chunk src to encrypted ssl records of pocket size
+            outEncryptedData.clear();
+            final SSLEngineResult result = engine.wrap(src, outEncryptedData);
+            switch (result.getStatus()) {
+                case OK:
+                    outEncryptedData.flip();
+                    encryptedBytes += result.bytesConsumed();
+                    while (outEncryptedData.hasRemaining()) {
+                        final int written = 
socketChannel.write(outEncryptedData);
+                        if (written == 0) {
+                            pendingWrite = true;
+                            return encryptedBytes;
+                        }
+                    }
+                    break;
+                case BUFFER_OVERFLOW:
+                    outEncryptedData = 
NetworkUtil.enlargeSslPacketBuffer(engine, outEncryptedData);
+                    break;
+                case CLOSED:
+                    close();
+                    return -1;
+                case BUFFER_UNDERFLOW:
+                default:
+                    throw new IllegalStateException("Invalid SSL result 
status: " + result.getStatus());
+            }
+        }
+        pendingWrite = false;
+        return encryptedBytes;
+    }
+
+    @Override
+    public synchronized boolean completeWrite() throws IOException {
+        while (outEncryptedData.hasRemaining()) {
+            final int written = socketChannel.write(outEncryptedData);
+            if (written == 0) {
+                return false;
+            }
+        }
+        pendingWrite = false;
+        return true;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        engine.closeOutbound();
+        new SslHandshake(this).handshake();
+        socketChannel.close();
+    }
+
+    @Override
+    public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override
+    public synchronized boolean isPendingRead() {
+        return cachedData;
+    }
+
+    @Override
+    public synchronized boolean isPendingWrite() {
+        return pendingWrite;
+    }
+
+    public SSLEngine getSslEngine() {
+        return engine;
+    }
+
+    @Override
+    public String toString() {
+        return getConnectionInfo();
+    }
+
+    private void handleReadUnderflow() {
+        if (engine.getSession().getPacketBufferSize() > 
inEncryptedData.capacity()) {
+            inEncryptedData = NetworkUtil.enlargeSslPacketBuffer(engine, 
inEncryptedData);
+        } else {
+            inEncryptedData.compact();
+        }
+        partialRecord = true;
+    }
+
+    private void handleEndOfStreamQuietly() {
+        try {
+            engine.closeInbound();
+            close();
+        } catch (Exception e) {
+            LOGGER.warn("failed to close socket gracefully", e);
+        }
+    }
+
+    private String getConnectionInfo() {
+        try {
+            return getSocketChannel().getLocalAddress() + " -> " + 
getSocketChannel().getRemoteAddress();
+        } catch (IOException e) {
+            LOGGER.warn("failed to get connection info", e);
+            return "";
+        }
+    }
+
+    private static int transferTo(ByteBuffer src, ByteBuffer dst) {
+        final int maxTransfer = Math.min(dst.remaining(), src.remaining());
+        if (maxTransfer > 0) {
+            dst.put(src.array(), src.arrayOffset() + src.position(), 
maxTransfer);
+            src.position(src.position() + maxTransfer);
+        }
+        return maxTransfer;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
new file mode 100644
index 0000000..2a2fb62
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+
+public class SslSocketChannelFactory implements ISocketChannelFactory {
+
+    private final INetworkSecurityManager networkSecurityManager;
+
+    public SslSocketChannelFactory(INetworkSecurityManager 
networkSecurityManager) {
+        this.networkSecurityManager = networkSecurityManager;
+    }
+
+    @Override
+    public ISocketChannel createServerChannel(SocketChannel socketChannel) {
+        final SSLEngine sslEngine = networkSecurityManager.newSSLEngine();
+        sslEngine.setUseClientMode(false);
+        return new SslSocketChannel(socketChannel, sslEngine);
+    }
+
+    @Override
+    public ISocketChannel createClientChannel(SocketChannel socketChannel) {
+        final SSLEngine sslEngine = networkSecurityManager.newSSLEngine();
+        sslEngine.setUseClientMode(true);
+        return new SslSocketChannel(socketChannel, sslEngine);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
index 70a0e18..00bd761 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.ipc.api.RPCInterface;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -83,12 +84,12 @@ public class IPCTest {
                 });
             }
         };
-        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci,
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), 
PlainSocketChannelFactory.INSTANCE, ipci,
                 new JavaSerializationBasedPayloadSerializerDeserializer());
     }
 
     private IPCSystem createClientIPCSystem(RPCInterface rpci) throws 
IOException {
-        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), 
PlainSocketChannelFactory.INSTANCE, rpci,
                 new JavaSerializationBasedPayloadSerializerDeserializer());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/595a0f3e/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index c6b76fc..d10be8e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -24,10 +24,13 @@ import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.net.ssl.SSLEngine;
+
 import org.apache.http.HttpHost;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.conn.util.InetAddressUtils;
@@ -107,4 +110,24 @@ public class NetworkUtil {
         return hostname.length() > 0 && hostname.charAt(0) == '[' ? 
hostname.substring(1, hostname.length() - 1)
                 : hostname;
     }
+
+    public static ByteBuffer enlargeSslPacketBuffer(SSLEngine engine, 
ByteBuffer buffer) {
+        return enlargeSslBuffer(buffer, 
engine.getSession().getPacketBufferSize());
+    }
+
+    public static ByteBuffer enlargeSslApplicationBuffer(SSLEngine engine, 
ByteBuffer buffer) {
+        return enlargeSslBuffer(buffer, 
engine.getSession().getApplicationBufferSize());
+    }
+
+    public static ByteBuffer enlargeSslBuffer(ByteBuffer src, int 
sessionProposedCapacity) {
+        final ByteBuffer enlargedBuffer;
+        if (sessionProposedCapacity > src.capacity()) {
+            enlargedBuffer = ByteBuffer.allocate(sessionProposedCapacity);
+        } else {
+            enlargedBuffer = ByteBuffer.allocate(src.capacity() * 2);
+        }
+        src.flip();
+        enlargedBuffer.put(src);
+        return enlargedBuffer;
+    }
 }

Reply via email to