This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new d623dd5 AMBARI-24076. Guarantee execution command retrying task
completion. (mpapirkovskyy) (#1516)
d623dd5 is described below
commit d623dd530d9ba3ebc05211ac982093c0c7f5bcdf
Author: Myroslav Papirkovskyi <[email protected]>
AuthorDate: Wed Jun 13 13:53:29 2018 +0300
AMBARI-24076. Guarantee execution command retrying task completion.
(mpapirkovskyy) (#1516)
---
.../ambari/server/events/MessageEmitter.java | 39 ++++++++++++++--------
1 file changed, 26 insertions(+), 13 deletions(-)
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
index 1483b3f..47b55ed 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ambari.server.AmbariException;
@@ -61,6 +62,7 @@ public abstract class MessageEmitter {
new
ThreadFactoryBuilder().setNameFormat("ambari-message-monitor-%d").build());
protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
+ protected static final Long ADDITIONAL_TIMEOUT_TIME = 5L;
protected ConcurrentHashMap<Long, ScheduledFuture> unconfirmedMessages = new
ConcurrentHashMap<>();
protected ConcurrentHashMap<Long, BlockingQueue<ExecutionCommandEvent>>
messagesToEmit = new ConcurrentHashMap<>();
@@ -119,7 +121,12 @@ public abstract class MessageEmitter {
emitMessageTask.setScheduledFuture(scheduledFuture);
unconfirmedMessages.put(event.getMessageId(), scheduledFuture);
- scheduledFuture.get();
+ long timeout = retryCount * retryInterval + ADDITIONAL_TIMEOUT_TIME;
+ try {
+ scheduledFuture.get(timeout, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ processMessageMissing(scheduledFuture, event);
+ }
} catch (InterruptedException e) {
// can be interrupted when no responses were received from agent and
HEARTBEAT_LOST will be fired
return;
@@ -167,18 +174,7 @@ public abstract class MessageEmitter {
@Override
public void run() {
if (retry_counter >= retryCount) {
- // generate delivery failed event and cancel emitter
- ambariEventPublisher.publish(new
MessageNotDelivered(executionCommandEvent.getHostId()));
- unconfirmedMessages.remove(executionCommandEvent.getMessageId()); //?
-
- // remove commands queue for host
- messagesToEmit.remove(executionCommandEvent.getHostId());
-
- // cancel retrying to emit command
- scheduledFuture.cancel(true);
-
- // cancel checking for new commands for host
- monitors.get(executionCommandEvent.getHostId()).cancel(true);
+ processMessageMissing(scheduledFuture, executionCommandEvent);
return;
}
try {
@@ -191,6 +187,23 @@ public abstract class MessageEmitter {
}
}
+ private void processMessageMissing(ScheduledFuture scheduledFuture,
ExecutionCommandEvent executionCommandEvent) {
+ // generate delivery failed event and cancel emitter
+ ambariEventPublisher.publish(new
MessageNotDelivered(executionCommandEvent.getHostId()));
+ unconfirmedMessages.remove(executionCommandEvent.getMessageId());
+
+ // remove commands queue for host
+ messagesToEmit.remove(executionCommandEvent.getHostId());
+
+ // cancel retrying to emit command
+ scheduledFuture.cancel(true);
+
+ // cancel checking for new commands for host
+ if (monitors.containsKey(executionCommandEvent.getHostId())) {
+ monitors.get(executionCommandEvent.getHostId()).cancel(true);
+ }
+ }
+
protected abstract String getDestination(STOMPEvent stompEvent);
/**
--
To stop receiving notification emails like this one, please contact
[email protected].