Revision: 18325
          http://sourceforge.net/p/gate/code/18325
Author:   adamfunk
Date:     2014-09-12 13:45:45 +0000 (Fri, 12 Sep 2014)
Log Message:
-----------
Added stream-reading code
Modified Paths: -------------- gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java Modified: gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java =================================================================== --- gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java 2014-09-12 12:22:38 UTC (rev 18324) +++ gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java 2014-09-12 13:45:45 UTC (rev 18325) @@ -11,14 +11,11 @@ */ package gate.corpora.twitter; -import gate.Document; -import gate.Factory; -import gate.FeatureMap; - import java.io.IOException; import java.io.InputStream; import java.util.Iterator; import java.util.List; +import java.util.zip.GZIPInputStream; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; @@ -28,36 +25,43 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; + public class TweetStreamIterator implements Iterator<Tweet> { // Borrowed from gcp IOConstants public static final String ID_POINTER = "/id_str"; + public static final String SEARCH_KEY = "search_metadata"; + public static final String STATUS_KEY = "statuses"; - private ObjectMapper objectMapper; private JsonParser jsonParser; private MappingIterator<JsonNode> iterator; - private boolean gzip; private List<String> contentKeys, featureKeys; protected JsonPointer idPointer; + private boolean nested; + private Iterator<JsonNode> nestedStatuses; public TweetStreamIterator(InputStream input, List<String> contentKeys, List<String> featureKeys, boolean gzip) throws JsonParseException, IOException { this.contentKeys = contentKeys; this.featureKeys = featureKeys; - this.gzip = gzip; + InputStream workingInput; + // 
Following borrowed from gcp JSONStreamingInputHandler + idPointer = JsonPointer.compile(ID_POINTER); + objectMapper = new ObjectMapper(); + if (gzip) { - throw new IllegalArgumentException("gzip not yet supported!"); + workingInput = new GZIPInputStream(input); } - // TODO support compression + else { + workingInput = input; + } - // Following borrowed from gcp JSONStreamingInputHandler - idPointer = JsonPointer.compile(ID_POINTER); - objectMapper = new ObjectMapper(); - jsonParser = objectMapper.getFactory().createParser(input).enable(Feature.AUTO_CLOSE_SOURCE); + jsonParser = objectMapper.getFactory().createParser(workingInput).enable(Feature.AUTO_CLOSE_SOURCE); // If the first token in the stream is the start of an array ("[") // then // assume the stream as a whole is an array of objects, one per @@ -69,44 +73,83 @@ jsonParser.clearCurrentToken(); } iterator = objectMapper.readValues(jsonParser, JsonNode.class); + this.nested = false; + this.nestedStatuses = null; } @Override public boolean hasNext() { - return iterator.hasNext(); - // should this be hasNextValue() ? + return this.iterator.hasNext() || this.nestedStatuses.hasNext(); + // should that be iterator.hasNextValue() ? } @Override public Tweet next() { + Tweet result = null; try { - // why while not if? - while(iterator.hasNextValue()) { + if (this.nested && this.nestedStatuses.hasNext()) { + result = Tweet.readTweet(this.nestedStatuses.next(), contentKeys, featureKeys); + // Clear the nested flag once the last item in the statuses + // value's list has been used, so that the next call to next() + // will drop to the else if clause. + this.nested = this.nestedStatuses.hasNext(); + } + + else if (iterator.hasNextValue()) { JsonNode json = iterator.nextValue(); - String id = json.at(idPointer).asText(); - // Is it worth testing IDs here? 
- return Tweet.readTweet(json, contentKeys, featureKeys); + + if (isSearchResultList(json)) { + this.nestedStatuses = getStatuses(json).iterator(); + this.nested = this.nestedStatuses.hasNext(); + // Set the nested flag according as there is anything in + // the statuses value array (it could be empty). + } + + // Test nested now: true IFF we are in a search result thingy AND + // the statuses array is non-empty. + if (this.nested) { + result = Tweet.readTweet(this.nestedStatuses.next(), contentKeys, featureKeys); + // Set the nested flag again for the next call to next() + this.nested = this.nestedStatuses.hasNext(); + } + else { + result = Tweet.readTweet(json, contentKeys, featureKeys); + } } } catch (IOException e) { e.printStackTrace(); } - return null; + return result; } + @Override public void remove() { - // TODO Auto-generated method stub - + throw new UnsupportedOperationException("The TweetStream is read-only."); } - public void close() { - // TODO + public void close() throws IOException { + iterator.close(); + jsonParser.close(); } + public static boolean isSearchResultList(JsonNode json) { + return json.has(SEARCH_KEY) && json.has(STATUS_KEY); + } + public static ArrayNode getStatuses(JsonNode json) throws IOException { + JsonNode statusList = json.get(STATUS_KEY); + if (! (statusList instanceof ArrayNode)) { + throw new IOException("Bad tweet format: value of 'statuses' is not an array!"); + } + return (ArrayNode) statusList; + } + + + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. ------------------------------------------------------------------------------ Want excitement? Manually upgrade your production database. When you want reliability, choose Perforce Perforce version control. Predictably reliable. 
http://pubads.g.doubleclick.net/gampad/clk?id=157508191&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
GATE-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/gate-cvs