This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 77faf73 AMBARI-25063. Restarting Ambari Server Fails Due to Recursive
Injection of STOMPUpdatePublisher. (mpapirkovskyy) (#2826)
77faf73 is described below
commit 77faf73ecff7dfd1fc9d06f34eff48fdd697d7f8
Author: Myroslav Papirkovskyi <[email protected]>
AuthorDate: Wed Feb 20 21:22:07 2019 +0200
AMBARI-25063. Restarting Ambari Server Fails Due to Recursive Injection of
STOMPUpdatePublisher. (mpapirkovskyy) (#2826)
---
.../publishers/BufferedUpdateEventPublisher.java | 24 +++++++++++-
.../HostComponentUpdateEventPublisher.java | 16 +++++++-
.../publishers/RequestUpdateEventPublisher.java | 15 +++++++-
.../events/publishers/STOMPUpdatePublisher.java | 43 ++++++++++++----------
.../publishers/ServiceUpdateEventPublisher.java | 16 +++++++-
5 files changed, 86 insertions(+), 28 deletions(-)
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
index 25d396c..c315f12 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
@@ -20,22 +20,29 @@ package org.apache.ambari.server.events.publishers;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.ambari.server.events.STOMPEvent;
+
import com.google.common.eventbus.EventBus;
-import com.google.inject.Singleton;
-@Singleton
public abstract class BufferedUpdateEventPublisher<T> {
private static final long TIMEOUT = 1000L;
private final ConcurrentLinkedQueue<T> buffer = new
ConcurrentLinkedQueue<>();
+ public abstract STOMPEvent.Type getType();
+
private ScheduledExecutorService scheduledExecutorService;
+ public BufferedUpdateEventPublisher(STOMPUpdatePublisher
stompUpdatePublisher) {
+ stompUpdatePublisher.registerPublisher(this);
+ }
+
public void publish(T event, EventBus m_eventBus) {
if (scheduledExecutorService == null) {
scheduledExecutorService =
@@ -77,4 +84,17 @@ public abstract class BufferedUpdateEventPublisher<T> {
mergeBufferAndPost(events, m_eventBus);
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ BufferedUpdateEventPublisher<?> that = (BufferedUpdateEventPublisher<?>) o;
+ return Objects.equals(getType(), that.getType());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getType());
+ }
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
index d9f51e8..f3b3144 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -21,15 +21,27 @@ package org.apache.ambari.server.events.publishers;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.events.HostComponentUpdate;
import org.apache.ambari.server.events.HostComponentsUpdateEvent;
+import org.apache.ambari.server.events.STOMPEvent;
import com.google.common.eventbus.EventBus;
-import com.google.inject.Singleton;
+import com.google.inject.Inject;
-@Singleton
+@EagerSingleton
public class HostComponentUpdateEventPublisher extends
BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {
+ @Inject
+ public HostComponentUpdateEventPublisher(STOMPUpdatePublisher
stompUpdatePublisher) {
+ super(stompUpdatePublisher);
+ }
+
+ @Override
+ public STOMPEvent.Type getType() {
+ return STOMPEvent.Type.HOSTCOMPONENT;
+ }
+
@Override
public void mergeBufferAndPost(List<HostComponentsUpdateEvent> events,
EventBus m_eventBus) {
List<HostComponentUpdate> hostComponentUpdates = events.stream().flatMap(
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
index 42f22ba..7ff4bdd 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
@@ -22,8 +22,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.events.RequestUpdateEvent;
+import org.apache.ambari.server.events.STOMPEvent;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.RequestDAO;
@@ -32,9 +34,8 @@ import org.apache.ambari.server.topology.TopologyManager;
import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
-@Singleton
+@EagerSingleton
public class RequestUpdateEventPublisher extends
BufferedUpdateEventPublisher<RequestUpdateEvent> {
@Inject
@@ -49,6 +50,16 @@ public class RequestUpdateEventPublisher extends
BufferedUpdateEventPublisher<Re
@Inject
private ClusterDAO clusterDAO;
+ @Inject
+ public RequestUpdateEventPublisher(STOMPUpdatePublisher
stompUpdatePublisher) {
+ super(stompUpdatePublisher);
+ }
+
+ @Override
+ public STOMPEvent.Type getType() {
+ return STOMPEvent.Type.REQUEST;
+ }
+
@Override
public void mergeBufferAndPost(List<RequestUpdateEvent> events, EventBus
m_eventBus) {
Map<Long, RequestUpdateEvent> filteredRequests = new HashMap<>();
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
index 073531b..0a1fb4d 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
@@ -17,37 +17,29 @@
*/
package org.apache.ambari.server.events.publishers;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.ambari.server.AmbariRuntimeException;
import org.apache.ambari.server.events.DefaultMessageEmitter;
-import org.apache.ambari.server.events.HostComponentsUpdateEvent;
-import org.apache.ambari.server.events.RequestUpdateEvent;
import org.apache.ambari.server.events.STOMPEvent;
-import org.apache.ambari.server.events.ServiceUpdateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
public class STOMPUpdatePublisher {
+ private static final Logger LOG =
LoggerFactory.getLogger(STOMPUpdatePublisher.class);
private final EventBus agentEventBus;
private final EventBus apiEventBus;
- @Inject
- private RequestUpdateEventPublisher requestUpdateEventPublisher;
-
- @Inject
- private HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher;
-
- @Inject
- private ServiceUpdateEventPublisher serviceUpdateEventPublisher;
-
private final ExecutorService threadPoolExecutorAgent =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("stomp-agent-bus-%d").build());
private final ExecutorService threadPoolExecutorAPI =
Executors.newSingleThreadExecutor(
@@ -61,6 +53,16 @@ public class STOMPUpdatePublisher {
threadPoolExecutorAPI);
}
+ private List<BufferedUpdateEventPublisher> publishers = new ArrayList<>();
+
+ public void registerPublisher(BufferedUpdateEventPublisher publisher) {
+ if (publishers.contains(publisher)) {
+ LOG.error("Publisher for type {} is already in use",
publisher.getType());
+ } else {
+ publishers.add(publisher);
+ }
+ }
+
public void publish(STOMPEvent event) {
if
(DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES.contains(event.getType())) {
publishAgent(event);
@@ -73,13 +75,14 @@ public class STOMPUpdatePublisher {
}
private void publishAPI(STOMPEvent event) {
- if (event.getType().equals(STOMPEvent.Type.REQUEST)) {
- requestUpdateEventPublisher.publish((RequestUpdateEvent) event,
apiEventBus);
- } else if (event.getType().equals(STOMPEvent.Type.HOSTCOMPONENT)) {
- hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent)
event, apiEventBus);
- } else if (event.getType().equals(STOMPEvent.Type.SERVICE)) {
- serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event,
apiEventBus);
- } else {
+ boolean published = false;
+ for (BufferedUpdateEventPublisher publisher : publishers) {
+ if (publisher.getType().equals(event.getType())) {
+ publisher.publish(event, apiEventBus);
+ published = true;
+ }
+ }
+ if (!published) {
apiEventBus.post(event);
}
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
index 6dfef43..c5802ac 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
@@ -23,18 +23,30 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.ambari.server.EagerSingleton;
import
org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
import
org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
+import org.apache.ambari.server.events.STOMPEvent;
import org.apache.ambari.server.events.ServiceUpdateEvent;
import org.apache.ambari.server.state.State;
import com.google.common.eventbus.EventBus;
-import com.google.inject.Singleton;
+import com.google.inject.Inject;
-@Singleton
+@EagerSingleton
public class ServiceUpdateEventPublisher extends
BufferedUpdateEventPublisher<ServiceUpdateEvent> {
private Map<String, Map<String, State>> states = new HashMap<>();
+ @Inject
+ public ServiceUpdateEventPublisher(STOMPUpdatePublisher
stompUpdatePublisher) {
+ super(stompUpdatePublisher);
+ }
+
+
+ @Override
+ public STOMPEvent.Type getType() {
+ return STOMPEvent.Type.SERVICE;
+ }
@Override
public void mergeBufferAndPost(List<ServiceUpdateEvent> events, EventBus
eventBus) {