SPOI-2695 #resolve renamed simple authentication to password authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/26c05205 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/26c05205 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/26c05205 Branch: refs/heads/devel-3 Commit: 26c0520552b6d6100d9382ee310c175a44380681 Parents: 55844f8 Author: David Yan <[email protected]> Authored: Wed Aug 13 16:13:46 2014 -0700 Committer: David Yan <[email protected]> Committed: Fri Aug 28 10:56:57 2015 -0700 ---------------------------------------------------------------------- PubSubWebSocketServlet.java | 108 +++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/26c05205/PubSubWebSocketServlet.java ---------------------------------------------------------------------- diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java index 21b48f7..ba1ac9c 100644 --- a/PubSubWebSocketServlet.java +++ b/PubSubWebSocketServlet.java @@ -4,13 +4,16 @@ */ package com.datatorrent.gateway; +import com.datatorrent.gateway.security.AuthDatabase; import com.datatorrent.gateway.security.AuthenticationException; import java.io.IOException; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.codehaus.jackson.map.ObjectMapper; import org.eclipse.jetty.websocket.WebSocket; @@ -18,17 +21,14 @@ import org.eclipse.jetty.websocket.WebSocketServlet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.gateway.security.DTPrincipal; import com.datatorrent.lib.util.JacksonObjectMapperProvider; import com.datatorrent.lib.util.PubSubMessage; import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType; import com.datatorrent.lib.util.PubSubMessageCodec; - import com.datatorrent.stram.util.LRUCache; -import javax.servlet.ServletException; import javax.servlet.http.Cookie; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response.Status; + /** * <p>PubSubWebSocketServlet class.</p> @@ -47,6 +47,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet private InternalMessageHandler internalMessageHandler = null; private static final int latestTopicCount = 100; private final DTGateway gateway; + private static final String AUTH_ATTRIBUTE = "com.datatorrent.auth.principal"; + private SubscribeFilter subscribeFilter; + private SendFilter sendFilter; private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false) { private static final long serialVersionUID = 20140131L; @@ -60,6 +63,45 @@ public class PubSubWebSocketServlet extends WebSocketServlet }; + public interface SubscribeFilter + { + + /** + * Returns whether or not the principal is allowed to subscribe to this topic + * + * @param gateway + * @param principal + * @param topic + * @return + */ + boolean filter(DTGateway gateway, DTPrincipal principal, String topic); + } + + public interface SendFilter + { + + /** + * Returns the data it should be sent given the principal + * + * @param gateway + * @param principal + * @param topic + * @param data + * @return the data it should send to the websocket + */ + Object filter(DTGateway gateway, DTPrincipal principal, String topic, Object data); + } + + public void registerSubscribeFilter(SubscribeFilter filter) + { + subscribeFilter = filter; + } + + public void registerSendFilter(SendFilter filter) + { + sendFilter = filter; + } + public interface InternalMessageHandler { void onMessage(String topic, Object data); @@ -79,36 +121,50 @@ public class PubSubWebSocketServlet extends WebSocketServlet @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - if ("simple".equals(gateway.getWebAuthType())) { + if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) { Cookie[] cookies = request.getCookies(); if (cookies != null) { for (Cookie cookie : cookies) { if ("session".equals(cookie.getName())) { try { - gateway.getAuthDatabase().authenticateSession(cookie.getValue()); + AuthDatabase auth = gateway.getAuthDatabase(); + DTPrincipal principal = auth.authenticateSession(cookie.getValue()); + request.setAttribute(AUTH_ATTRIBUTE, principal); } catch (AuthenticationException ex) { - throw new WebApplicationException(ex, Status.FORBIDDEN); + /* commenting this out to allow anonymous publish from stram + throw new WebApplicationException(ex, Status.FORBIDDEN); + */ } - super.service(request, response); + //super.service(request, response); } } } - throw new WebApplicationException(Status.FORBIDDEN); - } - else { - super.service(request, response); + /* commenting this out to allow anonymous publish from stram + throw new WebApplicationException(Status.UNAUTHORIZED); + */ } + + super.service(request, response); } @Override - public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol) + public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) { - return new PubSubWebSocket(); + DTPrincipal principal = (DTPrincipal)request.getAttribute(AUTH_ATTRIBUTE); + return new PubSubWebSocket(principal); } private synchronized void subscribe(PubSubWebSocket webSocket, String topic) { + if (subscribeFilter != null && !subscribeFilter.filter(gateway, webSocket.getPrincipal(), topic)) { + LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", topic, webSocket.getPrincipal()); + return; + } + else { + LOG.info("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal()); + } + HashSet<PubSubWebSocket> wsSet; if (!topicToSocketMap.containsKey(topic)) { wsSet = new HashSet<PubSubWebSocket>(); @@ -185,7 +241,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet pubSubMessage.setType(PubSubMessageType.DATA); pubSubMessage.setTopic(topic); pubSubMessage.setData(data); - LOG.debug("Sending data of {} to subscriber...", topic); + LOG.debug("Sending data {} to subscriber...", topic); webSocket.sendMessage(codec.formatMessage(pubSubMessage)); } @@ -200,9 +256,16 @@ public class PubSubWebSocketServlet extends WebSocketServlet while (it.hasNext()) { PubSubWebSocket socket = it.next(); try { - sendData(socket, topic, data); + if (sendFilter != null) { + Object filteredData = sendFilter.filter(gateway, socket.getPrincipal(), topic, data); + sendData(socket, topic, filteredData); + } + else { + sendData(socket, topic, data); + } } catch (Exception ex) { + LOG.error("Cannot send message", ex); it.remove(); disconnect(socket); } @@ -215,6 +278,17 @@ public class PubSubWebSocketServlet extends WebSocketServlet private Connection connection; private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32); private final Thread messengerThread = new Thread(new Messenger()); + private final DTPrincipal principal; + + public PubSubWebSocket(DTPrincipal principal) + { + this.principal = principal; + } + + public DTPrincipal getPrincipal() + { + return principal; + } @Override public void onMessage(String message)
