This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 2f56ed9d6d394fe990e83f96b9d8683fabfc1064
Author: Murtadha Hubail <[email protected]>
AuthorDate: Fri Jul 7 16:26:40 2023 -0700

    [ASTERIXDB-3219][REPL] Add timeout to log replication
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Do not wait indefinitely for logs to be replicated.
    
    Change-Id: I53b3a0d23514fce09082556e031f822dfe426a35
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17632
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../service/logging/LogManagerWithReplication.java    | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 8a1cc656fa..1c614c902f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -26,6 +28,7 @@ import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.logging.log4j.Logger;
 
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongSet;
@@ -33,12 +36,16 @@ import it.unimi.dsi.fastutil.longs.LongSets;
 
 public class LogManagerWithReplication extends LogManager {
 
+    private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger();
     private IReplicationManager replicationManager;
     private IReplicationStrategy replicationStrategy;
     private final LongSet replicatedTxn = LongSets.synchronize(new LongOpenHashSet());
+    private final long replicationTimeoutMillis;
 
     public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) {
         super(txnSubsystem);
+        replicationTimeoutMillis = TimeUnit.SECONDS
+                .toMillis(txnSubsystem.getApplicationContext().getReplicationProperties().getReplicationTimeOut());
     }
 
     @SuppressWarnings("squid:S2445")
@@ -94,8 +101,18 @@ public class LogManagerWithReplication extends LogManager {
                     //wait for job Commit/Abort ACK from replicas
                     if (logRecord.isReplicate() && (logRecord.getLogType() == LogType.JOB_COMMIT
                             || logRecord.getLogType() == LogType.ABORT)) {
+                        long replicationTimeOut = replicationTimeoutMillis;
                         while (!logRecord.isReplicated()) {
-                            logRecord.wait();
+                            if (replicationTimeOut <= 0) {
+                                LOGGER.warn(
+                                        "{} ms passed without receiving acks for log {}; setting log as replicated due to timeout",
+                                        replicationTimeoutMillis, logRecord.getLogRecordForDisplay());
+                                logRecord.setReplicated(true);
+                                continue;
+                            }
+                            final long startTime = System.nanoTime();
+                            logRecord.wait(replicationTimeOut);
+                            replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
                         }
                     }
                 }

Reply via email to