This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c03943a Add REST api to check host-status for adding/removing from vip (#1241) c03943a is described below commit c03943ae8bdd597b2619766a96233e4a35f49344 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Feb 19 17:44:41 2018 -0800 Add REST api to check host-status for adding/removing from vip (#1241) * Add REST api to check host-status for adding/removing from vip * move vipStatus to common place * address comment --- conf/proxy.conf | 4 ++++ conf/websocket.conf | 4 ++++ pom.xml | 6 +++++ pulsar-broker-common/pom.xml | 6 +++++ .../pulsar/common/configuration}/VipStatus.java | 27 ++++++++++++++-------- .../org/apache/pulsar/broker/PulsarService.java | 25 +++++++++++++------- .../org/apache/pulsar/broker/web/WebService.java | 13 +++++++---- .../pulsar/proxy/server/ProxyConfiguration.java | 12 ++++++++++ .../pulsar/proxy/server/ProxyServiceStarter.java | 11 +++++---- .../org/apache/pulsar/proxy/server/WebServer.java | 19 +++++++++++++++ .../pulsar/websocket/service/ProxyServer.java | 6 ++--- .../service/WebSocketProxyConfiguration.java | 12 ++++++++++ .../websocket/service/WebSocketServiceStarter.java | 6 ++++- 13 files changed, 120 insertions(+), 31 deletions(-) diff --git a/conf/proxy.conf b/conf/proxy.conf index d7c5afc..0939452 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -38,6 +38,10 @@ webServicePort=8080 # Port to use to server HTTPS request webServicePortTls=8443 +# Path for the file used to determine the rotation status for the proxy-instance when responding +# to service discovery health checks +statusFilePath= + ### --- Authentication --- ### # Enable authentication diff --git a/conf/websocket.conf b/conf/websocket.conf index 404bdef..399efed 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -36,6 +36,10 @@ webServicePort=8080 # Port to use to server HTTPS request webServicePortTls=8443 +# Path for the file used to determine the rotation status for the proxy-instance when responding +# to service discovery health checks +statusFilePath= + # Hostname or IP address the service binds on, default is 0.0.0.0. bindAddress=0.0.0.0 diff --git a/pom.xml b/pom.xml index 192769f..eb97e0b 100644 --- a/pom.xml +++ b/pom.xml @@ -374,6 +374,12 @@ flexible messaging model and an intuitive client API.</description> <artifactId>jersey-container-servlet</artifactId> <version>2.23.2</version> </dependency> + + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>javax.ws.rs-api</artifactId> + <version>2.0.1</version> + </dependency> <dependency> <groupId>org.glassfish.jersey.media</groupId> diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 666fec8..75edadb 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -48,5 +48,11 @@ <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> </dependency> + + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>javax.ws.rs-api</artifactId> + </dependency> + </dependencies> </project> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java similarity index 64% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java rename to pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 1e2499c..5dfba94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.web; +package org.apache.pulsar.common.configuration; import java.io.File; +import javax.servlet.ServletContext; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.WebApplicationException; @@ -27,22 +28,28 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response.Status; /** - * Web resource used by the VIP service to check to availability of the Pulsar broker instance. + * Web resource used by the VIP service to check to availability of the service instance. */ @Path("/status.html") -@NoSwaggerDocumentation -public class VipStatus extends PulsarWebResource { +public class VipStatus { + + public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; + + @Context + protected ServletContext servletContext; @GET @Context public String checkStatus() { - String statusFilePath = pulsar().getStatusFilePath(); - File statusFile = new File(statusFilePath); - if (statusFile.exists()) { - return "OK"; - } else { - throw new WebApplicationException(Status.NOT_FOUND); + String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); + if (statusFilePath != null) { + File statusFile = new File(statusFilePath); + if (statusFile.exists() && statusFile.isFile()) { + return "OK"; + } } + throw new WebApplicationException(Status.NOT_FOUND); } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index dee8b5c..bec354b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import java.io.IOException; import java.net.URL; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -55,6 +56,7 @@ import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.web.WebService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; @@ -77,6 +79,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import io.netty.util.concurrent.DefaultThreadFactory; @@ -275,13 +278,19 @@ public class PulsarService implements AutoCloseable { brokerService.start(); this.webService = new WebService(this); - this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false); - this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true); - this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true); - this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true); + Map<String, Object> attributeMap = Maps.newHashMap(); + attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this); + Map<String, Object> vipAttributeMap = Maps.newHashMap(); + vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath()); + this.webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap); + this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap); + this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap); + this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap); + this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap); this.webService.addServlet("/metrics", - new ServletHolder(new PrometheusMetricsServlet(this, config.exposeTopicLevelMetricsInPrometheus())), false); + new ServletHolder(new PrometheusMetricsServlet(this, config.exposeTopicLevelMetricsInPrometheus())), + false, attributeMap); if (config.isWebSocketServiceEnabled()) { // Use local broker address to avoid different IP address when using a VIP for service discovery @@ -290,11 +299,11 @@ public class PulsarService implements AutoCloseable { config); this.webSocketService.start(); this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH, - new ServletHolder(new WebSocketProducerServlet(webSocketService)), true); + new ServletHolder(new WebSocketProducerServlet(webSocketService)), true, attributeMap); this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH, - new ServletHolder(new WebSocketConsumerServlet(webSocketService)), true); + new ServletHolder(new WebSocketConsumerServlet(webSocketService)), true, attributeMap); this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH, - new ServletHolder(new WebSocketReaderServlet(webSocketService)), true); + new ServletHolder(new WebSocketReaderServlet(webSocketService)), true, attributeMap); } if (LOG.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index cb5584f..20ff44b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -22,6 +22,7 @@ import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.TimeZone; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -113,7 +114,7 @@ public class WebService implements AutoCloseable { server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); } - public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication) { + public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication, Map<String,Object> attributeMap) { JacksonJaxbJsonProvider provider = new JacksonJaxbJsonProvider(); provider.setMapper(ObjectMapperFactory.create()); ResourceConfig config = new ResourceConfig(); @@ -121,14 +122,18 @@ public class WebService implements AutoCloseable { config.register(provider); ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); - addServlet(basePath, servletHolder, requiresAuthentication); + addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); } - public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication) { + public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, Map<String,Object> attributeMap) { ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(path); context.addServlet(servletHolder, MATCH_ALL); - context.setAttribute(WebService.ATTRIBUTE_PULSAR_NAME, pulsar); + if (attributeMap != null) { + attributeMap.forEach((key, value) -> { + context.setAttribute(key, value); + }); + } if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) { FilterHolder filter = new FilterHolder(new AuthenticationFilter(pulsar)); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 71022ea..f947305 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -49,6 +49,10 @@ public class ProxyConfiguration implements PulsarConfiguration { private int webServicePort = 8080; // Port to use to server HTTPS request private int webServicePortTls = 8443; + + // Path for the file used to determine the rotation status for the broker + // when responding to service discovery health checks + private String statusFilePath; // Role names that are treated as "super-user", meaning they will be able to // do all admin operations and publish/consume from all topics @@ -176,6 +180,14 @@ public class ProxyConfiguration implements PulsarConfiguration { this.webServicePortTls = webServicePortTls; } + public String getStatusFilePath() { + return statusFilePath; + } + + public void setStatusFilePath(String statusFilePath) { + this.statusFilePath = statusFilePath; + } + public boolean isTlsEnabledInProxy() { return tlsEnabledInProxy; } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index d9fc4af..93067f8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -35,6 +35,7 @@ import com.beust.jcommander.Parameter; import io.prometheus.client.exporter.MetricsServlet; import io.prometheus.client.hotspot.DefaultExports; +import org.apache.pulsar.common.configuration.VipStatus; /** * Starts an instance of the Pulsar ProxyService @@ -96,8 +97,8 @@ public class ProxyServiceStarter { java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); - // create broker service - ProxyService discoveryService = new ProxyService(config); + // create proxy service + ProxyService proxyService = new ProxyService(config); // create a web-service final WebServer server = new WebServer(config); @@ -105,7 +106,7 @@ public class ProxyServiceStarter { @Override public void run() { try { - discoveryService.close(); + proxyService.close(); server.stop(); } catch (Exception e) { log.warn("server couldn't stop gracefully {}", e.getMessage(), e); @@ -113,11 +114,13 @@ public class ProxyServiceStarter { } }); - discoveryService.start(); + proxyService.start(); // Setup metrics DefaultExports.initialize(); server.addServlet("/metrics", new ServletHolder(MetricsServlet.class)); + server.addRestResources("/", VipStatus.class.getPackage().getName(), + VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath()); // start web-service server.start(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index a809a3d..edc7188 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executors; import javax.net.ssl.SSLContext; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; @@ -40,9 +41,12 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider; import com.google.common.collect.Lists; import io.netty.util.concurrent.DefaultThreadFactory; @@ -99,6 +103,21 @@ public class WebServer { handlers.add(context); } + public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) { + JacksonJaxbJsonProvider provider = new JacksonJaxbJsonProvider(); + provider.setMapper(ObjectMapperFactory.create()); + ResourceConfig config = new ResourceConfig(); + config.packages("jersey.config.server.provider.packages", javaPackages); + config.register(provider); + ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); + servletHolder.setAsyncSupported(true); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath(basePath); + context.addServlet(servletHolder, "/*"); + context.setAttribute(attribute, attributeValue); + handlers.add(context); + } + public int getExternalServicePort() { return externalServicePort; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java index ff1bfe5..bf8846e 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.websocket.service; -import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ATTRIBUTE_PROXY_SERVICE_NAME; - import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -113,7 +111,7 @@ public class ProxyServer { handlers.add(context); } - public void addRestResources(String basePath, String javaPackages, WebSocketService service) { + public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) { JacksonJaxbJsonProvider provider = new JacksonJaxbJsonProvider(); provider.setMapper(ObjectMapperFactory.create()); ResourceConfig config = new ResourceConfig(); @@ -124,7 +122,7 @@ public class ProxyServer { ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(basePath); context.addServlet(servletHolder, "/*"); - context.setAttribute(ATTRIBUTE_PROXY_SERVICE_NAME, service); + context.setAttribute(attribute, attributeValue); handlers.add(context); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index 5cea3df..8e266b8 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -45,6 +45,10 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { private String serviceUrlTls; private String brokerServiceUrl; private String brokerServiceUrlTls; + + // Path for the file used to determine the rotation status for the broker + // when responding to service discovery health checks + private String statusFilePath; // Global Zookeeper quorum connection string private String globalZookeeperServers; @@ -143,6 +147,14 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { this.brokerServiceUrlTls = brokerServiceUrlTls; } + public String getStatusFilePath() { + return statusFilePath; + } + + public void setStatusFilePath(String statusFilePath) { + this.statusFilePath = statusFilePath; + } + public String getGlobalZookeeperServers() { return globalZookeeperServers; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java index 690ad6a..4672044 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ADMIN_PATH; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; @@ -29,6 +30,7 @@ import org.apache.pulsar.websocket.WebSocketService; import org.apache.pulsar.websocket.admin.WebSocketProxyStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ATTRIBUTE_PROXY_SERVICE_NAME; public class WebSocketServiceStarter { @@ -53,7 +55,9 @@ public class WebSocketServiceStarter { proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service)); proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service)); proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service)); - proxyServer.addRestResources(ADMIN_PATH, WebSocketProxyStats.class.getPackage().getName(), service); + proxyServer.addRestResources(ADMIN_PATH, WebSocketProxyStats.class.getPackage().getName(), ATTRIBUTE_PROXY_SERVICE_NAME, service); + proxyServer.addRestResources("/", VipStatus.class.getPackage().getName(), + VipStatus.ATTRIBUTE_STATUS_FILE_PATH, service.getConfig().getStatusFilePath()); proxyServer.start(); service.start(); } -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.