This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d623dd5  AMBARI-24076. Guarantee execution command retrying task completion. (mpapirkovskyy) (#1516)
d623dd5 is described below

commit d623dd530d9ba3ebc05211ac982093c0c7f5bcdf
Author: Myroslav Papirkovskyi <[email protected]>
AuthorDate: Wed Jun 13 13:53:29 2018 +0300

    AMBARI-24076. Guarantee execution command retrying task completion. (mpapirkovskyy) (#1516)
---
 .../ambari/server/events/MessageEmitter.java       | 39 ++++++++++++++--------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
index 1483b3f..47b55ed 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.ambari.server.AmbariException;
@@ -61,6 +62,7 @@ public abstract class MessageEmitter {
       new ThreadFactoryBuilder().setNameFormat("ambari-message-monitor-%d").build());
 
   protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
+  protected static final Long ADDITIONAL_TIMEOUT_TIME = 5L;
   protected ConcurrentHashMap<Long, ScheduledFuture> unconfirmedMessages = new ConcurrentHashMap<>();
   protected ConcurrentHashMap<Long, BlockingQueue<ExecutionCommandEvent>> messagesToEmit = new ConcurrentHashMap<>();
 
@@ -119,7 +121,12 @@ public abstract class MessageEmitter {
           emitMessageTask.setScheduledFuture(scheduledFuture);
           unconfirmedMessages.put(event.getMessageId(), scheduledFuture);
 
-          scheduledFuture.get();
+          long timeout = retryCount * retryInterval + ADDITIONAL_TIMEOUT_TIME;
+          try {
+            scheduledFuture.get(timeout, TimeUnit.SECONDS);
+          } catch (TimeoutException e) {
+            processMessageMissing(scheduledFuture, event);
+          }
         } catch (InterruptedException e) {
           // can be interrupted when no responses were received from agent and HEARTBEAT_LOST will be fired
           return;
@@ -167,18 +174,7 @@ public abstract class MessageEmitter {
     @Override
     public void run() {
       if (retry_counter >= retryCount) {
-        // generate delivery failed event and cancel emitter
-        ambariEventPublisher.publish(new MessageNotDelivered(executionCommandEvent.getHostId()));
-        unconfirmedMessages.remove(executionCommandEvent.getMessageId()); //?
-
-        // remove commands queue for host
-        messagesToEmit.remove(executionCommandEvent.getHostId());
-
-        // cancel retrying to emit command
-        scheduledFuture.cancel(true);
-
-        // cancel checking for new commands for host
-        monitors.get(executionCommandEvent.getHostId()).cancel(true);
+        processMessageMissing(scheduledFuture, executionCommandEvent);
         return;
       }
       try {
@@ -191,6 +187,23 @@ public abstract class MessageEmitter {
     }
   }
 
+  private void processMessageMissing(ScheduledFuture scheduledFuture, ExecutionCommandEvent executionCommandEvent) {
+    // generate delivery failed event and cancel emitter
+    ambariEventPublisher.publish(new MessageNotDelivered(executionCommandEvent.getHostId()));
+    unconfirmedMessages.remove(executionCommandEvent.getMessageId());
+
+    // remove commands queue for host
+    messagesToEmit.remove(executionCommandEvent.getHostId());
+
+    // cancel retrying to emit command
+    scheduledFuture.cancel(true);
+
+    // cancel checking for new commands for host
+    if (monitors.containsKey(executionCommandEvent.getHostId())) {
+      monitors.get(executionCommandEvent.getHostId()).cancel(true);
+    }
+  }
+
   protected abstract String getDestination(STOMPEvent stompEvent);
 
   /**

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to