SPOI-2695 #resolve renamed simple authentication to password authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/26c05205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/26c05205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/26c05205

Branch: refs/heads/devel-3
Commit: 26c0520552b6d6100d9382ee310c175a44380681
Parents: 55844f8
Author: David Yan <[email protected]>
Authored: Wed Aug 13 16:13:46 2014 -0700
Committer: David Yan <[email protected]>
Committed: Fri Aug 28 10:56:57 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 108 +++++++++++++++++++++++++++++++++------
 1 file changed, 91 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/26c05205/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 21b48f7..ba1ac9c 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,13 +4,16 @@
  */
 package com.datatorrent.gateway;
 
+import com.datatorrent.gateway.security.AuthDatabase;
 import com.datatorrent.gateway.security.AuthenticationException;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
+import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
 import org.codehaus.jackson.map.ObjectMapper;
 import org.eclipse.jetty.websocket.WebSocket;
@@ -18,17 +21,14 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.gateway.security.DTPrincipal;
 import com.datatorrent.lib.util.JacksonObjectMapperProvider;
 import com.datatorrent.lib.util.PubSubMessage;
 import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
 import com.datatorrent.lib.util.PubSubMessageCodec;
-
 import com.datatorrent.stram.util.LRUCache;
-import javax.servlet.ServletException;
 import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response.Status;
+
 
 /**
  * <p>PubSubWebSocketServlet class.</p>
@@ -47,6 +47,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private InternalMessageHandler internalMessageHandler = null;
   private static final int latestTopicCount = 100;
   private final DTGateway gateway;
+  private static final String AUTH_ATTRIBUTE = 
"com.datatorrent.auth.principal";
+  private SubscribeFilter subscribeFilter;
+  private SendFilter sendFilter;
   private final LRUCache<String, Long> latestTopics = new LRUCache<String, 
Long>(latestTopicCount, false)
   {
     private static final long serialVersionUID = 20140131L;
@@ -60,6 +63,45 @@ public class PubSubWebSocketServlet extends WebSocketServlet
 
   };
 
+  public interface SubscribeFilter
+  {
+
+    /**
+     * Returns whether or not the principal is allowed to subscribe to this 
topic
+     *
+     * @param gateway
+     * @param principal
+     * @param topic
+     * @return
+     */
+    boolean filter(DTGateway gateway, DTPrincipal principal, String topic);
+  }
+
+  public interface SendFilter
+  {
+
+    /**
+     * Returns the data it should be sent given the principal
+     *
+     * @param gateway
+     * @param principal
+     * @param topic
+     * @param data
+     * @return the data it should send to the websocket
+     */
+    Object filter(DTGateway gateway, DTPrincipal principal, String topic, 
Object data);
+  }
+
+  public void registerSubscribeFilter(SubscribeFilter filter)
+  {
+    subscribeFilter = filter;
+  }
+
+  public void registerSendFilter(SendFilter filter)
+  {
+    sendFilter = filter;
+  }
+
   public interface InternalMessageHandler
   {
     void onMessage(String topic, Object data);
@@ -79,36 +121,50 @@ public class PubSubWebSocketServlet extends 
WebSocketServlet
   @Override
   protected void service(HttpServletRequest request, HttpServletResponse 
response) throws ServletException, IOException
   {
-    if ("simple".equals(gateway.getWebAuthType())) {
+    if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
       Cookie[] cookies = request.getCookies();
       if (cookies != null) {
         for (Cookie cookie : cookies) {
           if ("session".equals(cookie.getName())) {
             try {
-              gateway.getAuthDatabase().authenticateSession(cookie.getValue());
+              AuthDatabase auth = gateway.getAuthDatabase();
+              DTPrincipal principal = 
auth.authenticateSession(cookie.getValue());
+              request.setAttribute(AUTH_ATTRIBUTE, principal);
             }
             catch (AuthenticationException ex) {
-              throw new WebApplicationException(ex, Status.FORBIDDEN);
+              /* commenting this out to allow anonymous publish from stram
+               throw new WebApplicationException(ex, Status.FORBIDDEN);
+               */
             }
-            super.service(request, response);
+            //super.service(request, response);
           }
         }
       }
-      throw new WebApplicationException(Status.FORBIDDEN);
-    }
-    else {
-      super.service(request, response);
+      /* commenting this out to allow anonymous publish from stram
+       throw new WebApplicationException(Status.UNAUTHORIZED);
+       */
     }
+     
+    super.service(request, response);
   }
 
   @Override
-  public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol)
+  public WebSocket doWebSocketConnect(HttpServletRequest request, String 
protocol)
   {
-    return new PubSubWebSocket();
+    DTPrincipal principal = (DTPrincipal)request.getAttribute(AUTH_ATTRIBUTE);
+    return new PubSubWebSocket(principal);
   }
 
   private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
   {
+    if (subscribeFilter != null && !subscribeFilter.filter(gateway, 
webSocket.getPrincipal(), topic)) {
+      LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring 
subscribe request", topic, webSocket.getPrincipal());
+      return;
+    }
+    else {
+      LOG.info("Subscribe is allowed for topic {}, user {}", topic, 
webSocket.getPrincipal());
+    }
+
     HashSet<PubSubWebSocket> wsSet;
     if (!topicToSocketMap.containsKey(topic)) {
       wsSet = new HashSet<PubSubWebSocket>();
@@ -185,7 +241,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
     pubSubMessage.setType(PubSubMessageType.DATA);
     pubSubMessage.setTopic(topic);
     pubSubMessage.setData(data);
-    LOG.debug("Sending data of {} to subscriber...", topic);
+    LOG.debug("Sending data {} to subscriber...", topic);
     webSocket.sendMessage(codec.formatMessage(pubSubMessage));
   }
 
@@ -200,9 +256,16 @@ public class PubSubWebSocketServlet extends 
WebSocketServlet
       while (it.hasNext()) {
         PubSubWebSocket socket = it.next();
         try {
-          sendData(socket, topic, data);
+          if (sendFilter != null) {
+            Object filteredData = sendFilter.filter(gateway, 
socket.getPrincipal(), topic, data);
+            sendData(socket, topic, filteredData);
+          }
+          else {
+            sendData(socket, topic, data);
+          }
         }
         catch (Exception ex) {
+          LOG.error("Cannot send message", ex);
           it.remove();
           disconnect(socket);
         }
@@ -215,6 +278,17 @@ public class PubSubWebSocketServlet extends 
WebSocketServlet
     private Connection connection;
     private final BlockingQueue<String> messageQueue = new 
ArrayBlockingQueue<String>(32);
     private final Thread messengerThread = new Thread(new Messenger());
+    private final DTPrincipal principal;
+
+    public PubSubWebSocket(DTPrincipal principal)
+    {
+      this.principal = principal;
+    }
+
+    public DTPrincipal getPrincipal()
+    {
+      return principal;
+    }
 
     @Override
     public void onMessage(String message)

Reply via email to