This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 9719e54 AMBARI-24291. Start All Services on 100-nodes cluster timed
out after 1 hour. (#2449)
9719e54 is described below
commit 9719e54fa944f48f04fa401c9c8a9669bf94b620
Author: Myroslav Papirkovskyi <[email protected]>
AuthorDate: Fri Oct 19 15:04:10 2018 +0300
AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1
hour. (#2449)
* AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1
hour. (mpapirkovskyy)
* AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1
hour. (mpapirkovskyy)
---
.../ambari/server/AmbariRuntimeException.java | 4 ++
.../apache/ambari/server/actionmanager/Stage.java | 5 ++-
.../controller/AmbariManagementControllerImpl.java | 1 -
.../HostComponentsUpdateListener.java | 1 -
.../listeners/requests/STOMPUpdateListener.java | 3 +-
.../listeners/services/ServiceUpdateListener.java | 2 +-
.../listeners/upgrade/UpgradeUpdateListener.java | 2 +-
.../events/publishers/STOMPUpdatePublisher.java | 52 +++++++++++++++++-----
.../metrics/system/impl/MetricsServiceImpl.java | 3 +-
.../svccomphost/ServiceComponentHostImpl.java | 20 ++++-----
10 files changed, 65 insertions(+), 28 deletions(-)
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java
b/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java
index c6a20eb..b26f106 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java
@@ -25,4 +25,8 @@ public class AmbariRuntimeException extends RuntimeException {
public AmbariRuntimeException(String message, Throwable cause) {
super(message, cause);
}
+
+ public AmbariRuntimeException(String message) {
+ super(message);
+ }
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index b88275a..420c04b 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -224,11 +224,12 @@ public class Stage {
void loadExecutionCommandWrappers() {
for (Map.Entry<String, Map<String, HostRoleCommand>> hostRoleCommandEntry
: hostRoleCommands.entrySet()) {
String hostname = hostRoleCommandEntry.getKey();
- commandsToSend.put(hostname, new ArrayList<>());
+ List<ExecutionCommandWrapper> wrappers = new ArrayList<>();
Map<String, HostRoleCommand> roleCommandMap =
hostRoleCommandEntry.getValue();
for (Map.Entry<String, HostRoleCommand> roleCommandEntry :
roleCommandMap.entrySet()) {
-
commandsToSend.get(hostname).add(roleCommandEntry.getValue().getExecutionCommandWrapper());
+ wrappers.add(roleCommandEntry.getValue().getExecutionCommandWrapper());
}
+ commandsToSend.put(hostname, wrappers);
}
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 4927c34..a98891f 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -2554,7 +2554,6 @@ public class AmbariManagementControllerImpl implements
AmbariManagementControlle
}
Map<String, String> hostParams = new TreeMap<>();
- hostParams.putAll(getRcaParameters());
if (roleCommand.equals(RoleCommand.INSTALL)) {
List<ServiceOsSpecific.Package> packages =
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java
index feda69d..4ecb00c 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java
@@ -49,7 +49,6 @@ public class HostComponentsUpdateListener {
public HostComponentsUpdateListener(AmbariEventPublisher
ambariEventPublisher,
STOMPUpdatePublisher
STOMPUpdatePublisher) {
ambariEventPublisher.register(this);
- STOMPUpdatePublisher.register(this);
this.STOMPUpdatePublisher = STOMPUpdatePublisher;
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
index fde7854..8492156 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
@@ -41,7 +41,8 @@ public class STOMPUpdateListener {
public STOMPUpdateListener(Injector injector, Set<STOMPEvent.Type>
typesToProcess) {
STOMPUpdatePublisher STOMPUpdatePublisher =
injector.getInstance(STOMPUpdatePublisher.class);
- STOMPUpdatePublisher.register(this);
+ STOMPUpdatePublisher.registerAgent(this);
+ STOMPUpdatePublisher.registerAPI(this);
this.typesToProcess = typesToProcess == null ? Collections.emptySet() :
typesToProcess;
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
index 0cdb34f..c35cc08 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
@@ -57,7 +57,7 @@ public class ServiceUpdateListener {
@Inject
public ServiceUpdateListener(STOMPUpdatePublisher STOMPUpdatePublisher,
AmbariEventPublisher ambariEventPublisher) {
- STOMPUpdatePublisher.register(this);
+ STOMPUpdatePublisher.registerAPI(this);
ambariEventPublisher.register(this);
this.STOMPUpdatePublisher = STOMPUpdatePublisher;
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
index a39ed8a..d6dd89b 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
@@ -48,7 +48,7 @@ public class UpgradeUpdateListener {
@Inject
public UpgradeUpdateListener(STOMPUpdatePublisher STOMPUpdatePublisher,
AmbariEventPublisher ambariEventPublisher) {
- STOMPUpdatePublisher.register(this);
+ STOMPUpdatePublisher.registerAPI(this);
this.STOMPUpdatePublisher = STOMPUpdatePublisher;
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
index 99a03d6..073531b 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
@@ -17,8 +17,11 @@
*/
package org.apache.ambari.server.events.publishers;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.ambari.server.AmbariRuntimeException;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.events.HostComponentsUpdateEvent;
import org.apache.ambari.server.events.RequestUpdateEvent;
import org.apache.ambari.server.events.STOMPEvent;
@@ -26,13 +29,15 @@ import org.apache.ambari.server.events.ServiceUpdateEvent;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
public class STOMPUpdatePublisher {
- private final EventBus m_eventBus;
+ private final EventBus agentEventBus;
+ private final EventBus apiEventBus;
@Inject
private RequestUpdateEventPublisher requestUpdateEventPublisher;
@@ -43,24 +48,51 @@ public class STOMPUpdatePublisher {
@Inject
private ServiceUpdateEventPublisher serviceUpdateEventPublisher;
- public STOMPUpdatePublisher() {
- m_eventBus = new AsyncEventBus("ambari-update-bus",
- Executors.newSingleThreadExecutor());
+ private final ExecutorService threadPoolExecutorAgent =
Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("stomp-agent-bus-%d").build());
+ private final ExecutorService threadPoolExecutorAPI =
Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("stomp-api-bus-%d").build());
+
+ public STOMPUpdatePublisher() throws NoSuchFieldException,
IllegalAccessException {
+ agentEventBus = new AsyncEventBus("agent-update-bus",
+ threadPoolExecutorAgent);
+
+ apiEventBus = new AsyncEventBus("api-update-bus",
+ threadPoolExecutorAPI);
}
public void publish(STOMPEvent event) {
+ if
(DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES.contains(event.getType())) {
+ publishAgent(event);
+ } else if
(DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES.contains(event.getType())) {
+ publishAPI(event);
+ } else {
+ // TODO need better solution
+ throw new AmbariRuntimeException("Event with type {" + event.getType() +
"} can not be published.");
+ }
+ }
+
+ private void publishAPI(STOMPEvent event) {
if (event.getType().equals(STOMPEvent.Type.REQUEST)) {
- requestUpdateEventPublisher.publish((RequestUpdateEvent) event,
m_eventBus);
+ requestUpdateEventPublisher.publish((RequestUpdateEvent) event,
apiEventBus);
} else if (event.getType().equals(STOMPEvent.Type.HOSTCOMPONENT)) {
- hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent)
event, m_eventBus);
+ hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent)
event, apiEventBus);
} else if (event.getType().equals(STOMPEvent.Type.SERVICE)) {
- serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event,
m_eventBus);
+ serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event,
apiEventBus);
} else {
- m_eventBus.post(event);
+ apiEventBus.post(event);
}
}
- public void register(Object object) {
- m_eventBus.register(object);
+ private void publishAgent(STOMPEvent event) {
+ agentEventBus.post(event);
+ }
+
+ public void registerAgent(Object object) {
+ agentEventBus.register(object);
+ }
+
+ public void registerAPI(Object object) {
+ apiEventBus.register(object);
}
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
index 37a7082..03f67ad 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
@@ -116,7 +116,8 @@ public class MetricsServiceImpl implements MetricsService {
src.init(MetricsConfiguration.getSubsetConfiguration(configuration,
"source." + sourceName + "."), sink);
sources.put(sourceName, src);
if (src instanceof StompEventsMetricsSource) {
- STOMPUpdatePublisher.register(src);
+ STOMPUpdatePublisher.registerAPI(src);
+ STOMPUpdatePublisher.registerAgent(src);
}
src.start();
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index 8006e53..7801f0d 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -1280,6 +1280,16 @@ public class ServiceComponentHostImpl implements
ServiceComponentHost {
LOG.error("Could not determine stale config", e);
}
+ try {
+ Cluster cluster = clusters.getCluster(clusterName);
+ ServiceComponent serviceComponent =
cluster.getService(serviceName).getServiceComponent(serviceComponentName);
+ ServiceComponentHost sch =
serviceComponent.getServiceComponentHost(hostName);
+ String refreshConfigsCommand =
helper.getRefreshConfigsCommand(cluster,sch);
+ r.setReloadConfig(refreshConfigsCommand != null);
+ } catch (Exception e) {
+ LOG.error("Could not determine reload config flag", e);
+ }
+
return r;
}
@@ -1307,16 +1317,6 @@ public class ServiceComponentHostImpl implements
ServiceComponentHost {
r.setStaleConfig(false);
}
- try {
- Cluster cluster = clusters.getCluster(clusterName);
- ServiceComponent serviceComponent =
cluster.getService(serviceName).getServiceComponent(serviceComponentName);
- ServiceComponentHost sch =
serviceComponent.getServiceComponentHost(hostName);
- String refreshConfigsCommand =
helper.getRefreshConfigsCommand(cluster,sch);
- r.setReloadConfig(refreshConfigsCommand != null);
- } catch (Exception e) {
- LOG.error("Could not determine reload config flag", e);
- }
-
return r;
}