Author: sblackmon
Date: Thu Feb 27 00:13:52 2014
New Revision: 1572361
URL: http://svn.apache.org/r1572361
Log:
some refactoring for dataflow
Added:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
Added:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java?rev=1572361&view=auto
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
(added)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
Thu Feb 27 00:13:52 2014
@@ -0,0 +1,80 @@
+package org.apache.streams.console;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ConsolePersistReader implements StreamsPersistReader { // Reads lines from System.in and queues each one as a StreamsDatum.
+ // Label placed at the front of this class's log messages.
+ private final static String STREAMS_ID = "ConsolePersistReader";
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConsolePersistReader.class);
+ // Queue that readCurrent() fills with one StreamsDatum per input line.
+ protected volatile Queue<StreamsDatum> persistQueue;
+ // NOTE(review): mapper is never referenced in this class -- confirm whether it can be removed.
+ private ObjectMapper mapper = new ObjectMapper();
+ // Default constructor: backs the reader with a new ConcurrentLinkedQueue.
+ public ConsolePersistReader() {
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
+ // Uses the caller-supplied queue as-is (no null check is performed).
+ public ConsolePersistReader(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
+ // No-op: the argument is ignored.
+ public void prepare(Object o) {
+
+ }
+ // No-op.
+ public void cleanUp() {
+
+ }
+ // Delegates to readCurrent().
+ @Override
+ public StreamsResultSet readAll() {
+ return readCurrent();
+ }
+ // Reads System.in line by line until hasNextLine() returns false, offering each line to persistQueue.
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ LOGGER.info("{} readCurrent", STREAMS_ID);
+ // The Scanner is not closed in this method. NOTE(review): closing it would also close System.in -- confirm intent.
+ Scanner sc = new Scanner(System.in);
+
+ while( sc.hasNextLine() ) {
+
+ persistQueue.offer(new StreamsDatum(sc.nextLine()));
+
+ }
+
+ LOGGER.info("{} providing {} docs", STREAMS_ID, persistQueue.size());
+ LOGGER.info("{} Exiting", STREAMS_ID);
+ // NOTE(review): downcast of the Queue; throws ClassCastException unless the queue's runtime class implements StreamsResultSet -- confirm.
+ return (StreamsResultSet) persistQueue;
+ }
+ // Delegates to readCurrent(); the sequence argument is ignored.
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return readCurrent();
+ }
+ // Delegates to readCurrent(); start and end are ignored.
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return readCurrent();
+ }
+}
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1572361&r1=1572360&r2=1572361&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
Thu Feb 27 00:13:52 2014
@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class ConsolePersistWriter implements StreamsPersistWriter, Runnable {
+public class ConsolePersistWriter implements StreamsPersistWriter {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsolePersistWriter.class);
@@ -29,7 +29,7 @@ public class ConsolePersistWriter implem
}
public void prepare(Object o) {
-
+ Preconditions.checkNotNull(persistQueue);
}
public void cleanUp() {
@@ -51,10 +51,4 @@ public class ConsolePersistWriter implem
}
- @Override
- public void run() {
- Preconditions.checkNotNull(persistQueue);
- new Thread(new ConsolePersistWriterTask(this)).start();
- }
-
}
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1572361&r1=1572360&r2=1572361&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
Thu Feb 27 00:13:52 2014
@@ -36,6 +36,8 @@ import java.util.concurrent.*;
*/
public class TwitterStreamProvider implements StreamsProvider, Serializable {
+ private final static String STREAMS_ID = "TwitterStreamProvider";
+
private final static Logger LOGGER =
LoggerFactory.getLogger(TwitterStreamProvider.class);
private TwitterStreamConfiguration config;