SPOI-1770 exclude special topics in latest topic list

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9a9f1538
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9a9f1538
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9a9f1538

Branch: refs/heads/devel-3
Commit: 9a9f15381607c34b56e600a8083bdd41caf6f137
Parents: 4e5c9e5
Author: David Yan <[email protected]>
Authored: Thu Jan 16 18:12:04 2014 -0800
Committer: David Yan <[email protected]>
Committed: Fri Aug 28 10:56:56 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 38 ++++++++++++++++++++++----------------
 1 file changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9a9f1538/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 21bfd73..11bf98e 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,23 +4,26 @@
  */
 package com.datatorrent.gateway;
 
-import com.datatorrent.api.util.JacksonObjectMapperProvider;
-import com.datatorrent.api.util.PubSubMessage;
-import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
-import com.datatorrent.api.util.PubSubMessageCodec;
-import com.datatorrent.stram.util.LRUCache;
-
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+
 import javax.servlet.http.HttpServletRequest;
+
 import org.codehaus.jackson.map.ObjectMapper;
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.util.JacksonObjectMapperProvider;
+import com.datatorrent.lib.util.PubSubMessage;
+import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.lib.util.PubSubMessageCodec;
+
+import com.datatorrent.stram.util.LRUCache;
+
 /**
  * <p>PubSubWebSocketServlet class.</p>
  *
@@ -37,8 +40,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private PubSubMessageCodec<Object> codec = new 
PubSubMessageCodec<Object>(mapper);
   private InternalMessageHandler internalMessageHandler = null;
   private static final int latestTopicCount = 100;
-  private LRUCache<String, Long> latestTopics = new LRUCache<String, 
Long>(latestTopicCount, false)
+  private final LRUCache<String, Long> latestTopics = new LRUCache<String, 
Long>(latestTopicCount, false)
   {
+    private static final long serialVersionUID = 20140131L;
     @Override
     public Long put(String key, Long value)
     {
@@ -100,7 +104,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
       topicSet = socketToTopicMap.get(webSocket);
     }
     topicSet.add(topic);
-    publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, 
getNumSubscribers(topic));
   }
 
   private synchronized void unsubscribe(PubSubWebSocket webSocket, String 
topic)
@@ -121,7 +125,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
     if (topicSet.isEmpty()) {
       socketToTopicMap.remove(webSocket);
     }
-    publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, 
getNumSubscribers(topic));
   }
 
   private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
@@ -134,7 +138,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
         if (wsSet.isEmpty()) {
           topicToSocketMap.remove(topic);
         }
-        publish(topic + ".numSubscribers", new 
Integer(getNumSubscribers(topic)));
+        publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, 
getNumSubscribers(topic));
       }
       socketToTopicMap.remove(webSocket);
     }
@@ -163,7 +167,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
 
   public synchronized void publish(String topic, Object data)
   {
-    latestTopics.put(topic, System.currentTimeMillis());
+    if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && 
!topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+      latestTopics.put(topic, System.currentTimeMillis());
+    }
     HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
     if (wsSet != null) {
       Iterator<PubSubWebSocket> it = wsSet.iterator();
@@ -183,8 +189,8 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private class PubSubWebSocket implements WebSocket.OnTextMessage
   {
     private Connection connection;
-    private BlockingQueue<String> messageQueue = new 
ArrayBlockingQueue<String>(32);
-    private Thread messengerThread = new Thread(new Messenger());
+    private final BlockingQueue<String> messageQueue = new 
ArrayBlockingQueue<String>(32);
+    private final Thread messengerThread = new Thread(new Messenger());
 
     @Override
     public void onMessage(String message)
@@ -222,13 +228,13 @@ public class PubSubWebSocketServlet extends WebSocketServlet
             }
             else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) 
{
               if (topic != null) {
-                subscribe(this, topic + ".numSubscribers");
-                sendData(this, topic + ".numSubscribers", new 
Integer(getNumSubscribers(topic)));
+                subscribe(this, topic + "." + 
PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+                sendData(this, topic + "." + 
PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
               }
             }
             else if 
(type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
               if (topic != null) {
-                unsubscribe(this, topic + ".numSubscribers");
+                unsubscribe(this, topic + "." + 
PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
               }
             }
             else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {

Reply via email to