AMBARI-22738. Set up heartbeat for API endpoint. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d1a11a31 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d1a11a31 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d1a11a31 Branch: refs/heads/branch-3.0-perf Commit: d1a11a312d852964821bf4cb66682b6bd3bc5ffb Parents: 9bda5e2 Author: Myroslav Papirkovskyi <[email protected]> Authored: Wed Nov 15 14:29:06 2017 +0200 Committer: Myroslav Papirkovskyi <[email protected]> Committed: Fri Jan 5 20:12:28 2018 +0200 ---------------------------------------------------------------------- .../server/configuration/Configuration.java | 14 +++++++++++++ .../configuration/spring/ApiStompConfig.java | 22 +++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d1a11a31/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 4dffcaa..5384da4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -2176,6 +2176,13 @@ public class Configuration { "agents.reports.thread.pool.size", 10); /** + * Server to API STOMP endpoint heartbeat interval in milliseconds. 
+ */ + @Markdown(description = "Server to API STOMP endpoint heartbeat interval in milliseconds.") + public static final ConfigurationProperty<Integer> API_HEARTBEAT_INTERVAL = new ConfigurationProperty<>( + "api.heartbeat.interval", 10000); + + /** * The maximum number of threads used to extract Ambari Views when Ambari * Server is starting up. */ @@ -4905,6 +4912,13 @@ public class Configuration { } /** + * @return server to API STOMP endpoint heartbeat interval in milliseconds. + */ + public int getAPIHeartbeatInterval() { + return Integer.parseInt(getProperty(API_HEARTBEAT_INTERVAL)); + } + + /** * @return max thread pool size for agents, default 25 */ public int getAgentThreadPoolSize() { http://git-wip-us.apache.org/repos/asf/ambari/blob/d1a11a31/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java index e968b11..38a3673 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java @@ -23,6 +23,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import 
org.springframework.web.socket.config.annotation.StompEndpointRegistry; @@ -34,6 +36,13 @@ import com.google.inject.Injector; @ComponentScan(basePackageClasses = {TestController.class}) @Import(RootStompConfig.class) public class ApiStompConfig extends AbstractWebSocketMessageBrokerConfigurer { + private final String HEARTBEAT_THREAD_NAME = "ws-heartbeat-thread-"; + private final int HEARTBEAT_POOL_SIZE = 1; + private final org.apache.ambari.server.configuration.Configuration configuration; + + public ApiStompConfig(Injector injector) { + configuration = injector.getInstance(org.apache.ambari.server.configuration.Configuration.class); + } @Bean public StateUpdateListener requestStatusListener(Injector injector) { @@ -44,6 +53,17 @@ public class ApiStompConfig extends AbstractWebSocketMessageBrokerConfigurer { public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/v1") .setAllowedOrigins("*") - .withSockJS(); + .withSockJS().setHeartbeatTime(configuration.getAPIHeartbeatInterval()); + } + + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.setPoolSize(HEARTBEAT_POOL_SIZE); + taskScheduler.setThreadNamePrefix(HEARTBEAT_THREAD_NAME); + taskScheduler.initialize(); + + registry.enableSimpleBroker("/").setTaskScheduler(taskScheduler) + .setHeartbeatValue(new long[]{configuration.getAPIHeartbeatInterval(), configuration.getAPIHeartbeatInterval()}); } }
