AMBARI-22264. Sometimes request and host component status updates are lost. 
(mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f3e98bf7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f3e98bf7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f3e98bf7

Branch: refs/heads/branch-3.0-perf
Commit: f3e98bf7d3e807af6bb26e43c17322d9928fbe18
Parents: 6231996
Author: Myroslav Papirkovskyi <[email protected]>
Authored: Tue Oct 17 17:46:06 2017 +0300
Committer: Myroslav Papirkovskyi <[email protected]>
Committed: Wed Oct 18 17:48:56 2017 +0300

----------------------------------------------------------------------
 .../agent/stomp/AmbariSubscriptionRegistry.java     |  8 +++++++-
 .../publishers/BufferedUpdateEventPublisher.java    | 16 ++++++++--------
 .../HostComponentUpdateEventPublisher.java          | 13 +++++++++----
 .../publishers/ServiceUpdateEventPublisher.java     |  6 ++++--
 .../publishers/StateUpdateEventPublisher.java       |  5 ++---
 5 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
index aaab7bf..68330c6 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
@@ -245,6 +245,7 @@ public class AmbariSubscriptionRegistry extends 
AbstractSubscriptionRegistry {
     private final Map<String, LinkedMultiValueMap<String, String>> accessCache 
=
         new ConcurrentHashMap<>(cacheLimit);
 
+    //TODO optimize usage of this cache on perf cluster
     private final Cache<String, String> notSubscriptionCache =
         CacheBuilder.newBuilder().maximumSize(cacheLimit).build();
 
@@ -275,7 +276,7 @@ public class AmbariSubscriptionRegistry extends 
AbstractSubscriptionRegistry {
     }
 
     public void updateAfterNewSubscription(String destination, String 
sessionId, String subsId) {
-      this.accessCache.computeIfPresent(destination, (key, value) -> {
+      LinkedMultiValueMap<String, String> updatedMap = 
this.accessCache.computeIfPresent(destination, (key, value) -> {
         if (getPathMatcher().match(destination, key)) {
           LinkedMultiValueMap<String, String> subs = value.deepCopy();
           subs.add(sessionId, subsId);
@@ -283,6 +284,9 @@ public class AmbariSubscriptionRegistry extends 
AbstractSubscriptionRegistry {
         }
         return value;
       });
+      if (updatedMap == null) {
+        this.notSubscriptionCache.invalidate(destination);
+      }
     }
 
     public void updateAfterRemovedSubscription(String sessionId, String 
subsId) {
@@ -301,6 +305,7 @@ public class AmbariSubscriptionRegistry extends 
AbstractSubscriptionRegistry {
             iterator.remove();
           }
           else {
+            this.notSubscriptionCache.invalidate(destination);
             this.accessCache.put(destination, sessionMap.deepCopy());
           }
         }
@@ -318,6 +323,7 @@ public class AmbariSubscriptionRegistry extends 
AbstractSubscriptionRegistry {
             iterator.remove();
           }
           else {
+            this.notSubscriptionCache.invalidate(destination);
             this.accessCache.put(destination, sessionMap.deepCopy());
           }
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
index 75549c3..e02785f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
@@ -19,7 +19,6 @@
 package org.apache.ambari.server.events.publishers;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
@@ -34,21 +33,21 @@ import com.google.inject.Singleton;
 @Singleton
 public abstract class BufferedUpdateEventPublisher<T> {
 
-  private static final long TIMEOUT = 1L;
+  private static final long TIMEOUT = 1000L;
   private final AtomicLong previousTime = new AtomicLong(0);
   private final AtomicBoolean collecting = new AtomicBoolean(false);
   private final ConcurrentLinkedQueue<T> buffer = new 
ConcurrentLinkedQueue<>();
   private final ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(1);
 
-  public void publish(Collection<T> event, EventBus m_eventBus) {
+  public void publish(T event, EventBus m_eventBus) {
     long eventTime = System.currentTimeMillis();
-    if (eventTime - previousTime.get() <= TIMEOUT && !collecting.get()) {
-      buffer.addAll(event);
+    if ((eventTime - previousTime.get() <= TIMEOUT) && !collecting.get()) {
+      buffer.add(event);
       collecting.set(true);
-      scheduledExecutorService.schedule(getScheduledPublished(m_eventBus),
+      scheduledExecutorService.schedule(getScheduledPublisher(m_eventBus),
           TIMEOUT, TimeUnit.MILLISECONDS);
     } else if (collecting.get()) {
-      buffer.addAll(event);
+      buffer.add(event);
     } else {
       //TODO add logging and metrics posting
       previousTime.set(eventTime);
@@ -56,9 +55,10 @@ public abstract class BufferedUpdateEventPublisher<T> {
     }
   }
 
-  protected abstract Runnable getScheduledPublished(EventBus m_eventBus);
+  protected abstract Runnable getScheduledPublisher(EventBus m_eventBus);
 
   protected List<T> retrieveBuffer() {
+    resetCollecting();
     List<T> bufferContent = new ArrayList<>();
     while (!buffer.isEmpty()) {
       bufferContent.add(buffer.poll());

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
index a8c1b1d..f7fea1d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.server.events.publishers;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.ambari.server.events.HostComponentUpdate;
 import org.apache.ambari.server.events.HostComponentsUpdateEvent;
@@ -27,10 +28,10 @@ import com.google.common.eventbus.EventBus;
 import com.google.inject.Singleton;
 
 @Singleton
-public class HostComponentUpdateEventPublisher extends 
BufferedUpdateEventPublisher<HostComponentUpdate> {
+public class HostComponentUpdateEventPublisher extends 
BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {
 
   @Override
-  protected Runnable getScheduledPublished(EventBus m_eventBus) {
+  protected Runnable getScheduledPublisher(EventBus m_eventBus) {
     return new HostComponentsEventRunnable(m_eventBus);
   }
 
@@ -44,12 +45,16 @@ public class HostComponentUpdateEventPublisher extends 
BufferedUpdateEventPublis
 
     @Override
     public void run() {
-      List<HostComponentUpdate> hostComponentUpdates = retrieveBuffer();
+      List<HostComponentsUpdateEvent> hostComponentUpdateEvents = 
retrieveBuffer();
+      if (hostComponentUpdateEvents.isEmpty()) {
+        return;
+      }
+      List<HostComponentUpdate> hostComponentUpdates = 
hostComponentUpdateEvents.stream().flatMap(
+          u -> 
u.getHostComponentUpdates().stream()).collect(Collectors.toList());
 
       HostComponentsUpdateEvent resultEvents = new 
HostComponentsUpdateEvent(hostComponentUpdates);
       //TODO add logging and metrics posting
       eventBus.post(resultEvents);
-      resetCollecting();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
index 7bf1290..8f45859 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
@@ -30,7 +30,7 @@ import com.google.inject.Singleton;
 public class ServiceUpdateEventPublisher extends 
BufferedUpdateEventPublisher<ServiceUpdateEvent> {
 
   @Override
-  protected Runnable getScheduledPublished(EventBus m_eventBus) {
+  protected Runnable getScheduledPublisher(EventBus m_eventBus) {
     return new ServiceEventRunnable(m_eventBus);
   }
 
@@ -45,6 +45,9 @@ public class ServiceUpdateEventPublisher extends 
BufferedUpdateEventPublisher<Se
     @Override
     public void run() {
       List<ServiceUpdateEvent> serviceUpdates = retrieveBuffer();
+      if (serviceUpdates.isEmpty()) {
+        return;
+      }
       List<ServiceUpdateEvent> filtered = new ArrayList<>();
       for (ServiceUpdateEvent event : serviceUpdates) {
         int pos = filtered.indexOf(event);
@@ -62,7 +65,6 @@ public class ServiceUpdateEventPublisher extends 
BufferedUpdateEventPublisher<Se
       for (ServiceUpdateEvent serviceUpdateEvent : serviceUpdates) {
         eventBus.post(serviceUpdateEvent);
       }
-      resetCollecting();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
index 7d343a5..80c9813 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ambari.server.events.publishers;
 
-import java.util.Collections;
 import java.util.concurrent.Executors;
 
 import org.apache.ambari.server.events.AmbariUpdateEvent;
@@ -53,9 +52,9 @@ public class StateUpdateEventPublisher {
     if (event.getType().equals(AmbariUpdateEvent.Type.REQUEST)) {
       requestUpdateEventPublisher.publish((RequestUpdateEvent) event, 
m_eventBus);
     } else if (event.getType().equals(AmbariUpdateEvent.Type.HOSTCOMPONENT)) {
-      hostComponentUpdateEventPublisher.publish(((HostComponentsUpdateEvent) 
event).getHostComponentUpdates(), m_eventBus);
+      hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) 
event, m_eventBus);
     } else if (event.getType().equals(AmbariUpdateEvent.Type.SERVICE)) {
-      
serviceUpdateEventPublisher.publish(Collections.singletonList((ServiceUpdateEvent)
 event), m_eventBus);
+      serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, 
m_eventBus);
     } else {
       m_eventBus.post(event);
     }

Reply via email to