This is an automated email from the ASF dual-hosted git repository. dlych pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 4dfd3b0a8f497e4d5aa95dacaef7a24065e10451 Author: Murtadha Hubail <[email protected]> AuthorDate: Sat Apr 2 15:55:26 2022 +0300 [NO ISSUE][OTH] Logging Fixes - user model changes: no - storage format changes: no - interface changes: no Details: - Log exception when closing sockets quietly at trace level. - Fix replication logging levels. - Fix RemoteLogsNotifier thread name. Change-Id: I210900a410a18144c22fd5af928151b7e7c4bfbd Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15983 Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- .../asterix/replication/logging/RemoteLogsNotifier.java | 2 +- .../replication/management/IndexReplicationManager.java | 3 +-- .../asterix/replication/management/ReplicationChannel.java | 12 ++++++------ .../apache/asterix/replication/messaging/DeleteFileTask.java | 7 ++++++- .../replication/messaging/PartitionResourcesListTask.java | 1 - .../apache/asterix/replication/sync/FileSynchronizer.java | 5 ++++- .../asterix/replication/sync/ReplicaFilesSynchronizer.java | 12 +++++------- .../apache/asterix/replication/sync/ReplicaSynchronizer.java | 2 +- .../org/apache/hyracks/ipc/sockets/SslSocketChannel.java | 7 ++++--- 9 files changed, 28 insertions(+), 23 deletions(-) diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java index 58025fc8f3..80bb3c8f75 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java @@ -57,7 +57,7 @@ class RemoteLogsNotifier implements Runnable { @Override public void run() { final String nodeId = appCtx.getServiceContext().getNodeId(); - Thread.currentThread().setName(nodeId + RemoteLogsNotifier.class.getSimpleName()); + Thread.currentThread().setName(RemoteLogsNotifier.class.getSimpleName() + ":" + nodeId); while (!Thread.currentThread().isInterrupted()) { try { final RemoteLogRecord logRecord = remoteLogsQ.take(); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java index 48eb8e3778..063709aa10 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java @@ -38,7 +38,6 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourc import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.replication.IReplicationJob; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -153,7 +152,7 @@ public class IndexReplicationManager { if (!replicationJobsQ.isEmpty()) { return; } - LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas"); + LOGGER.trace("no pending replication jobs; closing connections to replicas"); for (ReplicationDestination dest : destinations) { dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index ba770cd80e..6c6a10ad25 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -129,21 +129,21 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { + getRemoteAddress() + ")"); try { if (socketChannel.requiresHandshake() && !socketChannel.handshake()) { - LOGGER.warn("failed to complete handshake"); + LOGGER.warn("failed to complete handshake with {}", this::getRemoteAddress); return; } socketChannel.getSocketChannel().configureBlocking(true); - LOGGER.debug("reading replication worker initial request"); + LOGGER.trace("reading replication worker initial request"); ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer); - LOGGER.debug("got request type: {}", requestType); + LOGGER.trace("got request type: {}", requestType); while (requestType != ReplicationRequestType.GOODBYE) { handle(requestType); - LOGGER.debug("handled request type: {}", requestType); + LOGGER.trace("handled request type: {}", requestType); requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer); - LOGGER.debug("got request type: {}", requestType); + LOGGER.trace("got request type: {}", requestType); } } catch (Exception e) { - LOGGER.warn("Unexpected error during replication.", e); + LOGGER.warn("unexpected error during replication.", e); } finally { NetworkUtil.closeQuietly(socketChannel); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java index 1e9322883f..92e4989b40 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java @@ -60,7 +60,7 @@ public class DeleteFileTask implements IReplicaTask { ((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository()) .invalidateResource(replicaRes.getRelativePath().toString()); } - LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath()); + LOGGER.debug(() -> "Deleted file: " + localFile.getAbsolutePath()); } else { LOGGER.warn(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath()); } @@ -85,6 +85,11 @@ public class DeleteFileTask implements IReplicaTask { } } + @Override + public String toString() { + return "DeleteFileTask{" + "file='" + file + '\'' + '}'; + } + public static DeleteFileTask create(DataInput input) throws IOException { return new DeleteFileTask(input.readUTF()); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java index 82ec60156d..0f5949e61b 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java @@ -67,7 +67,6 @@ public class PartitionResourcesListTask implements IReplicaTask { LOGGER.debug("got partition {} files ({})", partition, partitionFiles.size()); final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition, partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOrigin(partition)); - LOGGER.debug("partition {} files list to requester", partition); ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer()); LOGGER.debug("sent partition {} files list to requester", partition); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java index 73fca9c836..0e27a51cb5 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java @@ -58,7 +58,7 @@ public class FileSynchronizer { String masterNode = appCtx.getReplicaManager().isPartitionOrigin(replica.getIdentifier().getPartition()) ? appCtx.getServiceContext().getNodeId() : null; ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata, masterNode); - LOGGER.debug("attempting to replicate {} to replica {}", task, replica); + LOGGER.trace("attempting {} to replica {}", task, replica); ReplicationProtocol.sendTo(replica, task); // send the file itself try (RandomAccessFile fromFile = new RandomAccessFile(filePath.getFile(), "r"); @@ -66,6 +66,7 @@ public class FileSynchronizer { NetworkingUtil.sendFile(fileChannel, channel); } ReplicationProtocol.waitForAck(replica); + LOGGER.debug("completed {} to replica {}", task, replica); } catch (IOException e) { throw new ReplicationException(e); } @@ -74,8 +75,10 @@ public class FileSynchronizer { public void delete(String file) { try { final DeleteFileTask task = new DeleteFileTask(file); + LOGGER.trace("attempting {} from replica {}", task, replica); ReplicationProtocol.sendTo(replica, task); ReplicationProtocol.waitForAck(replica); + LOGGER.debug("completed {} from replica {}", task, replica); } catch (IOException e) { throw new ReplicationException(e); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java index 809b7a6eb5..5d217a4cb2 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -70,9 +70,9 @@ public class ReplicaFilesSynchronizer { if (!deltaRecovery) { deletePartitionFromReplica(partition); } - LOGGER.debug("getting replica files"); + LOGGER.trace("getting replica files"); PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition); - LOGGER.debug("got replica files"); + LOGGER.trace("got replica files"); Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources( replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOrigin()); // clean up files for invalid resources (deleted or recreated while the replica was down) @@ -81,11 +81,10 @@ public class ReplicaFilesSynchronizer { final PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy(); - LOGGER.debug("clean up replica invalid files"); final Set<String> masterFiles = localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream() .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet()); - LOGGER.debug("got master partition files"); + LOGGER.trace("got master partition files"); // exclude from the replica files the list of invalid deleted files final Set<String> replicaFiles = new HashSet<>(replicaResourceResponse.getFiles()); replicaFiles.removeAll(deletedReplicaFiles); @@ -131,13 +130,12 @@ public class ReplicaFilesSynchronizer { } private void deleteInvalidFiles(List<String> files) { - LOGGER.debug("deleting replica invalid files"); final FileSynchronizer sync = new FileSynchronizer(appCtx, replica); // sort files to ensure index metadata files starting with "." are deleted last files.sort(String::compareTo); Collections.reverse(files); - LOGGER.info("deleting {}", files); files.forEach(sync::delete); + LOGGER.debug("completed invalid files deletion"); } private long getResourceMasterValidSeq(ResourceReference rr) throws HyracksDataException { @@ -174,7 +172,7 @@ public class ReplicaFilesSynchronizer { } } if (!invalidFiles.isEmpty()) { - LOGGER.info("will delete the following files from replica {}", invalidFiles); + LOGGER.debug("will delete the following files from replica {}", invalidFiles); deleteInvalidFiles(new ArrayList<>(invalidFiles)); } return invalidFiles; diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 7130e0793e..459ff01245 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -50,7 +50,7 @@ public class ReplicaSynchronizer { LOGGER.debug("starting replica sync process for replica {}", replica); Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition()); synchronized (partitionLock) { - LOGGER.debug("acquired partition replica lock"); + LOGGER.trace("acquired partition replica lock"); final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager(); try { // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java index fc379fbbdf..9e052f8524 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java @@ -102,7 +102,7 @@ public class SslSocketChannel implements ISocketChannel { inAppData.limit(0); } } else if (bytesRead < 0) { - LOGGER.debug("received EOF; transferredBytes Bytes: {}", transferredBytes); + LOGGER.trace("received EOF; transferred bytes: {}", transferredBytes); handleEndOfStreamQuietly(); return -1; } @@ -195,7 +195,7 @@ public class SslSocketChannel implements ISocketChannel { new SslHandshake(this).handshake(); } catch (Exception e) { // ignore exceptions on best effort graceful close handshake - LOGGER.debug("ssl socket close handshake failed", e); + LOGGER.trace("ssl socket close handshake failed", e); } finally { socketChannel.close(); } @@ -243,7 +243,8 @@ public class SslSocketChannel implements ISocketChannel { close(); } } catch (Exception e) { - LOGGER.warn("failed to close socket gracefully", e); + // ignore close exception since we are closing quietly + LOGGER.trace("failed to close socket gracefully", e); } }
