Repository: asterixdb
Updated Branches:
  refs/heads/master aa0d1df18 -> 11c83fae1
[ASTERIXDB-2490][NET] Support Encrypted Replication Connections

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Use SocketChannelFactory in replication connections
  to support both unencrypted and encrypted sockets.
- Add SSL replication test cases.
- Make SslSocketChannel close idempotent.
- Ensure FlushDatasetOperatorDescriptor waits for all
  on-going dataset IO.

Change-Id: I9657624a5d54d4966357651efb671f3d8f0cb304
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3092
Sonar-Qube: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Reviewed-by: Michael Blow <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/11c83fae
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/11c83fae
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/11c83fae

Branch: refs/heads/master
Commit: 11c83fae1bd3514576b5c1a3e2d265d486cfa4ea
Parents: aa0d1df
Author: Murtadha Hubail <[email protected]>
Authored: Mon Dec 17 17:23:51 2018 +0300
Committer: Murtadha Hubail <[email protected]>
Committed: Wed Dec 19 09:00:06 2018 -0800

----------------------------------------------------------------------
 .../runtime/SslReplicationExecutionTest.java | 96 ++++++++++++++++++++
 .../src/test/resources/cc-rep-ssl.conf | 61 +++++++++++++
 .../replication/api/IReplicationWorker.java | 5 +-
 .../replication/api/PartitionReplica.java | 13 +--
 .../replication/api/ReplicationDestination.java | 18 ++--
 .../replication/logging/RemoteLogsNotifier.java | 2 +-
 .../management/LogReplicationManager.java | 34 +++----
 .../replication/management/NetworkingUtil.java | 12 +--
 .../management/ReplicationChannel.java | 41 ++++++---
 .../messaging/ReplicateLogsTask.java | 3 +-
 .../messaging/ReplicationProtocol.java | 43 +++++++--
.../replication/sync/FileSynchronizer.java | 3 +- .../sync/ReplicaFilesSynchronizer.java | 3 +- .../std/FlushDatasetOperatorDescriptor.java | 1 + .../hyracks/api/network/ISocketChannel.java | 9 +- .../hyracks/ipc/sockets/SslSocketChannel.java | 8 +- 16 files changed, 278 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java new file mode 100644 index 0000000..14aac1f --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.runtime; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.test.support.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class SslReplicationExecutionTest { + protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-rep-ssl.conf"; + private static final TestExecutor testExecutor = new TestExecutor(); + private static boolean configured = false; + + @BeforeClass + public static void setUp() { + LangExecutionUtil.setCheckStorageDistribution(false); + } + + @Before + public void before() throws Exception { + TestUtils.redirectLoggingToConsole(); + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor); + if (!configured) { + final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs; + Map<String, InetSocketAddress> ncEndPoints = new HashMap<>(); + Map<String, InetSocketAddress> replicationAddress = new HashMap<>(); + final String ip = InetAddress.getLoopbackAddress().getHostAddress(); + for (NodeControllerService nc : ncs) { + final String nodeId = nc.getId(); + final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext(); + int apiPort = appCtx.getExternalProperties().getNcApiPort(); + int replicationPort = + (int) appCtx.getServiceContext().getAppConfig().get(NCConfig.Option.REPLICATION_LISTEN_PORT); + 
ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort)); + replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort)); + } + testExecutor.setNcEndPoints(ncEndPoints); + testExecutor.setNcReplicationAddress(replicationAddress); + configured = true; + } + } + + @After + public void after() throws Exception { + LangExecutionUtil.tearDown(); + } + + @Parameters(name = "ReplicationExecutionTest {index}: {0}") + public static Collection<Object[]> tests() throws Exception { + return LangExecutionUtil.tests("replication.xml", "replication.xml"); + } + + protected TestCaseContext tcCtx; + + public SslReplicationExecutionTest(TestCaseContext tcCtx) { + this.tcCtx = tcCtx; + } + + @Test + public void test() throws Exception { + LangExecutionUtil.test(tcCtx); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf b/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf new file mode 100644 index 0000000..db4ca20 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf @@ -0,0 +1,61 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. 
See the License for the +; specific language governing permissions and limitations +; under the License. + +[nc/asterix_nc1] +txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog +core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump +iodevices=asterix_nc1/iodevice1,asterix_nc1/iodevice2 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006 +replication.listen.port=2001 +nc.api.port=19004 +key.store.path=security/nc1/asterix_nc1.jks +key.store.password=asterixdb +trust.store.path=security/root/root.truststore + +[nc/asterix_nc2] +ncservice.port=9091 +txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog +core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump +iodevices=asterix_nc2/iodevice1,asterix_nc2/iodevice2 +#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007 +replication.listen.port=2002 +nc.api.port=19005 +key.store.path=security/nc2/asterix_nc2.jks +key.store.password=asterixdb +trust.store.path=security/root/root.truststore + +[nc] +address=127.0.0.1 +command=asterixnc +app.class=org.apache.asterix.hyracks.bootstrap.NCApplication +jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory" +storage.memorycomponent.globalbudget = 1073741824 + +[cc] +address = 127.0.0.1 +app.class=org.apache.asterix.hyracks.bootstrap.CCApplication +heartbeat.period=2000 +heartbeat.max.misses=25 +key.store.path=security/cc/cc.jks +key.store.password=asterixdb +trust.store.path=security/root/root.truststore + +[common] +log.level = INFO +replication.enabled=true +replication.strategy=all +ssl.enabled=true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java index c8abe8f..c7b2561 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java @@ -19,14 +19,15 @@ package org.apache.asterix.replication.api; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; + +import org.apache.hyracks.api.network.ISocketChannel; public interface IReplicationWorker extends Runnable { /** * @return The replication socket channel. */ - SocketChannel getChannel(); + ISocketChannel getChannel(); /** * Gets a reusable buffer that can be used to send data http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java index 8847e7e..0e85665 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java @@ -24,7 +24,6 @@ import static org.apache.asterix.common.replication.IPartitionReplica.PartitionR import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; @@ -32,6 +31,7 @@ import org.apache.asterix.common.replication.IPartitionReplica; import 
org.apache.asterix.common.storage.ReplicaIdentifier; import org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.asterix.replication.sync.ReplicaSynchronizer; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.util.NetworkUtil; import org.apache.hyracks.util.StorageUtil; import org.apache.hyracks.util.annotations.ThreadSafe; @@ -53,7 +53,7 @@ public class PartitionReplica implements IPartitionReplica { private final ReplicaIdentifier id; private ByteBuffer reusbaleBuf; private PartitionReplicaStatus status = DISCONNECTED; - private SocketChannel sc; + private ISocketChannel sc; public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) { this.id = id; @@ -93,13 +93,10 @@ public class PartitionReplica implements IPartitionReplica { }); } - public synchronized SocketChannel getChannel() { + public synchronized ISocketChannel getChannel() { try { - if (sc == null || !sc.isOpen() || !sc.isConnected()) { - sc = SocketChannel.open(); - NetworkUtil.configure(sc); - sc.configureBlocking(true); - sc.connect(id.getLocation()); + if (sc == null || !sc.getSocketChannel().isOpen() || !sc.getSocketChannel().isConnected()) { + sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation()); } return sc; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java index 8ccfced..eda37b5 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java +++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java @@ -20,17 +20,17 @@ package org.apache.asterix.replication.api; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; import java.util.HashSet; import java.util.Objects; import java.util.Optional; import java.util.Set; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.replication.IReplicationDestination; import org.apache.asterix.replication.messaging.ReplicationProtocol; -import org.apache.hyracks.util.NetworkUtil; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -39,7 +39,7 @@ public class ReplicationDestination implements IReplicationDestination { private static final Logger LOGGER = LogManager.getLogger(); private final Set<IPartitionReplica> replicas = new HashSet<>(); private final InetSocketAddress location; - private SocketChannel logRepChannel; + private ISocketChannel logRepChannel; private ReplicationDestination(InetSocketAddress location) { this.location = location; @@ -75,13 +75,11 @@ public class ReplicationDestination implements IReplicationDestination { && replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny(); } - public synchronized SocketChannel getLogReplicationChannel() { + public synchronized ISocketChannel getLogReplicationChannel(INcApplicationContext appCtx) { try { - if (logRepChannel == null || !logRepChannel.isOpen() || !logRepChannel.isConnected()) { - logRepChannel = SocketChannel.open(); - NetworkUtil.configure(logRepChannel); - logRepChannel.configureBlocking(true); - logRepChannel.connect(location); + if (logRepChannel == null || !logRepChannel.getSocketChannel().isOpen() + || 
!logRepChannel.getSocketChannel().isConnected()) { + logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, location); } return logRepChannel; } catch (IOException e) { @@ -91,7 +89,7 @@ public class ReplicationDestination implements IReplicationDestination { private synchronized void closeLogReplicationChannel() { try { - if (logRepChannel != null && logRepChannel.isOpen()) { + if (logRepChannel != null && logRepChannel.getSocketChannel().isOpen()) { ReplicationProtocol.sendGoodbye(logRepChannel); logRepChannel.close(); logRepChannel = null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java index 366abce..440f8ef 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java @@ -65,7 +65,7 @@ class RemoteLogsNotifier implements Runnable { case LogType.JOB_COMMIT: case LogType.ABORT: // send ACK to requester - logRecord.getReplicationWorker().getChannel().socket().getOutputStream() + logRecord.getReplicationWorker().getChannel().getSocketChannel().socket().getOutputStream() .write((nodeId + ReplicationProtocol.LOG_REPLICATION_ACK + logRecord.getTxnId() + System.lineSeparator()).getBytes()); break; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java index 6c8e372..0bcffc6 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -46,6 +45,7 @@ import org.apache.asterix.replication.logging.TxnAckTracker; import org.apache.asterix.replication.logging.TxnLogReplicator; import org.apache.asterix.replication.messaging.ReplicateLogsTask; import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -55,17 +55,17 @@ public class LogReplicationManager { private final LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ; private final LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ; private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES); - private final Map<ReplicationDestination, SocketChannel> destinations = new HashMap<>(); + private final Map<ReplicationDestination, ISocketChannel> destinations = new HashMap<>(); private final IReplicationManager replicationManager; private final Executor executor; private final TxnAckTracker ackTracker = new TxnAckTracker(); - private final Set<SocketChannel> failedSockets = new HashSet<>(); + private final Set<ISocketChannel> failedSockets = new HashSet<>(); private final Object transferLock = new 
Object(); private final INcApplicationContext appCtx; private final int logPageSize; private final int logBatchSize; private ReplicationLogBuffer currentTxnLogBuffer; - private SocketChannel[] destSockets; + private ISocketChannel[] destSockets; public LogReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) { this.appCtx = appCtx; @@ -100,11 +100,11 @@ public class LogReplicationManager { return; } LOGGER.info(() -> "register " + dest); - SocketChannel socketChannel = dest.getLogReplicationChannel(); + ISocketChannel socketChannel = dest.getLogReplicationChannel(appCtx); handshake(dest, socketChannel); destinations.put(dest, socketChannel); failedSockets.remove(socketChannel); - destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]); + destSockets = destinations.values().toArray(new ISocketChannel[0]); } } } @@ -117,9 +117,9 @@ public class LogReplicationManager { } LOGGER.info(() -> "unregister " + dest); ackTracker.unregister(dest); - SocketChannel destSocket = destinations.remove(dest); + ISocketChannel destSocket = destinations.remove(dest); failedSockets.remove(destSocket); - destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]); + destSockets = destinations.values().toArray(new ISocketChannel[0]); endReplication(destSocket); } } @@ -143,7 +143,7 @@ public class LogReplicationManager { buffer.mark(); synchronized (transferLock) { if (destSockets != null) { - for (SocketChannel replicaSocket : destSockets) { + for (ISocketChannel replicaSocket : destSockets) { try { // send batch size then the batch itself NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer); @@ -192,15 +192,15 @@ public class LogReplicationManager { pendingFlushLogBuffersQ.add(currentTxnLogBuffer); } - private void handshake(ReplicationDestination dest, SocketChannel socketChannel) { + private void handshake(ReplicationDestination dest, ISocketChannel socketChannel) { final 
String nodeId = appCtx.getServiceContext().getNodeId(); final ReplicateLogsTask task = new ReplicateLogsTask(nodeId); ReplicationProtocol.sendTo(socketChannel, task, null); executor.execute(new TxnAckListener(dest, socketChannel)); } - private void endReplication(SocketChannel socketChannel) { - if (socketChannel.isConnected()) { + private void endReplication(ISocketChannel socketChannel) { + if (socketChannel.getSocketChannel().isConnected()) { // end log replication (by sending a dummy log with a single byte) final ByteBuffer endLogRepBuffer = ReplicationProtocol.getEndLogReplicationBuffer(); try { @@ -211,7 +211,7 @@ public class LogReplicationManager { } } - private synchronized void handleFailure(SocketChannel replicaSocket, IOException e) { + private synchronized void handleFailure(ISocketChannel replicaSocket, IOException e) { if (failedSockets.contains(replicaSocket)) { return; } @@ -224,9 +224,9 @@ public class LogReplicationManager { private class TxnAckListener implements Runnable { private final ReplicationDestination dest; - private final SocketChannel replicaSocket; + private final ISocketChannel replicaSocket; - TxnAckListener(ReplicationDestination dest, SocketChannel replicaSocket) { + TxnAckListener(ReplicationDestination dest, ISocketChannel replicaSocket) { this.dest = dest; this.replicaSocket = replicaSocket; } @@ -235,8 +235,8 @@ public class LogReplicationManager { public void run() { Thread.currentThread().setName("TxnAckListener (" + dest + ")"); LOGGER.info("Started listening on socket: {}", dest); - try (BufferedReader incomingResponse = - new BufferedReader(new InputStreamReader(replicaSocket.socket().getInputStream()))) { + try (BufferedReader incomingResponse = new BufferedReader( + new InputStreamReader(replicaSocket.getSocketChannel().socket().getInputStream()))) { while (true) { final String response = incomingResponse.readLine(); if (response == null) { 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java index c93920f..30ad72c 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java @@ -33,6 +33,7 @@ import java.nio.channels.SocketChannel; import java.util.Enumeration; import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.network.ISocketChannel; public class NetworkingUtil { @@ -40,7 +41,7 @@ public class NetworkingUtil { throw new AssertionError("This util class should not be initialized."); } - public static void readBytes(SocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException { + public static void readBytes(ISocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException { byteBuffer.clear(); byteBuffer.limit(length); @@ -53,7 +54,7 @@ public class NetworkingUtil { byteBuffer.flip(); } - public static void sendFile(FileChannel fileChannel, SocketChannel socketChannel) throws IOException { + public static void sendFile(FileChannel fileChannel, ISocketChannel socketChannel) throws IOException { long pos = 0; long fileSize = fileChannel.size(); long remainingBytes = fileSize; @@ -63,11 +64,10 @@ public class NetworkingUtil { pos += transferredBytes; remainingBytes -= transferredBytes; } - - socketChannel.socket().getOutputStream().flush(); + socketChannel.getSocketChannel().socket().getOutputStream().flush(); } - public static void downloadFile(FileChannel 
fileChannel, SocketChannel socketChannel) throws IOException { + public static void downloadFile(FileChannel fileChannel, ISocketChannel socketChannel) throws IOException { long pos = 0; long fileSize = fileChannel.size(); long count = fileSize; @@ -97,7 +97,7 @@ public class NetworkingUtil { return hostName; } - public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer) + public static void transferBufferToChannel(ISocketChannel socketChannel, ByteBuffer requestBuffer) throws IOException { while (requestBuffer.hasRemaining()) { socketChannel.write(requestBuffer); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index 1f6efa8..3dc094e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -35,6 +35,10 @@ import org.apache.asterix.replication.logging.RemoteLogsProcessor; import org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.asterix.replication.messaging.ReplicationProtocol.ReplicationRequestType; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.network.INetworkSecurityManager; +import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.api.network.ISocketChannelFactory; +import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; @@ -70,9 +74,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort); while (serverSocketChannel.isOpen()) { SocketChannel socketChannel = serverSocketChannel.accept(); - socketChannel.configureBlocking(true); - //start a new thread to handle the request - appCtx.getThreadExecutor().execute(new ReplicationWorker(socketChannel)); + connectionAccepted(socketChannel); } } catch (AsynchronousCloseException e) { LOGGER.debug("Replication channel closed", e); @@ -93,12 +95,27 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { } } + private void connectionAccepted(SocketChannel socketChannel) { + try { + NetworkUtil.configure(socketChannel); + socketChannel.configureBlocking(false); + final INetworkSecurityManager networkSecurityManager = + appCtx.getServiceContext().getControllerService().getNetworkSecurityManager(); + final ISocketChannelFactory socketChannelFactory = networkSecurityManager.getSocketChannelFactory(); + final ISocketChannel serverChannel = socketChannelFactory.createServerChannel(socketChannel); + //start a new thread to handle the request + appCtx.getThreadExecutor().execute(new ReplicationWorker(serverChannel)); + } catch (Exception e) { + LOGGER.error("failed to process accepted connection", e); + } + } + private class ReplicationWorker implements IReplicationWorker { - private final SocketChannel socketChannel; + private final ISocketChannel socketChannel; private final ByteBuffer inBuffer; private final ByteBuffer outBuffer; - public ReplicationWorker(SocketChannel socketChannel) { + public ReplicationWorker(ISocketChannel socketChannel) { this.socketChannel = socketChannel; inBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE); outBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE); @@ -108,6 +125,10 @@ public class 
ReplicationChannel extends Thread implements IReplicationChannel { public void run() { Thread.currentThread().setName("Replication Worker"); try { + if (socketChannel.requiresHandshake() && !socketChannel.handshake()) { + return; + } + socketChannel.getSocketChannel().configureBlocking(true); ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer); while (requestType != ReplicationRequestType.GOODBYE) { handle(requestType); @@ -116,18 +137,12 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { } catch (Exception e) { LOGGER.warn("Unexpected error during replication.", e); } finally { - if (socketChannel.isOpen()) { - try { - socketChannel.close(); - } catch (IOException e) { - LOGGER.warn("Failed to close replication socket.", e); - } - } + NetworkUtil.closeQuietly(socketChannel); } } @Override - public SocketChannel getChannel() { + public ISocketChannel getChannel() { return socketChannel; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java index b71f4b8..e38a33d 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java @@ -34,6 +34,7 @@ import org.apache.asterix.replication.logging.RemoteLogRecord; import org.apache.asterix.replication.logging.RemoteLogsProcessor; import org.apache.asterix.replication.management.ReplicationChannel; import org.apache.hyracks.api.exceptions.HyracksDataException; +import 
org.apache.hyracks.api.network.ISocketChannel; /** * A task to replicate transaction logs from master replica @@ -53,7 +54,7 @@ public class ReplicateLogsTask implements IReplicaTask { final RemoteLogsProcessor logsProcessor = replicationChannel.getRemoteLogsProcessor(); final ILogManager logManager = appCtx.getTransactionSubsystem().getLogManager(); final RemoteLogRecord reusableLog = new RemoteLogRecord(); - final SocketChannel channel = worker.getChannel(); + final ISocketChannel channel = worker.getChannel(); ByteBuffer logsBuffer = ByteBuffer.allocate(logManager.getLogPageSize()); try { while (true) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java index 41e7d9e..c702f1b 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java @@ -22,17 +22,22 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; import org.apache.asterix.replication.api.IReplicationMessage; import org.apache.asterix.replication.api.PartitionReplica; import 
org.apache.asterix.replication.management.NetworkingUtil; +import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream; +import org.apache.hyracks.util.NetworkUtil; import org.apache.hyracks.util.StorageUtil; public class ReplicationProtocol { @@ -65,7 +70,7 @@ public class ReplicationProtocol { Stream.of(ReplicationRequestType.values()).forEach(type -> TYPES.put(type.ordinal(), type)); } - public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException { + public static ByteBuffer readRequest(ISocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException { // read request size NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES); final int requestSize = dataBuffer.getInt(); @@ -75,7 +80,7 @@ public class ReplicationProtocol { return buf; } - public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer) + public static ReplicationRequestType getRequestType(ISocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException { // read replication request type NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE); @@ -93,12 +98,12 @@ public class ReplicationProtocol { return Integer.parseInt(msg.substring(msg.indexOf(LOG_REPLICATION_ACK) + 1)); } - public static void sendGoodbye(SocketChannel socketChannel) throws IOException { + public static void sendGoodbye(ISocketChannel socketChannel) throws IOException { ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer(); NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer); } - public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) { + public static void sendAck(ISocketChannel socketChannel, ByteBuffer buf) { try { buf.clear(); buf.putInt(ReplicationRequestType.ACK.ordinal()); @@ -110,7 +115,7 @@ public class 
ReplicationProtocol { } public static void waitForAck(PartitionReplica replica) throws IOException { - final SocketChannel channel = replica.getChannel(); + final ISocketChannel channel = replica.getChannel(); final ByteBuffer buf = replica.getReusableBuffer(); ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(channel, buf); if (responseFunction != ReplicationRequestType.ACK) { @@ -119,12 +124,12 @@ public class ReplicationProtocol { } public static void sendTo(PartitionReplica replica, IReplicationMessage task) { - final SocketChannel channel = replica.getChannel(); + final ISocketChannel channel = replica.getChannel(); final ByteBuffer buf = replica.getReusableBuffer(); sendTo(channel, task, buf); } - public static void sendTo(SocketChannel channel, IReplicationMessage task, ByteBuffer buf) { + public static void sendTo(ISocketChannel channel, IReplicationMessage task, ByteBuffer buf) { ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); try (DataOutputStream oos = new DataOutputStream(outputStream)) { task.serialize(oos); @@ -135,18 +140,18 @@ public class ReplicationProtocol { requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); requestBuffer.flip(); NetworkingUtil.transferBufferToChannel(channel, requestBuffer); - channel.socket().getOutputStream().flush(); + channel.getSocketChannel().socket().getOutputStream().flush(); } catch (IOException e) { throw new ReplicationException(e); } } - public static IReplicationMessage read(SocketChannel socketChannel, ByteBuffer buffer) throws IOException { + public static IReplicationMessage read(ISocketChannel socketChannel, ByteBuffer buffer) throws IOException { final ReplicationRequestType type = getRequestType(socketChannel, buffer); return readMessage(type, socketChannel, buffer); } - public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel, + public static IReplicationMessage 
readMessage(ReplicationRequestType type, ISocketChannel socketChannel, ByteBuffer buffer) { try { final ByteBuffer requestBuf = ReplicationProtocol.readRequest(socketChannel, buffer); @@ -191,6 +196,24 @@ public class ReplicationProtocol { return endLogRepBuffer; } + public static ISocketChannel establishReplicaConnection(INcApplicationContext appCtx, InetSocketAddress location) + throws IOException { + final SocketChannel socketChannel = SocketChannel.open(); + NetworkUtil.configure(socketChannel); + socketChannel.connect(location); + // perform handshake in a non-blocking mode + socketChannel.configureBlocking(false); + final ISocketChannelFactory socketChannelFactory = + appCtx.getServiceContext().getControllerService().getNetworkSecurityManager().getSocketChannelFactory(); + final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(socketChannel); + if (clientChannel.requiresHandshake() && !clientChannel.handshake()) { + throw new IllegalStateException("handshake failure"); + } + // switch to blocking mode after handshake success + socketChannel.configureBlocking(true); + return clientChannel; + } + private static ByteBuffer ensureSize(ByteBuffer buffer, int size) { if (buffer == null || buffer.capacity() < size) { return ByteBuffer.allocate(size); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java index cc0f7b4..d795d4e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java @@ 
-32,6 +32,7 @@ import org.apache.asterix.replication.messaging.ReplicateFileTask; import org.apache.asterix.replication.api.PartitionReplica; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.network.ISocketChannel; public class FileSynchronizer { @@ -50,7 +51,7 @@ public class FileSynchronizer { public void replicate(String file, boolean metadata) { try { final IIOManager ioManager = appCtx.getIoManager(); - final SocketChannel channel = replica.getChannel(); + final ISocketChannel channel = replica.getChannel(); final FileReference filePath = ioManager.resolve(file); ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata); ReplicationProtocol.sendTo(replica, task); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java index 583f33d..3cd1a07 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -34,6 +34,7 @@ import org.apache.asterix.replication.messaging.PartitionResourcesListResponse; import org.apache.asterix.replication.messaging.PartitionResourcesListTask; import org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.network.ISocketChannel; /** * Ensures that the files between master and a replica are synchronized @@ 
-69,7 +70,7 @@ public class ReplicaFilesSynchronizer { private Set<String> getReplicaFiles(int partition) throws IOException { final PartitionResourcesListTask replicaFilesRequest = new PartitionResourcesListTask(partition); - final SocketChannel channel = replica.getChannel(); + final ISocketChannel channel = replica.getChannel(); final ByteBuffer reusableBuffer = replica.getReusableBuffer(); ReplicationProtocol.sendTo(replica, replicaFilesRequest); final PartitionResourcesListResponse response = http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java index 7e42d14..a404ba8 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java @@ -90,6 +90,7 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat datasetLifeCycleManager.flushDataset(datasetId.getId(), false); } } + datasetInfo.waitForIO(); } catch (ACIDException e) { throw HyracksDataException.create(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java 
index 70ef1d2..5cfa442 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java @@ -21,9 +21,11 @@ package org.apache.hyracks.api.network; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.SocketChannel; +import java.nio.channels.WritableByteChannel; -public interface ISocketChannel extends Closeable { +public interface ISocketChannel extends WritableByteChannel, ReadableByteChannel, Closeable { /** * Indicates whether this {@link ISocketChannel} requires a client/server handshake before @@ -91,4 +93,9 @@ public interface ISocketChannel extends Closeable { * @return the socket channel */ SocketChannel getSocketChannel(); + + @Override + default boolean isOpen() { + return getSocketChannel().isOpen(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java index 73475b0..abb8b15 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java @@ -184,9 +184,11 @@ public class SslSocketChannel implements ISocketChannel { @Override public synchronized void close() throws IOException { - engine.closeOutbound(); - new SslHandshake(this).handshake(); - socketChannel.close(); + if (socketChannel.isOpen()) { + 
engine.closeOutbound(); + new SslHandshake(this).handshake(); + socketChannel.close(); + } } @Override
