This is an automated email from the ASF dual-hosted git repository. dlych pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 16acb99937eb2b6a0d639615109aac79e0ef8301 Author: Murtadha Hubail <[email protected]> AuthorDate: Fri Mar 25 00:54:35 2022 +0300 [NO ISSUE][OTH] Txn Logs Replication Trace Logs - user model changes: no - storage format changes: no - interface changes: yes Details: - Add debug logs for txn logs replication. Change-Id: Id4a98e30763f9a86952e1dc1c226af89dddc2b0a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15865 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- .../java/org/apache/asterix/app/nc/IndexCheckpointManager.java | 1 + .../org/apache/asterix/replication/api/IReplicationWorker.java | 5 +++++ .../apache/asterix/replication/logging/RemoteLogsProcessor.java | 5 ++++- .../apache/asterix/replication/logging/ReplicationLogBuffer.java | 5 +++++ .../asterix/replication/management/LogReplicationManager.java | 3 +++ .../apache/asterix/replication/management/NetworkingUtil.java | 1 + .../asterix/replication/management/ReplicationChannel.java | 9 +++++++++ .../apache/asterix/replication/messaging/ReplicateLogsTask.java | 4 ++++ 8 files changed, 32 insertions(+), 1 deletion(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java index 96e89a7b3c..42159bedae 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java @@ -102,6 +102,7 @@ public class IndexCheckpointManager implements IIndexCheckpointManager { public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException { final IndexCheckpoint latest = getLatest(); latest.getMasterNodeFlushMap().put(masterLsn, localLsn); + LOGGER.debug("index {} master flush {} -> {}", indexPath, masterLsn, localLsn); final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentSequence(), latest.getLastComponentId(), null); persist(next); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java index c7b2561ebf..d6cccc00bb 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java @@ -35,4 +35,9 @@ public interface IReplicationWorker extends Runnable { * @return the reusable buffer */ ByteBuffer getReusableBuffer(); + + /** + * @return The remote address of the sender + */ + String getRemoteAddress(); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java index b094d9eaf6..63e194ed0d 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java @@ -47,7 +47,8 @@ public class RemoteLogsProcessor implements ILogRequester { public void process(ByteBuffer logsBatch, RemoteLogRecord reusableLog, IReplicationWorker worker) { while (logsBatch.hasRemaining()) { // get rid of log size - logsBatch.getInt(); + int batchSize = logsBatch.getInt(); + LOGGER.debug("received logs batch size {} from {}", batchSize, worker.getRemoteAddress()); reusableLog.readRemoteLog(logsBatch); reusableLog.setLogSource(LogSource.REMOTE); switch (reusableLog.getLogType()) { @@ -74,6 +75,8 @@ public class RemoteLogsProcessor implements ILogRequester { flushLog.setRequester(this); flushLog.setLogSource(LogSource.REMOTE); flushLog.setMasterLsn(reusableLog.getLSN()); + LOGGER.debug("received master LSN {} for partition {}", reusableLog.getLSN(), + reusableLog.getResourcePartition()); logManager.log(flushLog); break; default: diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java index 2ed2ac95f1..3c13825309 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java @@ -23,9 +23,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.replication.management.LogReplicationManager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class ReplicationLogBuffer { + private static final Logger LOGGER = LogManager.getLogger(); private final int logBufferSize; private final AtomicBoolean full; private int appendOffset; @@ -117,6 +120,7 @@ public class ReplicationLogBuffer { private void transferBuffer(ByteBuffer buffer) { if (buffer.remaining() <= batchSize) { //the current batch can be sent as it is + LOGGER.debug("sending txn logs batch size {}", buffer.remaining()); replicationManager.transferBatch(buffer); return; } @@ -142,6 +146,7 @@ public class ReplicationLogBuffer { //return to the beginning of the batch position buffer.reset(); } + LOGGER.debug("sending logs slice size {}", buffer.remaining()); replicationManager.transferBatch(buffer); //return the original limit to check the new remaining size buffer.limit(totalTransferLimit); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java index f1d8d4d5c0..b76fa25a69 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java @@ -131,6 +131,9 @@ public class LogReplicationManager { ackTracker.track(logRecord, new HashSet<>(destinations.keySet())); } } + if (logRecord.getLogType() == LogType.FLUSH) { + LOGGER.debug("appending flush lsn {} to replication queue", logRecord.getLSN()); + } appendToLogBuffer(logRecord); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java index 7f6439ce14..736b54e33b 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java @@ -114,6 +114,7 @@ public class NetworkingUtil { while (requestBuffer.hasRemaining()) { socketChannel.write(requestBuffer); } + socketChannel.getSocketChannel().socket().getOutputStream().flush(); } //unused diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index 3dc094e65f..19de02af3b 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -151,6 +151,15 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { return outBuffer; } + @Override + public String getRemoteAddress() { + try { + return socketChannel.getSocketChannel().getRemoteAddress().toString(); + } catch (Exception e) { + return "unknown"; + } + } + private void handle(ReplicationRequestType requestType) throws HyracksDataException { final IReplicaTask task = (IReplicaTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java index d9357df540..67c8ebad9f 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java @@ -34,12 +34,15 @@ import org.apache.asterix.replication.logging.RemoteLogsProcessor; import org.apache.asterix.replication.management.ReplicationChannel; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * A task to replicate transaction logs from master replica */ public class ReplicateLogsTask implements IReplicaTask { + private static final Logger LOGGER = LogManager.getLogger(); public static final int END_REPLICATION_LOG_SIZE = 1; private final String nodeId; @@ -61,6 +64,7 @@ public class ReplicateLogsTask implements IReplicaTask { logsBuffer = ReplicationProtocol.readRequest(channel, logsBuffer); // check if it is end of handshake if (logsBuffer.remaining() == END_REPLICATION_LOG_SIZE) { + LOGGER.info("ending log replication with {}", worker.getRemoteAddress()); break; } logsProcessor.process(logsBuffer, reusableLog, worker);
