This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 2f56ed9d6d394fe990e83f96b9d8683fabfc1064 Author: Murtadha Hubail <[email protected]> AuthorDate: Fri Jul 7 16:26:40 2023 -0700 [ASTERIXDB-3219][REPL] Add timeout to log replication - user model changes: no - storage format changes: no - interface changes: no Details: - Do not wait indefinitely for logs to be replicated. Change-Id: I53b3a0d23514fce09082556e031f822dfe426a35 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17632 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../service/logging/LogManagerWithReplication.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index 8a1cc656fa..1c614c902f 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.transaction.management.service.logging; +import java.util.concurrent.TimeUnit; + import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; @@ -26,6 +28,7 @@ import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; import org.apache.hyracks.api.util.InvokeUtil; +import org.apache.logging.log4j.Logger; import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import it.unimi.dsi.fastutil.longs.LongSet; @@ -33,12 +36,16 @@ import it.unimi.dsi.fastutil.longs.LongSets; public class LogManagerWithReplication extends LogManager { + private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(); private IReplicationManager replicationManager; private IReplicationStrategy replicationStrategy; private final LongSet replicatedTxn = LongSets.synchronize(new LongOpenHashSet()); + private final long replicationTimeoutMillis; public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) { super(txnSubsystem); + replicationTimeoutMillis = TimeUnit.SECONDS + .toMillis(txnSubsystem.getApplicationContext().getReplicationProperties().getReplicationTimeOut()); } @SuppressWarnings("squid:S2445") @@ -94,8 +101,18 @@ public class LogManagerWithReplication extends LogManager { //wait for job Commit/Abort ACK from replicas if (logRecord.isReplicate() && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { + long replicationTimeOut = replicationTimeoutMillis; while (!logRecord.isReplicated()) { - logRecord.wait(); + if (replicationTimeOut <= 0) { + LOGGER.warn( + "{} ms passed without receiving acks for log {}; setting log as replicated due to timeout", + replicationTimeoutMillis, logRecord.getLogRecordForDisplay()); + logRecord.setReplicated(true); + continue; + } + final long startTime = System.nanoTime(); + logRecord.wait(replicationTimeOut); + replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); } } }
