Revision: 18325
          http://sourceforge.net/p/gate/code/18325
Author:   adamfunk
Date:     2014-09-12 13:45:45 +0000 (Fri, 12 Sep 2014)
Log Message:
-----------
Added stream-reading code

Modified Paths:
--------------
    
gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java

Modified: 
gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java
===================================================================
--- 
gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java
     2014-09-12 12:22:38 UTC (rev 18324)
+++ 
gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java
     2014-09-12 13:45:45 UTC (rev 18325)
@@ -11,14 +11,11 @@
  */
 package gate.corpora.twitter;
 
-import gate.Document;
-import gate.Factory;
-import gate.FeatureMap;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
+import java.util.zip.GZIPInputStream;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
@@ -28,36 +25,43 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.MappingIterator;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 
+
 public class TweetStreamIterator implements Iterator<Tweet> {
 
   // Borrowed from gcp IOConstants
   public static final String ID_POINTER = "/id_str";
+  public static final String SEARCH_KEY = "search_metadata";
+  public static final String STATUS_KEY = "statuses";
   
-
   private ObjectMapper objectMapper;
   private JsonParser jsonParser;
   private MappingIterator<JsonNode> iterator;
-  private boolean gzip;
   private List<String> contentKeys, featureKeys;
   protected JsonPointer idPointer;
+  private boolean nested;
+  private Iterator<JsonNode> nestedStatuses;
 
 
   public TweetStreamIterator(InputStream input, List<String> contentKeys, 
           List<String> featureKeys, boolean gzip) throws JsonParseException, 
IOException {
     this.contentKeys = contentKeys;
     this.featureKeys = featureKeys;
-    this.gzip = gzip;
+    InputStream workingInput;
     
+    // Following borrowed from gcp JSONStreamingInputHandler
+    idPointer = JsonPointer.compile(ID_POINTER);
+    objectMapper = new ObjectMapper();
+
     if (gzip) {
-      throw new IllegalArgumentException("gzip not yet supported!");
+      workingInput = new GZIPInputStream(input);
     }
-    // TODO support compression
+    else {
+      workingInput = input;
+    }
     
-    // Following borrowed from gcp JSONStreamingInputHandler
-    idPointer = JsonPointer.compile(ID_POINTER);
-    objectMapper = new ObjectMapper();
-    jsonParser = 
objectMapper.getFactory().createParser(input).enable(Feature.AUTO_CLOSE_SOURCE);
+    jsonParser = 
objectMapper.getFactory().createParser(workingInput).enable(Feature.AUTO_CLOSE_SOURCE);
     // If the first token in the stream is the start of an array ("[")
     // then
     // assume the stream as a whole is an array of objects, one per
@@ -69,44 +73,83 @@
       jsonParser.clearCurrentToken();
     }
     iterator = objectMapper.readValues(jsonParser, JsonNode.class);
+    this.nested = false;
+    this.nestedStatuses = null;
   }
 
   
   @Override
   public boolean hasNext() {
-    return iterator.hasNext();
-    // should this be hasNextValue() ?
+    return this.iterator.hasNext() || this.nestedStatuses.hasNext();
+    // should that be iterator.hasNextValue() ?
   }
 
   @Override
   public Tweet next() {
+    Tweet result = null;
     try {
-      // why while not if?
-      while(iterator.hasNextValue()) {
+      if (this.nested && this.nestedStatuses.hasNext()) {
+        result = Tweet.readTweet(this.nestedStatuses.next(), contentKeys, 
featureKeys);
+        // Clear the nested flag once the last item in the statuses
+        // value's list has been used, so that the next call to next()
+        // will drop to the else if clause.
+        this.nested = this.nestedStatuses.hasNext();
+      }
+      
+      else if (iterator.hasNextValue()) {
         JsonNode json = iterator.nextValue();
-        String id = json.at(idPointer).asText();
-        // Is it worth testing IDs here?
-        return Tweet.readTweet(json, contentKeys, featureKeys);
+        
+        if (isSearchResultList(json)) {
+          this.nestedStatuses = getStatuses(json).iterator();
+          this.nested = this.nestedStatuses.hasNext();
+          // Set the nested flag according as there is anything in 
+          // the statuses value array (it could be empty).
+        }
+        
+        // Test nested now: true IFF we are in a search result thingy AND
+        // the statuses array is non-empty.
+        if (this.nested) {
+          result = Tweet.readTweet(this.nestedStatuses.next(), contentKeys, 
featureKeys);
+          // Set the nested flag again for the next call to next()
+          this.nested = this.nestedStatuses.hasNext();
+        }
+        else {
+          result = Tweet.readTweet(json, contentKeys, featureKeys);
+        }
       }
     }
     catch (IOException e) {
       e.printStackTrace();
     }
-    return null;
+    return result;
   }
 
+  
   @Override
   public void remove() {
-    // TODO Auto-generated method stub
-    
+    throw new UnsupportedOperationException("The TweetStream is read-only.");
   }
   
   
-  public void close() {
-    // TODO
+  public void close() throws IOException {
+    iterator.close();
+    jsonParser.close();
   }
   
   
+  public static boolean isSearchResultList(JsonNode json) {
+    return json.has(SEARCH_KEY) && json.has(STATUS_KEY);
+  }
   
   
+  public static ArrayNode getStatuses(JsonNode json) throws IOException {
+    JsonNode statusList = json.get(STATUS_KEY);
+    if (! (statusList instanceof ArrayNode)) {
+      throw new IOException("Bad tweet format: value of 'statuses' is not an 
array!");
+    }
+    return (ArrayNode) statusList;    
+  }
+  
+  
+  
 }

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Want excitement?
Manually upgrade your production database.
When you want reliability, choose Perforce
Perforce version control. Predictably reliable.
http://pubads.g.doubleclick.net/gampad/clk?id=157508191&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
GATE-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to