Repository: asterixdb
Updated Branches:
  refs/heads/master 11db8e30e -> c692e6a37
Fix indefinite wait time for replication Job ACK

Change-Id: I88d2d61270522c766441e16fd996ac975935594b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1372
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c692e6a3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c692e6a3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c692e6a3

Branch: refs/heads/master
Commit: c692e6a372160e72b9d8e868e02e8af5c2b2e4c7
Parents: 11db8e3
Author: Murtadha Hubail <[email protected]>
Authored: Sat Dec 3 14:57:07 2016 +0300
Committer: abdullah alamoudi <[email protected]>
Committed: Sat Dec 3 13:59:30 2016 -0800

----------------------------------------------------------------------
 .../replication/management/ReplicationManager.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c692e6a3/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 87be768..cd0179d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -97,6 +97,7 @@ public class ReplicationManager implements IReplicationManager {
     private static final Logger LOGGER = 
             Logger.getLogger(ReplicationManager.class.getName());
     private static final int INITIAL_REPLICATION_FACTOR = 1;
+    private static final int MAX_JOB_COMMIT_ACK_WAIT = 10000;
     private final String nodeId;
     private ExecutorService replicationListenerThreads;
     private final Map<Integer, Set<String>> jobCommitAcks;
@@ -575,8 +576,16 @@ public class ReplicationManager implements IReplicationManager {
         if (logsRepSockets != null) {
             synchronized (jobCommitAcks) {
                 try {
-                    while (jobCommitAcks.size() != 0) {
-                        jobCommitAcks.wait();
+                    long waitStartTime = System.currentTimeMillis();
+                    while (!jobCommitAcks.isEmpty()) {
+                        jobCommitAcks.wait(1000);
+                        long waitDuration = System.currentTimeMillis() - waitStartTime;
+                        if (waitDuration > MAX_JOB_COMMIT_ACK_WAIT) {
+                            LOGGER.log(Level.SEVERE,
+                                    "Timeout before receving all job ACKs from replicas. Pending jobs ("
+                                            + jobCommitAcks.keySet().toString() + ")");
+                            break;
+                        }
                     }
                 } catch (InterruptedException e) {
                     if (LOGGER.isLoggable(Level.SEVERE)) {
