This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 77faf73  AMBARI-25063. Restarting Ambari Server Fails Due to Recursive Injection of STOMPUpdatePublisher. (mpapirkovskyy) (#2826)
77faf73 is described below

commit 77faf73ecff7dfd1fc9d06f34eff48fdd697d7f8
Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org>
AuthorDate: Wed Feb 20 21:22:07 2019 +0200

    AMBARI-25063. Restarting Ambari Server Fails Due to Recursive Injection of STOMPUpdatePublisher. (mpapirkovskyy) (#2826)
---
 .../publishers/BufferedUpdateEventPublisher.java   | 24 +++++++++++-
 .../HostComponentUpdateEventPublisher.java         | 16 +++++++-
 .../publishers/RequestUpdateEventPublisher.java    | 15 +++++++-
 .../events/publishers/STOMPUpdatePublisher.java    | 43 ++++++++++++----------
 .../publishers/ServiceUpdateEventPublisher.java    | 16 +++++++-
 5 files changed, 86 insertions(+), 28 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
index 25d396c..c315f12 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
@@ -20,22 +20,29 @@ package org.apache.ambari.server.events.publishers;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.ambari.server.events.STOMPEvent;
+
 import com.google.common.eventbus.EventBus;
-import com.google.inject.Singleton;
 
-@Singleton
 public abstract class BufferedUpdateEventPublisher<T> {
 
   private static final long TIMEOUT = 1000L;
   private final ConcurrentLinkedQueue<T> buffer = new 
ConcurrentLinkedQueue<>();
 
+  public abstract STOMPEvent.Type getType();
+
   private ScheduledExecutorService scheduledExecutorService;
 
+  public BufferedUpdateEventPublisher(STOMPUpdatePublisher 
stompUpdatePublisher) {
+    stompUpdatePublisher.registerPublisher(this);
+  }
+
   public void publish(T event, EventBus m_eventBus) {
     if (scheduledExecutorService == null) {
       scheduledExecutorService =
@@ -77,4 +84,17 @@ public abstract class BufferedUpdateEventPublisher<T> {
       mergeBufferAndPost(events, m_eventBus);
     }
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    BufferedUpdateEventPublisher<?> that = (BufferedUpdateEventPublisher<?>) o;
+    return Objects.equals(getType(), that.getType());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getType());
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
index d9f51e8..f3b3144 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -21,15 +21,27 @@ package org.apache.ambari.server.events.publishers;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.HostComponentUpdate;
 import org.apache.ambari.server.events.HostComponentsUpdateEvent;
+import org.apache.ambari.server.events.STOMPEvent;
 
 import com.google.common.eventbus.EventBus;
-import com.google.inject.Singleton;
+import com.google.inject.Inject;
 
-@Singleton
+@EagerSingleton
 public class HostComponentUpdateEventPublisher extends 
BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {
 
+  @Inject
+  public HostComponentUpdateEventPublisher(STOMPUpdatePublisher 
stompUpdatePublisher) {
+    super(stompUpdatePublisher);
+  }
+
+  @Override
+  public STOMPEvent.Type getType() {
+    return STOMPEvent.Type.HOSTCOMPONENT;
+  }
+
   @Override
   public void mergeBufferAndPost(List<HostComponentsUpdateEvent> events, 
EventBus m_eventBus) {
     List<HostComponentUpdate> hostComponentUpdates = events.stream().flatMap(
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
index 42f22ba..7ff4bdd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
@@ -22,8 +22,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.events.RequestUpdateEvent;
+import org.apache.ambari.server.events.STOMPEvent;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.dao.RequestDAO;
@@ -32,9 +34,8 @@ import org.apache.ambari.server.topology.TopologyManager;
 
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
-@Singleton
+@EagerSingleton
 public class RequestUpdateEventPublisher extends 
BufferedUpdateEventPublisher<RequestUpdateEvent> {
 
   @Inject
@@ -49,6 +50,16 @@ public class RequestUpdateEventPublisher extends BufferedUpdateEventPublisher<Re
   @Inject
   private ClusterDAO clusterDAO;
 
+  @Inject
+  public RequestUpdateEventPublisher(STOMPUpdatePublisher 
stompUpdatePublisher) {
+    super(stompUpdatePublisher);
+  }
+
+  @Override
+  public STOMPEvent.Type getType() {
+    return STOMPEvent.Type.REQUEST;
+  }
+
   @Override
   public void mergeBufferAndPost(List<RequestUpdateEvent> events, EventBus 
m_eventBus) {
     Map<Long, RequestUpdateEvent> filteredRequests = new HashMap<>();
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
index 073531b..0a1fb4d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
@@ -17,37 +17,29 @@
  */
 package org.apache.ambari.server.events.publishers;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.ambari.server.AmbariRuntimeException;
 import org.apache.ambari.server.events.DefaultMessageEmitter;
-import org.apache.ambari.server.events.HostComponentsUpdateEvent;
-import org.apache.ambari.server.events.RequestUpdateEvent;
 import org.apache.ambari.server.events.STOMPEvent;
-import org.apache.ambari.server.events.ServiceUpdateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 @Singleton
 public class STOMPUpdatePublisher {
+  private static final Logger LOG = 
LoggerFactory.getLogger(STOMPUpdatePublisher.class);
 
   private final EventBus agentEventBus;
   private final EventBus apiEventBus;
 
-  @Inject
-  private RequestUpdateEventPublisher requestUpdateEventPublisher;
-
-  @Inject
-  private HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher;
-
-  @Inject
-  private ServiceUpdateEventPublisher serviceUpdateEventPublisher;
-
   private final ExecutorService threadPoolExecutorAgent = 
Executors.newSingleThreadExecutor(
       new ThreadFactoryBuilder().setNameFormat("stomp-agent-bus-%d").build());
   private final ExecutorService threadPoolExecutorAPI = 
Executors.newSingleThreadExecutor(
@@ -61,6 +53,16 @@ public class STOMPUpdatePublisher {
         threadPoolExecutorAPI);
   }
 
+  private List<BufferedUpdateEventPublisher> publishers = new ArrayList<>();
+
+  public void registerPublisher(BufferedUpdateEventPublisher publisher) {
+    if (publishers.contains(publisher)) {
+      LOG.error("Publisher for type {} is already in use", 
publisher.getType());
+    } else {
+      publishers.add(publisher);
+    }
+  }
+
   public void publish(STOMPEvent event) {
     if 
(DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES.contains(event.getType())) {
       publishAgent(event);
@@ -73,13 +75,14 @@ public class STOMPUpdatePublisher {
   }
 
   private void publishAPI(STOMPEvent event) {
-    if (event.getType().equals(STOMPEvent.Type.REQUEST)) {
-      requestUpdateEventPublisher.publish((RequestUpdateEvent) event, 
apiEventBus);
-    } else if (event.getType().equals(STOMPEvent.Type.HOSTCOMPONENT)) {
-      hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) 
event, apiEventBus);
-    } else if (event.getType().equals(STOMPEvent.Type.SERVICE)) {
-      serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, 
apiEventBus);
-    } else {
+    boolean published = false;
+    for (BufferedUpdateEventPublisher publisher : publishers) {
+      if (publisher.getType().equals(event.getType())) {
+        publisher.publish(event, apiEventBus);
+        published = true;
+      }
+    }
+    if (!published) {
       apiEventBus.post(event);
     }
   }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
index 6dfef43..c5802ac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
@@ -23,18 +23,30 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.ambari.server.EagerSingleton;
 import 
org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
 import 
org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
+import org.apache.ambari.server.events.STOMPEvent;
 import org.apache.ambari.server.events.ServiceUpdateEvent;
 import org.apache.ambari.server.state.State;
 
 import com.google.common.eventbus.EventBus;
-import com.google.inject.Singleton;
+import com.google.inject.Inject;
 
-@Singleton
+@EagerSingleton
 public class ServiceUpdateEventPublisher extends 
BufferedUpdateEventPublisher<ServiceUpdateEvent> {
   private Map<String, Map<String, State>> states = new HashMap<>();
 
+  @Inject
+  public ServiceUpdateEventPublisher(STOMPUpdatePublisher 
stompUpdatePublisher) {
+    super(stompUpdatePublisher);
+  }
+
+
+  @Override
+  public STOMPEvent.Type getType() {
+    return STOMPEvent.Type.SERVICE;
+  }
 
   @Override
   public void mergeBufferAndPost(List<ServiceUpdateEvent> events, EventBus 
eventBus) {

Reply via email to