This is an automated email from the ASF dual-hosted git repository. dlych pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 2636846ce9bc738b874f5b69fb3a2cf767e2932d Author: Murtadha Hubail <[email protected]> AuthorDate: Fri Apr 1 18:07:02 2022 +0300 [NO ISSUE][NET] SSL Socket Fixes - user model changes: no - storage format changes: no - interface changes: no Details: - On SSL socket handshake failure, deliver any remaining data to requester. - Add replica synchronize debug logs. Change-Id: Ie1f6a4df1ab0cc7c6feb352607a45194f96b3c8b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15963 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../apache/asterix/replication/api/PartitionReplica.java | 1 + .../asterix/replication/management/NetworkingUtil.java | 2 +- .../asterix/replication/management/ReplicationChannel.java | 10 +++++++++- .../replication/messaging/PartitionResourcesListTask.java | 14 ++++++++++++++ .../asterix/replication/sync/ReplicaFilesSynchronizer.java | 5 +++++ .../asterix/replication/sync/ReplicaSynchronizer.java | 12 ++++++++++++ .../org/apache/hyracks/ipc/sockets/SslSocketChannel.java | 12 ++++++++---- .../src/main/java/org/apache/hyracks/util/NetworkUtil.java | 3 ++- 8 files changed, 52 insertions(+), 7 deletions(-) diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java index e1f99f4638..27da909ffc 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java @@ -93,6 +93,7 @@ public class PartitionReplica implements IPartitionReplica { ExecutorService threadExecutor = (ExecutorService) appCtx.getThreadExecutor(); syncFuture = threadExecutor.submit(() -> { try { + Thread.currentThread().setName("Replica " + id.toString() + " Synchronizer"); new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery); setStatus(IN_SYNC); } catch (Exception e) { diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java index 736b54e33b..8162e0af3f 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java @@ -52,7 +52,7 @@ public class NetworkingUtil { while (byteBuffer.remaining() > 0 && socketChannel.read(byteBuffer) > 0); if (byteBuffer.remaining() > 0) { - throw new EOFException(); + throw new EOFException("could not read all data from source; remaining bytes: " + byteBuffer.remaining()); } byteBuffer.flip(); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index 19de02af3b..ba770cd80e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.ReplicationProperties; @@ -52,6 +53,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { private ServerSocketChannel serverSocketChannel = null; private final INcApplicationContext appCtx; private final RemoteLogsProcessor logsProcessor; + private final AtomicInteger replicationWorkerCounter = new AtomicInteger(0); public ReplicationChannel(INcApplicationContext appCtx) { this.appCtx = appCtx; @@ -123,16 +125,22 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { @Override public void run() { - Thread.currentThread().setName("Replication Worker"); + Thread.currentThread().setName("Replication Worker-" + replicationWorkerCounter.incrementAndGet() + "(" + + getRemoteAddress() + ")"); try { if (socketChannel.requiresHandshake() && !socketChannel.handshake()) { + LOGGER.warn("failed to complete handshake"); return; } socketChannel.getSocketChannel().configureBlocking(true); + LOGGER.debug("reading replication worker initial request"); ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer); + LOGGER.debug("got request type: {}", requestType); while (requestType != ReplicationRequestType.GOODBYE) { handle(requestType); + LOGGER.debug("handled request type: {}", requestType); requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer); + LOGGER.debug("got request type: {}", requestType); } } catch (Exception e) { LOGGER.warn("Unexpected error during replication.", e); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java index d9b3b0cff0..82ec60156d 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java @@ -33,12 +33,15 @@ import org.apache.asterix.replication.api.IReplicaTask; import org.apache.asterix.replication.api.IReplicationWorker; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * A task to get the list of the files in a partition on a replica */ public class PartitionResourcesListTask implements IReplicaTask { + private static final Logger LOGGER = LogManager.getLogger(); private final int partition; public PartitionResourcesListTask(int partition) { @@ -47,20 +50,26 @@ public class PartitionResourcesListTask implements IReplicaTask { @Override public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException { + LOGGER.debug("processing {}", this); final PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); localResourceRepository.cleanup(partition); + LOGGER.debug("cleaned up partition {}", partition); final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy(); // .metadata file -> resource id Map<String, Long> partitionReplicatedResources = localResourceRepository.getPartitionReplicatedResources(partition, replicationStrategy); + LOGGER.debug("got partition {} resources", partition); // all data files in partitions + .metadata files final List<String> partitionFiles = localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream() .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); + LOGGER.debug("got partition {} files ({})", partition, partitionFiles.size()); final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition, partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOrigin(partition)); + LOGGER.debug("partition {} files list to requester", partition); ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer()); + LOGGER.debug("sent partition {} files list to requester", partition); } @Override @@ -78,6 +87,11 @@ public class PartitionResourcesListTask implements IReplicaTask { } } + @Override + public String toString() { + return "PartitionResourcesListTask{" + "partition=" + partition + '}'; + } + public static PartitionResourcesListTask create(DataInput input) throws HyracksDataException { try { int partition = input.readInt(); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java index 44c940418c..809b7a6eb5 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -70,7 +70,9 @@ public class ReplicaFilesSynchronizer { if (!deltaRecovery) { deletePartitionFromReplica(partition); } + LOGGER.debug("getting replica files"); PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition); + LOGGER.debug("got replica files"); Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources( replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOrigin()); // clean up files for invalid resources (deleted or recreated while the replica was down) @@ -79,9 +81,11 @@ public class ReplicaFilesSynchronizer { final PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy(); + LOGGER.debug("clean up replica invalid files"); final Set<String> masterFiles = localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream() .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet()); + LOGGER.debug("got master partition files"); // exclude from the replica files the list of invalid deleted files final Set<String> replicaFiles = new HashSet<>(replicaResourceResponse.getFiles()); replicaFiles.removeAll(deletedReplicaFiles); @@ -127,6 +131,7 @@ public class ReplicaFilesSynchronizer { } private void deleteInvalidFiles(List<String> files) { + LOGGER.debug("deleting replica invalid files"); final FileSynchronizer sync = new FileSynchronizer(appCtx, replica); // sort files to ensure index metadata files starting with "." are deleted last files.sort(String::compareTo); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 0d0ef19d58..7130e0793e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -28,6 +28,8 @@ import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; import org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * Performs the steps required to ensure any newly added replica @@ -35,6 +37,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; */ public class ReplicaSynchronizer { + private static final Logger LOGGER = LogManager.getLogger(); private final INcApplicationContext appCtx; private final PartitionReplica replica; @@ -44,16 +47,23 @@ public class ReplicaSynchronizer { } public void sync(boolean register, boolean deltaRecovery) throws IOException { + LOGGER.debug("starting replica sync process for replica {}", replica); Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition()); synchronized (partitionLock) { + LOGGER.debug("acquired partition replica lock"); final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager(); try { // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas checkpointManager.suspend(); + LOGGER.debug("starting replica files sync"); syncFiles(deltaRecovery); + LOGGER.debug("completed replica files sync"); checkpointReplicaIndexes(); + LOGGER.debug("replica indexes checkpoint completed"); if (register) { + LOGGER.debug("registering replica"); appCtx.getReplicationManager().register(replica); + LOGGER.debug("replica registered"); } } finally { checkpointManager.resume(); @@ -68,6 +78,7 @@ public class ReplicaSynchronizer { appCtx.getDatasetLifecycleManager().flushDataset(replStrategy, p -> p == replica.getIdentifier().getPartition()); waitForReplicatedDatasetsIO(); + LOGGER.debug("flushed partition datasets"); fileSync.sync(); } @@ -77,6 +88,7 @@ public class ReplicaSynchronizer { appCtx.getReplicaManager().isPartitionOrigin(partition) ? appCtx.getServiceContext().getNodeId() : null; CheckpointPartitionIndexesTask task = new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition), masterNode); + LOGGER.debug("asking replica to checkpoint indexes"); ReplicationProtocol.sendTo(replica, task); ReplicationProtocol.waitForAck(replica); } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java index ce8496faca..fc379fbbdf 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java @@ -82,9 +82,9 @@ public class SslSocketChannel implements ISocketChannel { @Override public synchronized int read(ByteBuffer buffer) throws IOException { - int transfereeBytes = 0; + int transferredBytes = 0; if (cachedData) { - transfereeBytes += transferTo(inAppData, buffer); + transferredBytes += transferTo(inAppData, buffer); } if (buffer.hasRemaining()) { if (!partialRecord) { @@ -97,17 +97,18 @@ public class SslSocketChannel implements ISocketChannel { inAppData.clear(); if (decrypt() > 0) { inAppData.flip(); - transfereeBytes += transferTo(inAppData, buffer); + transferredBytes += transferTo(inAppData, buffer); } else { inAppData.limit(0); } } else if (bytesRead < 0) { + LOGGER.debug("received EOF; transferredBytes Bytes: {}", transferredBytes); handleEndOfStreamQuietly(); return -1; } } cachedData = inAppData.hasRemaining(); - return transfereeBytes; + return transferredBytes; } private int decrypt() throws IOException { @@ -192,6 +193,9 @@ public class SslSocketChannel implements ISocketChannel { engine.closeOutbound(); try { new SslHandshake(this).handshake(); + } catch (Exception e) { + // ignore exceptions on best effort graceful close handshake + LOGGER.debug("ssl socket close handshake failed", e); } finally { socketChannel.close(); } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java index 4f0c3a86aa..b2cd4359a4 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java @@ -57,7 +57,8 @@ public class NetworkUtil { try { closeable.close(); } catch (IOException e) { - LOGGER.warn("Failed to close", e); + // ignore since we are closing quietly + LOGGER.trace("failed to close", e); } } }
