SENTRY-2027: Create mechanism of delivering commands via WebUI (Vadim Spector, reviewed by Sergio Pena and Arjun Mishra)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/64476a74 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/64476a74 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/64476a74 Branch: refs/heads/akolb-cli Commit: 64476a7478c8f9aec1ba0ad4388d15b0bb6db352 Parents: c6723a8 Author: Vadim Spector <[email protected]> Authored: Thu Nov 2 11:25:14 2017 -0700 Committer: Vadim Spector <[email protected]> Committed: Thu Nov 2 11:25:14 2017 -0700 ---------------------------------------------------------------------- .../apache/sentry/core/common/utils/PubSub.java | 178 ++++++++++++++++++ .../db/service/thrift/PubSubServlet.java | 128 +++++++++++++ .../db/service/thrift/SentryWebServer.java | 5 + .../sentry/service/thrift/ServiceConstants.java | 3 + .../service/thrift/TestSentryServerPubSub.java | 181 +++++++++++++++++++ .../thrift/SentryServiceIntegrationBase.java | 1 + 6 files changed, 496 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PubSub.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PubSub.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PubSub.java new file mode 100644 index 0000000..eeb8f80 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PubSub.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.core.common.utils; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is a simple publish-subscribe implementation for internal + * communication between Sentry components. It's a singleton class. + * <p> + * For the initial set of use cases, publish events are expected to be + * extremely rare, so no the data structures have been selected with no + * consideration for high concurrency. + */ +public final class PubSub { + + private static final Logger LOGGER = LoggerFactory.getLogger(PubSub.class); + + private final Map<Topic,Set<Subscriber>> subscriberMap = new HashMap<>(); + + private static PubSub instance; + + /** + * Subscriber callback interface. + * The same subscriber can subscribe to multiple topics, + * so callback API includes both topic and message. + */ + public interface Subscriber { + void onMessage(Topic topic, String message); + } + + /** + * Enumerated topics one can subscribe to. + * To be expanded as needed. + */ + public enum Topic { + HDFS_SYNC_HMS("hdfs-sync-hms"), // upcoming feature, triggering HDFS sync between HMS and Sentry + HDFS_SYNC_NN("hdfs-sync-nn"); // upcoming feature, triggering HDFS sync between Sentry and NameNode + + private final String name; + + private static final Map<String,Topic> map = new HashMap<>(); + static { + for (Topic t : Topic.values()) { + map.put(t.name, t); + } + } + + public static Topic fromString(String name) { + Preconditions.checkNotNull("Enum name cannot be null", name); + name = name.trim().toLowerCase(); + if (map.containsKey(name)) { + return map.get(name); + } + throw new NoSuchElementException(name + " not found"); + } + + private Topic(String name) { + this.name = name.toLowerCase(); + } + + public String getName() { + return name; + } + } + + /** + * Public factory method to guarantee singleton + */ + public static synchronized PubSub getInstance() { + if (instance != null) { + LOGGER.info(instance + " requested"); + } else { + instance = new PubSub(); + LOGGER.info(instance + " created"); + } + return instance; + } + + // declare private to prevent multiple class instantiation + private PubSub() { + } + + /** + * Publish message on given topic. Message is optional. + */ + public synchronized void publish(Topic topic, String message) { + Preconditions.checkNotNull(topic, "Topic cannot be null"); + + Set<Subscriber> subscribers = subscriberMap.get(topic); + if (subscribers == null) { + throw new IllegalArgumentException("cannot publish to unknown topic " + topic + + ", existing topics " + subscriberMap.keySet()); + } + + for (Subscriber subscriber : subscribers) { + // Faulire of one subscriber to process message delivery should not affect + // message delivery to other subscribers, therefore using try-catch. + try { + subscriber.onMessage(topic, message); + } catch (Exception e) { + LOGGER.error("Topic " + topic + ", message " + message + ", delivery error", e); + } + } + LOGGER.info("Topic " + topic + ", message " + message + ": " + subscribers.size() + " subscribers called"); + } + + /** + * Subscribe to given topic. + */ + public synchronized void subscribe(Topic topic, Subscriber subscriber) { + Preconditions.checkNotNull(topic, "Topic cannot be null"); + Preconditions.checkNotNull(subscriber, "Topic %s: Subscriber cannot be null", topic); + + Set<Subscriber> subscribers = subscriberMap.get(topic); + if (subscribers == null) { + LOGGER.info("new topic " + topic); + subscriberMap.put(topic, subscribers = new HashSet<Subscriber>()); + } + subscribers.add(subscriber); + LOGGER.info("Topic " + topic + ", added subscriber " + subscriber + ", total topic subscribers: " + subscribers.size()); + } + + /** + * Unsubscribe from given topic. If the last subscriber, remove the topic. + */ + public synchronized void unsubscribe(Topic topic, Subscriber subscriber) { + Preconditions.checkNotNull(topic, "Topic cannot be null"); + Preconditions.checkNotNull(subscriber, "Topic %s: Subscriber cannot be null", topic); + + Set<Subscriber> subscribers = subscriberMap.get(topic); + if (subscribers == null) { + throw new IllegalArgumentException("cannot unsubscribe from unknown topic " + topic); + } + if (!subscribers.remove(subscriber)) { + throw new IllegalArgumentException("cannot unsubscribe from topic " + topic + ", unknown subscriber"); + } + LOGGER.info("Topic " + topic + ", unsubscribing subscriber " + subscriber + ", total topic subscribers: " + subscribers.size()); + if (subscribers.isEmpty()) { + subscriberMap.remove(topic); + } + } + + /** + * Get all existing topics. + */ + public synchronized Set<Topic> getTopics() { + return subscriberMap.keySet(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + Integer.toHexString(hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PubSubServlet.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PubSubServlet.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PubSubServlet.java new file mode 100644 index 0000000..6756d91 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PubSubServlet.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.provider.db.service.thrift; + +import org.apache.sentry.core.common.utils.PubSub; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; + +import static org.apache.commons.lang.StringEscapeUtils.escapeHtml; + +/** + * This servlet facilitates sending {topic, message } tuples to Servlet components + * subscribed to specific topics. + * <p> + * It uses publish-subscribe mechanism implemented by PubSub class. + * The form generated by this servlet consists of the following elements: + * <p> + * a) Topic: pull-down menu of existing topics, i.e. the topics registered with + * PubSub by calling PubSub.subscribe() API. This prevents entering invalid topic. + * <p> + * b) Message: text field for entering a message + * <p> + * c) Submit: button to submit (topic, message) tuple + * <p> + * d) Status: text area printing status of the request or help information. + */ +public class PubSubServlet extends HttpServlet { + + private static final Logger LOGGER = LoggerFactory.getLogger(PubSubServlet.class); + + private static final String FORM_GET = + "<!DOCTYPE html>" + + "<html>" + + "<body>" + + "<form>" + + "<br><br><b>Topic:</b><br><br>" + + "<select name='topic'/>%s</select>" + + "<br><br><b>Message:</b><br><br>" + + "<input type='text' size='50' name='message'/>" + + "<br><br>" + + "<input type='submit' value='Submit'/>" + + "</form>" + + "<br><br><b>Status:</b><br><br>" + + "<textarea rows='4' cols='50'>%s</textarea>" + + "</body>" + + "</html>"; + + /** + * Return parameter on servlet request for the given name + * + * @param request: Servlet request + * @param name: Name of parameter in servlet request + * @return Parameter in servlet request for the given name, return null if can't find parameter. + */ + private static String getParameter(ServletRequest request, String name) { + String s = request.getParameter(name); + if (s == null) { + return null; + } + s = s.trim(); + return s.isEmpty() ? null : s; + } + + /** + * Parse the topic and message values and submit them via PubSub.submit() API. + * Reject request for unknown topic, i.e. topic no one is subscribed to. + */ + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + String topic = getParameter(request, "topic"); + String message = getParameter(request, "message"); + response.setContentType("text/html;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + PrintWriter out = response.getWriter(); + + String msg = "Topic is required, Message is optional.\nValid topics: " + PubSub.getInstance().getTopics(); + if (topic != null) { + LOGGER.info("Submitting topic " + topic + ", message " + message); + try { + PubSub.getInstance().publish(PubSub.Topic.fromString(topic), message); + msg = "Submitted topic " + topic + ", message " + message; + } catch (Exception e) { + msg = "Failed to submit topic " + topic + ", message " + message + " - " + e.getMessage(); + LOGGER.error(msg); + response.sendError(HttpServletResponse.SC_BAD_REQUEST, msg); + return; + } + } + + StringBuilder topics = new StringBuilder(); + for (PubSub.Topic t : PubSub.getInstance().getTopics()) { + topics.append("<option>").append(t.getName()).append("</option>"); + } + + String output = String.format(FORM_GET, topics.toString(), escapeHtml(msg)); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("HTML Page: " + output); + } + out.write(output); + out.close(); + response.flushBuffer(); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java index 66a2f9e..95b87ad 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java @@ -122,6 +122,11 @@ public class SentryWebServer { servletContextHandler.addServlet(new ServletHolder(LogLevelServlet.class), "/admin/logLevel"); + if (conf.getBoolean(ServerConfig.SENTRY_WEB_PUBSUB_SERVLET_ENABLED, + ServerConfig.SENTRY_WEB_PUBSUB_SERVLET_ENABLED_DEFAULT)) { + servletContextHandler.addServlet(new ServletHolder(PubSubServlet.class), "/admin/publishMessage"); + } + ResourceHandler resourceHandler = new ResourceHandler(); resourceHandler.setDirectoriesListed(true); URL url = this.getClass().getResource(RESOURCE_DIR); http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index 8dd5497..0a1e0ae 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -216,6 +216,9 @@ public class ServiceConstants { public static final String SENTRY_WEB_ADMIN_SERVLET_ENABLED = "sentry.web.admin.servlet.enabled"; public static final boolean SENTRY_WEB_ADMIN_SERVLET_ENABLED_DEFAULT = false; + public static final String SENTRY_WEB_PUBSUB_SERVLET_ENABLED = "sentry.web.pubsub.servlet.enabled"; + public static final boolean SENTRY_WEB_PUBSUB_SERVLET_ENABLED_DEFAULT = false; + // max message size for thrift messages public static final String SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE = "sentry.policy.server.thrift.max.message.size"; public static final long SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = 100 * 1024 * 1024; http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerPubSub.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerPubSub.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerPubSub.java new file mode 100644 index 0000000..451d7a1 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerPubSub.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.provider.db.service.thrift; + +import org.apache.sentry.core.common.utils.PubSub; +import org.apache.sentry.core.common.utils.PubSub.Topic; +import org.apache.sentry.service.thrift.SentryServiceIntegrationBase; + +import org.junit.*; + +import java.net.HttpURLConnection; +import java.net.URL; + +public class TestSentryServerPubSub extends SentryServiceIntegrationBase { + + private static final Topic[] topics = Topic.values(); + private static final String[] messages = { "message1", "message2", "message3", "" }; + + private static volatile String REQUEST_URL; + + private final TestSubscriber testSubscriber = new TestSubscriber(); + + private static final class TestSubscriber implements PubSub.Subscriber { + private volatile Topic topic; + private volatile String message; + private volatile int count; + @Override + public void onMessage(Topic topic, String message) { + this.topic = topic; + this.message = message; + this.count++; + } + } + + @BeforeClass + public static void setup() throws Exception { + webServerEnabled = true; + webSecurity = false; + SentryServiceIntegrationBase.setup(); + REQUEST_URL= "http://" + SERVER_HOST + ":" + webServerPort + "/admin/publishMessage?topic=%s&message=%s"; + } + + @Override + @Before + public void before() throws Exception { + + // Subscribe to all defined topics. + // After each successfull HTTP-GET, testSubscriber.onMessage() + // will be called and "topic" and "message" fields will be + // set according to HTTP-GET parameters. + testSubscriber.count = 0; + for (Topic topic : topics) { + PubSub.getInstance().subscribe(topic, testSubscriber); + } + Assert.assertEquals("Unexpected number of registered topics", topics.length, PubSub.getInstance().getTopics().size()); + } + + @Override + @After + public void after() { + // unsubscribe + for (Topic topic : topics) { + PubSub.getInstance().unsubscribe(topic, testSubscriber); + } + testSubscriber.count = 0; + Assert.assertTrue("Topics should have been removed after unsubscribe()", PubSub.getInstance().getTopics().isEmpty()); + } + + /** + * Successfully publish notifications + * @throws Exception + */ + @Test + public void testPubSub() throws Exception { + int count = 0; + for (Topic topic : topics) { + for (String message : messages) { + URL url = new URL(String.format(REQUEST_URL, topic.getName(), message)); + HttpURLConnection conn = null; + try { + conn = (HttpURLConnection) url.openConnection(); + Assert.assertEquals("Unexpected response code", HttpURLConnection.HTTP_OK, conn.getResponseCode()); + } finally { + safeClose(conn); + } + Assert.assertEquals("Unexpected topic", topic, testSubscriber.topic); + if (message.isEmpty()) { + Assert.assertEquals("Unexpected message", null, testSubscriber.message); + } else { + Assert.assertEquals("Unexpected message", message, testSubscriber.message); + } + Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", ++count, testSubscriber.count); + } + } + } + + /** + * Submit empty topic. It's ok, generates form page. + * @throws Exception + */ + @Test + public void testPubSubEmptyTopic() throws Exception { + URL url = new URL(String.format(REQUEST_URL, "", "message")); + HttpURLConnection conn = null; + try { + conn = (HttpURLConnection) url.openConnection(); + Assert.assertEquals("Unexpected response code", HttpURLConnection.HTTP_OK, conn.getResponseCode()); + } finally { + safeClose(conn); + } + Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", 0, testSubscriber.count); + } + + /** + * Submit invalid topic + * @throws Exception + */ + @Test + public void testPubSubInvalidTopic() throws Exception { + String[] invalid_topics = { "invalid_topic_1", "invalid_topic_2", "invalid_topic_3" }; + for (String topic : invalid_topics) { + URL url = new URL(String.format(REQUEST_URL, topic, "message")); + HttpURLConnection conn = null; + try { + conn = (HttpURLConnection) url.openConnection(); + Assert.assertEquals("Unexpected response code", HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode()); + } finally { + safeClose(conn); + } + Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", 0, testSubscriber.count); + } + } + + /** + * Submit topic that has no subscribers. + * @throws Exception + */ + @Test + public void testPubSubNonSubscribedTopic() throws Exception { + // At this point all valid Topic values have been subscribed to + // in before() method. + // Unsubscribe from one topic and then try publishing to it. + PubSub.getInstance().unsubscribe(Topic.HDFS_SYNC_HMS, testSubscriber); + Assert.assertEquals("Unexpected number of registered topics", topics.length-1, PubSub.getInstance().getTopics().size()); + + URL url = new URL(String.format(REQUEST_URL, Topic.HDFS_SYNC_HMS.getName(), "message")); + HttpURLConnection conn = null; + try { + conn = (HttpURLConnection) url.openConnection(); + Assert.assertEquals("Unexpected response code", HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode()); + } finally { + safeClose(conn); + } + // re-subscribe, not to upset after() method which expects all topics to be subscribed to + PubSub.getInstance().subscribe(Topic.HDFS_SYNC_HMS, testSubscriber); + } + + private static void safeClose(HttpURLConnection conn) { + if (conn != null) { + try { + conn.disconnect(); + } catch (Exception ignore) { + } + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java index 7d9b3ba..65f1e30 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java @@ -144,6 +144,7 @@ public abstract class SentryServiceIntegrationBase extends SentryMiniKdcTestcase if (webServerEnabled) { conf.set(ServerConfig.SENTRY_WEB_ENABLE, "true"); conf.set(ServerConfig.SENTRY_WEB_PORT, String.valueOf(webServerPort)); + conf.set(ServerConfig.SENTRY_WEB_PUBSUB_SERVLET_ENABLED, "true"); if (webSecurity) { httpKeytab = new File(kdcWorkDir, "http.keytab"); kdc.createPrincipal(httpKeytab, HTTP_PRINCIPAL);
