AMBARI-22264. Sometimes request and host component status updates are lost. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f3e98bf7 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f3e98bf7 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f3e98bf7 Branch: refs/heads/branch-3.0-perf Commit: f3e98bf7d3e807af6bb26e43c17322d9928fbe18 Parents: 6231996 Author: Myroslav Papirkovskyi <[email protected]> Authored: Tue Oct 17 17:46:06 2017 +0300 Committer: Myroslav Papirkovskyi <[email protected]> Committed: Wed Oct 18 17:48:56 2017 +0300 ---------------------------------------------------------------------- .../agent/stomp/AmbariSubscriptionRegistry.java | 8 +++++++- .../publishers/BufferedUpdateEventPublisher.java | 16 ++++++++-------- .../HostComponentUpdateEventPublisher.java | 13 +++++++++---- .../publishers/ServiceUpdateEventPublisher.java | 6 ++++-- .../publishers/StateUpdateEventPublisher.java | 5 ++--- 5 files changed, 30 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java index aaab7bf..68330c6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java @@ -245,6 +245,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { private final Map<String, LinkedMultiValueMap<String, String>> accessCache = new ConcurrentHashMap<>(cacheLimit); + //TODO optimize usage of this cache on perf cluster private final Cache<String, String> notSubscriptionCache = CacheBuilder.newBuilder().maximumSize(cacheLimit).build(); @@ -275,7 +276,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { } public void updateAfterNewSubscription(String destination, String sessionId, String subsId) { - this.accessCache.computeIfPresent(destination, (key, value) -> { + LinkedMultiValueMap<String, String> updatedMap = this.accessCache.computeIfPresent(destination, (key, value) -> { if (getPathMatcher().match(destination, key)) { LinkedMultiValueMap<String, String> subs = value.deepCopy(); subs.add(sessionId, subsId); @@ -283,6 +284,9 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { } return value; }); + if (updatedMap == null) { + this.notSubscriptionCache.invalidate(destination); + } } public void updateAfterRemovedSubscription(String sessionId, String subsId) { @@ -301,6 +305,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { iterator.remove(); } else { + this.notSubscriptionCache.invalidate(destination); this.accessCache.put(destination, sessionMap.deepCopy()); } } @@ -318,6 +323,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { iterator.remove(); } else { + this.notSubscriptionCache.invalidate(destination); this.accessCache.put(destination, sessionMap.deepCopy()); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java index 75549c3..e02785f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java @@ -19,7 +19,6 @@ package org.apache.ambari.server.events.publishers; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; @@ -34,21 +33,21 @@ import com.google.inject.Singleton; @Singleton public abstract class BufferedUpdateEventPublisher<T> { - private static final long TIMEOUT = 1L; + private static final long TIMEOUT = 1000L; private final AtomicLong previousTime = new AtomicLong(0); private final AtomicBoolean collecting = new AtomicBoolean(false); private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - public void publish(Collection<T> event, EventBus m_eventBus) { + public void publish(T event, EventBus m_eventBus) { long eventTime = System.currentTimeMillis(); - if (eventTime - previousTime.get() <= TIMEOUT && !collecting.get()) { - buffer.addAll(event); + if ((eventTime - previousTime.get() <= TIMEOUT) && !collecting.get()) { + buffer.add(event); collecting.set(true); - scheduledExecutorService.schedule(getScheduledPublished(m_eventBus), + scheduledExecutorService.schedule(getScheduledPublisher(m_eventBus), TIMEOUT, TimeUnit.MILLISECONDS); } else if (collecting.get()) { - buffer.addAll(event); + buffer.add(event); } else { //TODO add logging and metrics posting previousTime.set(eventTime); @@ -56,9 +55,10 @@ public abstract class BufferedUpdateEventPublisher<T> { } } - protected abstract Runnable getScheduledPublished(EventBus m_eventBus); + protected abstract Runnable getScheduledPublisher(EventBus m_eventBus); protected List<T> retrieveBuffer() { + resetCollecting(); List<T> bufferContent = new ArrayList<>(); while (!buffer.isEmpty()) { bufferContent.add(buffer.poll()); http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java index a8c1b1d..f7fea1d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java @@ -19,6 +19,7 @@ package org.apache.ambari.server.events.publishers; import java.util.List; +import java.util.stream.Collectors; import org.apache.ambari.server.events.HostComponentUpdate; import org.apache.ambari.server.events.HostComponentsUpdateEvent; @@ -27,10 +28,10 @@ import com.google.common.eventbus.EventBus; import com.google.inject.Singleton; @Singleton -public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentUpdate> { +public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentsUpdateEvent> { @Override - protected Runnable getScheduledPublished(EventBus m_eventBus) { + protected Runnable getScheduledPublisher(EventBus m_eventBus) { return new HostComponentsEventRunnable(m_eventBus); } @@ -44,12 +45,16 @@ public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublis @Override public void run() { - List<HostComponentUpdate> hostComponentUpdates = retrieveBuffer(); + List<HostComponentsUpdateEvent> hostComponentUpdateEvents = retrieveBuffer(); + if (hostComponentUpdateEvents.isEmpty()) { + return; + } + List<HostComponentUpdate> hostComponentUpdates = hostComponentUpdateEvents.stream().flatMap( + u -> u.getHostComponentUpdates().stream()).collect(Collectors.toList()); HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates); //TODO add logging and metrics posting eventBus.post(resultEvents); - resetCollecting(); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java index 7bf1290..8f45859 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java @@ -30,7 +30,7 @@ import com.google.inject.Singleton; public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<ServiceUpdateEvent> { @Override - protected Runnable getScheduledPublished(EventBus m_eventBus) { + protected Runnable getScheduledPublisher(EventBus m_eventBus) { return new ServiceEventRunnable(m_eventBus); } @@ -45,6 +45,9 @@ public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<Se @Override public void run() { List<ServiceUpdateEvent> serviceUpdates = retrieveBuffer(); + if (serviceUpdates.isEmpty()) { + return; + } List<ServiceUpdateEvent> filtered = new ArrayList<>(); for (ServiceUpdateEvent event : serviceUpdates) { int pos = filtered.indexOf(event); @@ -62,7 +65,6 @@ public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<Se for (ServiceUpdateEvent serviceUpdateEvent : serviceUpdates) { eventBus.post(serviceUpdateEvent); } - resetCollecting(); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java index 7d343a5..80c9813 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java @@ -17,7 +17,6 @@ */ package org.apache.ambari.server.events.publishers; -import java.util.Collections; import java.util.concurrent.Executors; import org.apache.ambari.server.events.AmbariUpdateEvent; @@ -53,9 +52,9 @@ public class StateUpdateEventPublisher { if (event.getType().equals(AmbariUpdateEvent.Type.REQUEST)) { requestUpdateEventPublisher.publish((RequestUpdateEvent) event, m_eventBus); } else if (event.getType().equals(AmbariUpdateEvent.Type.HOSTCOMPONENT)) { - hostComponentUpdateEventPublisher.publish(((HostComponentsUpdateEvent) event).getHostComponentUpdates(), m_eventBus); + hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, m_eventBus); } else if (event.getType().equals(AmbariUpdateEvent.Type.SERVICE)) { - serviceUpdateEventPublisher.publish(Collections.singletonList((ServiceUpdateEvent) event), m_eventBus); + serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, m_eventBus); } else { m_eventBus.post(event); }
