http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java index b7f777e..0c7af1e 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java @@ -18,11 +18,16 @@ package org.apache.streams.local.test.writer; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; -import java.util.*; +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -31,79 +36,79 @@ import java.util.concurrent.atomic.AtomicLong; */ public class DatumCounterWriter implements StreamsPersistWriter{ - @Override - public String getId() { - return "DatumCounterWriter"; - } + @Override + public String getId() { + return "DatumCounterWriter"; + } - /** - * Set of all ids that have been claimed. Ensures all instances are assigned unique ids - */ - public static Set<Integer> CLAIMED_ID = new HashSet<Integer>(); - /** - * Random instance to generate ids - */ - public static final Random RAND = new Random(); - /** - * Set of instance ids that received data. Usefully for testing parrallelization is actually working. - */ - public final static Set<Integer> SEEN_DATA = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>()); - /** - * The total count of data seen by a all instances of a processor. - */ - public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>(); - /** - * The documents received - */ - public static final ConcurrentHashMap<String, List<Object>> RECEIVED = new ConcurrentHashMap<>(); + /** + * Set of all ids that have been claimed. Ensures all instances are assigned unique ids + */ + public static Set<Integer> CLAIMED_ID = new HashSet<Integer>(); + /** + * Random instance to generate ids + */ + public static final Random RAND = new Random(); + /** + * Set of instance ids that received data. Usefully for testing parrallelization is actually working. + */ + public final static Set<Integer> SEEN_DATA = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>()); + /** + * The total count of data seen by a all instances of a processor. + */ + public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>(); + /** + * The documents received + */ + public static final ConcurrentHashMap<String, List<Object>> RECEIVED = new ConcurrentHashMap<>(); - private int counter = 0; - private String writerId; - private Integer id; + private int counter = 0; + private String writerId; + private Integer id; - public DatumCounterWriter(String writerId) { - this.writerId = writerId; - } + public DatumCounterWriter(String writerId) { + this.writerId = writerId; + } - @Override - public void write(StreamsDatum entry) { - ++this.counter; - SEEN_DATA.add(this.id); - synchronized (RECEIVED) { - List<Object> documents = RECEIVED.get(this.writerId); - if(documents == null) { - List<Object> docs = Lists.newLinkedList(); - docs.add(entry.getDocument()); - RECEIVED.put(this.writerId, docs); - } else { - documents.add(entry.getDocument()); - } - } + @Override + public void write(StreamsDatum entry) { + ++this.counter; + SEEN_DATA.add(this.id); + synchronized (RECEIVED) { + List<Object> documents = RECEIVED.get(this.writerId); + if(documents == null) { + List<Object> docs = Lists.newLinkedList(); + docs.add(entry.getDocument()); + RECEIVED.put(this.writerId, docs); + } else { + documents.add(entry.getDocument()); + } } + } - @Override - public void prepare(Object configurationObject) { - synchronized (CLAIMED_ID) { - this.id = RAND.nextInt(); - while(!CLAIMED_ID.add(this.id)) { - this.id = RAND.nextInt(); - } - } + @Override + public void prepare(Object configurationObject) { + synchronized (CLAIMED_ID) { + this.id = RAND.nextInt(); + while(!CLAIMED_ID.add(this.id)) { + this.id = RAND.nextInt(); + } } + } - @Override - public void cleanUp() { - synchronized (COUNTS) { - AtomicLong count = COUNTS.get(this.writerId); - if(count == null) { - COUNTS.put(this.writerId, new AtomicLong(this.counter)); - } else { - count.addAndGet(this.counter); - } - } + @Override + public void cleanUp() { + synchronized (COUNTS) { + AtomicLong count = COUNTS.get(this.writerId); + if(count == null) { + COUNTS.put(this.writerId, new AtomicLong(this.counter)); + } else { + count.addAndGet(this.counter); + } } + } - public int getDatumsCounted() { - return this.counter; - } + public int getDatumsCounted() { + return this.counter; + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java index d9ec6d3..48f4b68 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java @@ -20,6 +20,7 @@ package org.apache.streams.local.test.writer; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,25 +29,25 @@ import org.slf4j.LoggerFactory; */ public class DoNothingWriter implements StreamsPersistWriter { - private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingWriter.class); + private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingWriter.class); - @Override - public String getId() { - return "DoNothingWriter"; - } + @Override + public String getId() { + return "DoNothingWriter"; + } - @Override - public void write(StreamsDatum entry) { + @Override + public void write(StreamsDatum entry) { - } + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - } + } - @Override - public void cleanUp() { - LOGGER.debug("Writer Clean Up!"); - } + @Override + public void cleanUp() { + LOGGER.debug("Writer Clean Up!"); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java index 76ce353..2711ae1 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java @@ -20,6 +20,7 @@ package org.apache.streams.local.test.writer; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,25 +29,25 @@ import org.slf4j.LoggerFactory; */ public class SystemOutWriter implements StreamsPersistWriter { - private final static Logger LOGGER = LoggerFactory.getLogger(SystemOutWriter.class); + private final static Logger LOGGER = LoggerFactory.getLogger(SystemOutWriter.class); - @Override - public String getId() { - return "SystemOutWriter"; - } + @Override + public String getId() { + return "SystemOutWriter"; + } - @Override - public void write(StreamsDatum entry) { - System.out.println(entry.document); - } + @Override + public void write(StreamsDatum entry) { + System.out.println(entry.document); + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - } + } - @Override - public void cleanUp() { - LOGGER.debug("Clean up called writer!"); - } + @Override + public void cleanUp() { + LOGGER.debug("Clean up called writer!"); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java index 80d4a24..16b98c4 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java @@ -20,58 +20,57 @@ package org.apache.streams.test.component; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; -import static org.junit.Assert.*; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.InputStream; import java.util.LinkedList; import java.util.List; import java.util.Scanner; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + /** * Created by rebanks on 2/27/14. */ public class ExpectedDatumsPersistWriter implements StreamsPersistWriter{ - @Override - public String getId() { - return "ExpectedDatumsPersistWriter"; - } + @Override + public String getId() { + return "ExpectedDatumsPersistWriter"; + } - private StreamsDatumConverter converter; - private String fileName; - private List<StreamsDatum> expectedDatums; - private int counted = 0; - private int expectedSize = 0; + private StreamsDatumConverter converter; + private String fileName; + private List<StreamsDatum> expectedDatums; + private int counted = 0; + private int expectedSize = 0; - public ExpectedDatumsPersistWriter(StreamsDatumConverter converter, String filePathInResources) { - this.converter = converter; - this.fileName = filePathInResources; - } + public ExpectedDatumsPersistWriter(StreamsDatumConverter converter, String filePathInResources) { + this.converter = converter; + this.fileName = filePathInResources; + } - @Override - public void write(StreamsDatum entry) { - int index = this.expectedDatums.indexOf(entry); - assertNotEquals("Datum not expected. "+entry.toString(), -1, index); - this.expectedDatums.remove(index); - ++this.counted; - } + @Override + public void write(StreamsDatum entry) { + int index = this.expectedDatums.indexOf(entry); + assertNotEquals("Datum not expected. "+entry.toString(), -1, index); + this.expectedDatums.remove(index); + ++this.counted; + } - @Override - public void prepare(Object configurationObject) { - Scanner scanner = new Scanner(ExpectedDatumsPersistWriter.class.getResourceAsStream(this.fileName)); - this.expectedDatums = new LinkedList<StreamsDatum>(); - while(scanner.hasNextLine()) { - this.expectedDatums.add(this.converter.convert(scanner.nextLine())); - } - this.expectedSize = this.expectedDatums.size(); + @Override + public void prepare(Object configurationObject) { + Scanner scanner = new Scanner(ExpectedDatumsPersistWriter.class.getResourceAsStream(this.fileName)); + this.expectedDatums = new LinkedList<StreamsDatum>(); + while(scanner.hasNextLine()) { + this.expectedDatums.add(this.converter.convert(scanner.nextLine())); } + this.expectedSize = this.expectedDatums.size(); + } - @Override - public void cleanUp() { - assertEquals("Did not received the expected number of StreamsDatums", this.expectedSize, this.counted); - } + @Override + public void cleanUp() { + assertEquals("Did not received the expected number of StreamsDatums", this.expectedSize, this.counted); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java index 41e7eed..0fbfae9 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java @@ -18,10 +18,11 @@ package org.apache.streams.test.component; -import com.google.common.collect.Queues; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; + +import com.google.common.collect.Queues; import org.joda.time.DateTime; import java.math.BigInteger; @@ -37,64 +38,64 @@ import java.util.Scanner; */ public class FileReaderProvider implements StreamsProvider { - private String fileName; - private Scanner scanner; - private StreamsDatumConverter converter; - - public FileReaderProvider(String filePathInResources, StreamsDatumConverter converter) { - this.fileName = filePathInResources; - this.converter = converter; - } - - @Override - public String getId() { - return "FileReaderProvider"; - } - - @Override - public void startStream() { - - } - - @Override - public StreamsResultSet readCurrent() { - return new StreamsResultSet(constructQueue(this.scanner)); - } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - throw new UnsupportedOperationException(); - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isRunning() { - return this.scanner != null && this.scanner.hasNextLine(); - } - - @Override - public void prepare(Object configurationObject) { - this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName)); - } - - @Override - public void cleanUp() { - if(this.scanner!= null) { - this.scanner.close(); - this.scanner = null; - } + private String fileName; + private Scanner scanner; + private StreamsDatumConverter converter; + + public FileReaderProvider(String filePathInResources, StreamsDatumConverter converter) { + this.fileName = filePathInResources; + this.converter = converter; + } + + @Override + public String getId() { + return "FileReaderProvider"; + } + + @Override + public void startStream() { + + } + + @Override + public StreamsResultSet readCurrent() { + return new StreamsResultSet(constructQueue(this.scanner)); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + throw new UnsupportedOperationException(); + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isRunning() { + return this.scanner != null && this.scanner.hasNextLine(); + } + + @Override + public void prepare(Object configurationObject) { + this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName)); + } + + @Override + public void cleanUp() { + if(this.scanner!= null) { + this.scanner.close(); + this.scanner = null; } + } - private Queue<StreamsDatum> constructQueue(Scanner scanner) { - Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue(); - while(scanner.hasNextLine()) { - data.add(converter.convert(scanner.nextLine())); - } - cleanUp(); - return data; + private Queue<StreamsDatum> constructQueue(Scanner scanner) { + Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue(); + while(scanner.hasNextLine()) { + data.add(converter.convert(scanner.nextLine())); } + cleanUp(); + return data; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java index e3b7dd1..9172167 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java @@ -27,5 +27,5 @@ import java.io.Serializable; */ public interface StreamsDatumConverter extends Serializable { - public StreamsDatum convert(String s); + public StreamsDatum convert(String s); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java index 6f4e620..3727aa1 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java @@ -25,9 +25,9 @@ import org.apache.streams.core.StreamsDatum; */ public class StringToDocumentConverter implements StreamsDatumConverter { - @Override - public StreamsDatum convert(String s) { - return new StreamsDatum(s); - } + @Override + public StreamsDatum convert(String s) { + return new StreamsDatum(s); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java index 935c8fe..5154ea3 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java @@ -23,6 +23,7 @@ import org.apache.streams.test.component.ExpectedDatumsPersistWriter; import org.apache.streams.test.component.FileReaderProvider; import org.apache.streams.test.component.StringToDocumentConverter; import org.apache.streams.util.ComponentUtils; + import org.junit.After; import org.junit.Test; @@ -31,22 +32,22 @@ import org.junit.Test; */ public class TestComponentsLocalStream { - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } + } - @Test - public void testLocalStreamWithComponent() { - LocalStreamBuilder builder = new LocalStreamBuilder(); - builder.newReadCurrentStream("provider", new FileReaderProvider("/TestFile.txt", - new StringToDocumentConverter())); - builder.addStreamsPersistWriter("writer", new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), - "/TestFile.txt"), 1, "provider") + @Test + public void testLocalStreamWithComponent() { + LocalStreamBuilder builder = new LocalStreamBuilder(); + builder.newReadCurrentStream("provider", new FileReaderProvider("/TestFile.txt", + new StringToDocumentConverter())); + builder.addStreamsPersistWriter("writer", new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), + "/TestFile.txt"), 1, "provider") .start(); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java index 11e891b..0535295 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java @@ -22,8 +22,8 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.test.component.ExpectedDatumsPersistWriter; import org.apache.streams.test.component.StringToDocumentConverter; import org.apache.streams.util.ComponentUtils; + import org.junit.After; -import org.junit.Ignore; import org.junit.Test; /** @@ -31,37 +31,37 @@ import org.junit.Test; */ public class TestExpectedDatumsPersitWriter { - private static final StreamsDatum[] INPUT_DATUMS = new StreamsDatum[] { - new StreamsDatum("Document1"), - new StreamsDatum("Document2"), - new StreamsDatum("Document3"), - new StreamsDatum("Document4") + private static final StreamsDatum[] INPUT_DATUMS = new StreamsDatum[] { + new StreamsDatum("Document1"), + new StreamsDatum("Document2"), + new StreamsDatum("Document3"), + new StreamsDatum("Document4") // Uncomment to prove failures occur, or comment out a datum above // ,new StreamsDatum("Document5") - }; + }; - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } + } - @Test - public void testExpectedDatumsPersistWriterFileName() { - testDatums(new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), "/TestFile.txt")); - } + @Test + public void testExpectedDatumsPersistWriterFileName() { + testDatums(new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), "/TestFile.txt")); + } - private void testDatums(ExpectedDatumsPersistWriter writer) { - writer.prepare(null); - for(StreamsDatum datum : INPUT_DATUMS) { - writer.write(datum); - } - writer.cleanUp(); + private void testDatums(ExpectedDatumsPersistWriter writer) { + writer.prepare(null); + for(StreamsDatum datum : INPUT_DATUMS) { + writer.write(datum); } + writer.cleanUp(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java index 1ae9a24..a2b7bba 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java @@ -23,41 +23,39 @@ import org.apache.streams.core.StreamsResultSet; import org.apache.streams.test.component.FileReaderProvider; import org.apache.streams.test.component.StringToDocumentConverter; import org.apache.streams.util.ComponentUtils; + import org.junit.After; -import org.junit.Ignore; import org.junit.Test; -import java.io.InputStream; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; /** * */ public class TestFileReaderProvider { - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } + } - @Test - public void testFileReaderProviderFileName() { - String fileName = "/TestFile.txt"; - FileReaderProvider provider = new FileReaderProvider(fileName, new StringToDocumentConverter()); - provider.prepare(null); - StreamsResultSet resultSet = provider.readCurrent(); - int count = 0; - for(StreamsDatum datum : resultSet) { - ++count; - } - assertEquals(4, count); - provider.cleanUp(); + @Test + public void testFileReaderProviderFileName() { + String fileName = "/TestFile.txt"; + FileReaderProvider provider = new FileReaderProvider(fileName, new StringToDocumentConverter()); + provider.prepare(null); + StreamsResultSet resultSet = provider.readCurrent(); + int count = 0; + for(StreamsDatum datum : resultSet) { + ++count; } + assertEquals(4, count); + provider.cleanUp(); + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java index 9b887af..44ade9c 100644 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java @@ -19,54 +19,47 @@ package org.apache.streams.pig; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.lang.ArrayUtils; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.data.ActivityConverter; -import org.apache.streams.data.ActivitySerializer; -import org.slf4j.Logger; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; /** - * Static reflection wrappers for instantiating StreamsComponents + * Static reflection wrappers for instantiating StreamsComponents. */ public class StreamsComponentFactory { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class); + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class); - public static ActivityConverter getConverterInstance(Class<?> converterClazz) { + public static ActivityConverter getConverterInstance(Class<?> converterClazz) { - Object object = null; - try { - object = converterClazz.getConstructor().newInstance(); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - - Preconditions.checkNotNull(object); + Object object = null; + try { + object = converterClazz.getConstructor().newInstance(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } - ActivityConverter converter = (ActivityConverter) object; + Preconditions.checkNotNull(object); - return converter; + ActivityConverter converter = (ActivityConverter) object; - } + return converter; - public static StreamsProcessor getProcessorInstance(Class<?> processorClazz) { + } - Object object = null; - try { - object = processorClazz.getConstructor().newInstance(); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - StreamsProcessor processor = (StreamsProcessor) object; - return processor; + public static StreamsProcessor getProcessorInstance(Class<?> processorClazz) { + Object object = null; + try { + object = processorClazz.getConstructor().newInstance(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); } + StreamsProcessor processor = (StreamsProcessor) object; + return processor; + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java deleted file mode 100644 index 5ff4145..0000000 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.pig; - -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.core.StreamBuilder; -import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.core.StreamsProvider; -import org.joda.time.DateTime; - -import java.math.BigInteger; - -/** - * Goal is to be able to build a pig workflow using same syntax as other - * StreamsBuilders - * - * Currently implementers must write own pig scripts to use this module - */ -public class StreamsPigBuilder implements StreamBuilder { - - @Override - public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) { - return null; - } - - @Override - public StreamsConfiguration getStreamsConfiguration() { - return null; - } - - @Override - public StreamBuilder addStreamsProcessor(String s, StreamsProcessor streamsProcessor, int i, String... strings) { - return null; - } - - @Override - public StreamBuilder addStreamsPersistWriter(String s, StreamsPersistWriter streamsPersistWriter, int i, String... strings) { - return null; - } - - @Override - public StreamBuilder newPerpetualStream(String s, StreamsProvider streamsProvider) { - return null; - } - - @Override - public StreamBuilder newReadCurrentStream(String s, StreamsProvider streamsProvider) { - return null; - } - - @Override - public StreamBuilder newReadNewStream(String s, StreamsProvider streamsProvider, BigInteger bigInteger) { - return null; - } - - @Override - public StreamBuilder newReadRangeStream(String s, StreamsProvider streamsProvider, DateTime dateTime, DateTime dateTime2) { - return null; - } - - @Override - public void start() { - - } - - @Override - public void stop() { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java index 74f7eb5..cd08020 100644 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java @@ -19,21 +19,25 @@ package org.apache.streams.pig; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.data.util.RFC3339Utils; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import datafu.pig.util.AliasableEvalFunc; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.pig.EvalFunc; import org.apache.pig.builtin.MonitoredUDF; -import org.apache.pig.data.*; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.UDFContext; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.data.util.RFC3339Utils; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -48,40 +52,40 @@ import java.util.concurrent.TimeUnit; @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10) public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDatumExec.class); - - TupleFactory mTupleFactory = TupleFactory.getInstance(); - BagFactory mBagFactory = BagFactory.getInstance(); - - StreamsProcessor streamsProcessor; - - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{ - Preconditions.checkNotNull(execArgs); - Preconditions.checkArgument(execArgs.length > 0); - String classFullName = execArgs[0]; - Preconditions.checkNotNull(classFullName); - String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0); - streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName)); - if( execArgs.length == 1 ) { - LOGGER.debug("prepare (null)"); - streamsProcessor.prepare(null); - } else if( execArgs.length > 1 ) { - LOGGER.debug("prepare " + Arrays.toString(prepareArgs)); - streamsProcessor.prepare(prepareArgs); - } + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDatumExec.class); + + TupleFactory mTupleFactory = TupleFactory.getInstance(); + BagFactory mBagFactory = BagFactory.getInstance(); + + StreamsProcessor streamsProcessor; + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{ + Preconditions.checkNotNull(execArgs); + Preconditions.checkArgument(execArgs.length > 0); + String classFullName = execArgs[0]; + Preconditions.checkNotNull(classFullName); + String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0); + streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName)); + if( execArgs.length == 1 ) { + LOGGER.debug("prepare (null)"); + streamsProcessor.prepare(null); + } else if( execArgs.length > 1 ) { + LOGGER.debug("prepare " + Arrays.toString(prepareArgs)); + streamsProcessor.prepare(prepareArgs); } + } - @Override - public DataBag exec(Tuple input) throws IOException { + @Override + public DataBag exec(Tuple input) throws IOException { - if (input == null || input.size() == 0) - return null; + if (input == null || input.size() == 0) + return null; - DataBag output = BagFactory.getInstance().newDefaultBag(); + DataBag output = BagFactory.getInstance().newDefaultBag(); - Configuration conf = UDFContext.getUDFContext().getJobConf(); + Configuration conf = UDFContext.getUDFContext().getJobConf(); // I would prefer it work this way, but at the moment it doesn't @@ -95,91 +99,91 @@ public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> { // } // String object = getString(input, "object"); - String id = (String) input.get(0); - String source = (String) input.get(1); - Long timestamp; - try { - timestamp = (Long) input.get(2); - } catch( Exception e ) { - timestamp = RFC3339Utils.parseUTC((String)input.get(2)).getMillis(); - } - String object = (String) input.get(3); + String id = (String) input.get(0); + String source = (String) input.get(1); + Long timestamp; + try { + timestamp = (Long) input.get(2); + } catch( Exception e ) { + timestamp = RFC3339Utils.parseUTC((String)input.get(2)).getMillis(); + } + String object = (String) input.get(3); - StreamsDatum entry = new StreamsDatum(object, id, new DateTime(timestamp)); + StreamsDatum entry = new StreamsDatum(object, id, new DateTime(timestamp)); - List<StreamsDatum> resultSet = streamsProcessor.process(entry); - List<Tuple> resultTupleList = Lists.newArrayList(); + List<StreamsDatum> resultSet = streamsProcessor.process(entry); + List<Tuple> resultTupleList = Lists.newArrayList(); - for( StreamsDatum resultDatum : resultSet ) { - Tuple tuple = mTupleFactory.newTuple(); - tuple.append(id); - tuple.append(source); - tuple.append(timestamp); + for( StreamsDatum resultDatum : resultSet ) { + Tuple tuple = mTupleFactory.newTuple(); + tuple.append(id); + tuple.append(source); + tuple.append(timestamp); - if( resultDatum.getDocument() instanceof String ) - tuple.append(resultDatum.getDocument()); - else - tuple.append(mapper.writeValueAsString(resultDatum.getDocument())); - resultTupleList.add(tuple); - } + if( resultDatum.getDocument() instanceof String ) + tuple.append(resultDatum.getDocument()); + else + tuple.append(mapper.writeValueAsString(resultDatum.getDocument())); + resultTupleList.add(tuple); + } - DataBag result = mBagFactory.newDefaultBag(resultTupleList); + DataBag result = mBagFactory.newDefaultBag(resultTupleList); - return result; + return result; - } + } - public void finish() { - streamsProcessor.cleanUp(); + public void finish() { + streamsProcessor.cleanUp(); + } + + @Override + public Schema getOutputSchema(Schema schema) { + // Check that we were passed two fields + String error = "Expected: id\tsource\ttimestamp\tobject"; + if (schema.size() != 4) { + throw new RuntimeException(error); } - @Override - public Schema getOutputSchema(Schema schema) { - // Check that we were passed two fields - String error = "Expected: id\tsource\ttimestamp\tobject"; - if (schema.size() != 4) { - throw new RuntimeException(error); - } - - try { - // Get the types for both columns and check them. If they are - // wrong, figure out what types were passed and give a good error - // message. - if (schema.getField(0).type != DataType.CHARARRAY && - schema.getField(0).type != DataType.LONG) { - error += "Problem with id: must be CHARARRAY or LONG"; - error += "\t("; - error += DataType.findTypeName(schema.getField(0).type); - error += ")\n"; - throw new RuntimeException(error); - } - if (schema.getField(1).type != DataType.CHARARRAY) { - error += "Problem with source: must be CHARARRAY"; - error += "\t("; - error += DataType.findTypeName(schema.getField(1).type); - error += ")\n"; - throw new RuntimeException(error); - } - if (schema.getField(2).type != DataType.CHARARRAY && - schema.getField(2).type != DataType.LONG) { - error += "Problem with timestamp: must be CHARARRAY or LONG"; - error += "\t("; - error += DataType.findTypeName(schema.getField(2).type); - error += ")\n"; - throw new RuntimeException(error); - } - if (schema.getField(3).type != DataType.CHARARRAY) { - error += "Problem with object: must be CHARARRAY"; - error += "\t("; - error += DataType.findTypeName(schema.getField(3).type); - error += ")\n"; - throw new RuntimeException(error); - } - } catch (Exception e) { - throw new RuntimeException(error); - } - - // Always hand back the same schema we are passed - return schema; + try { + // Get the types for both columns and check them. If they are + // wrong, figure out what types were passed and give a good error + // message. + if (schema.getField(0).type != DataType.CHARARRAY && + schema.getField(0).type != DataType.LONG) { + error += "Problem with id: must be CHARARRAY or LONG"; + error += "\t("; + error += DataType.findTypeName(schema.getField(0).type); + error += ")\n"; + throw new RuntimeException(error); + } + if (schema.getField(1).type != DataType.CHARARRAY) { + error += "Problem with source: must be CHARARRAY"; + error += "\t("; + error += DataType.findTypeName(schema.getField(1).type); + error += ")\n"; + throw new RuntimeException(error); + } + if (schema.getField(2).type != DataType.CHARARRAY && + schema.getField(2).type != DataType.LONG) { + error += "Problem with timestamp: must be CHARARRAY or LONG"; + error += "\t("; + error += DataType.findTypeName(schema.getField(2).type); + error += ")\n"; + throw new RuntimeException(error); + } + if (schema.getField(3).type != DataType.CHARARRAY) { + error += "Problem with object: must be CHARARRAY"; + error += "\t("; + error += DataType.findTypeName(schema.getField(3).type); + error += ")\n"; + throw new RuntimeException(error); + } + } catch (Exception e) { + throw new RuntimeException(error); } + + // Always hand back the same schema we are passed + return schema; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java index 788b347..2f40923 100644 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java @@ -19,25 +19,15 @@ package org.apache.streams.pig; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import datafu.pig.util.SimpleEvalFunc; import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.pig.EvalFunc; import org.apache.pig.builtin.MonitoredUDF; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.util.UDFContext; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.data.ActivitySerializer; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; import org.slf4j.Logger; import java.io.IOException; @@ -54,59 +44,59 @@ import java.util.concurrent.TimeUnit; @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10) public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDocumentExec.class); - - StreamsProcessor streamsProcessor; - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{ - Preconditions.checkNotNull(execArgs); - Preconditions.checkArgument(execArgs.length > 0); - String classFullName = execArgs[0]; - Preconditions.checkNotNull(classFullName); - String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0); - streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName)); - if( execArgs.length == 1 ) { - LOGGER.debug("prepare (null)"); - streamsProcessor.prepare(null); - } else if( execArgs.length > 1 ) { - LOGGER.debug("prepare " + Arrays.toString(prepareArgs)); - streamsProcessor.prepare(prepareArgs); - } + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDocumentExec.class); + + StreamsProcessor streamsProcessor; + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{ + Preconditions.checkNotNull(execArgs); + Preconditions.checkArgument(execArgs.length > 0); + String classFullName = execArgs[0]; + Preconditions.checkNotNull(classFullName); + String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0); + streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName)); + if( execArgs.length == 1 ) { + LOGGER.debug("prepare (null)"); + streamsProcessor.prepare(null); + } else if( execArgs.length > 1 ) { + LOGGER.debug("prepare " + Arrays.toString(prepareArgs)); + streamsProcessor.prepare(prepareArgs); } + } - public String call(String document) throws IOException { + public String call(String document) throws IOException { - Preconditions.checkNotNull(streamsProcessor); - Preconditions.checkNotNull(document); + Preconditions.checkNotNull(streamsProcessor); + Preconditions.checkNotNull(document); - LOGGER.debug(document); + LOGGER.debug(document); - StreamsDatum entry = new StreamsDatum(document); + StreamsDatum entry = new StreamsDatum(document); - Preconditions.checkNotNull(entry); + Preconditions.checkNotNull(entry); - LOGGER.debug(entry.toString()); + LOGGER.debug(entry.toString()); - List<StreamsDatum> resultSet = streamsProcessor.process(entry); + List<StreamsDatum> resultSet = streamsProcessor.process(entry); - LOGGER.debug(resultSet.toString()); + LOGGER.debug(resultSet.toString()); - Object resultDoc = null; - for( StreamsDatum resultDatum : resultSet ) { - resultDoc = resultDatum.getDocument(); - } + Object resultDoc = null; + for( StreamsDatum resultDatum : resultSet ) { + resultDoc = resultDatum.getDocument(); + } - Preconditions.checkNotNull(resultDoc); + Preconditions.checkNotNull(resultDoc); - if( resultDoc instanceof String ) - return (String) resultDoc; - else - return mapper.writeValueAsString(resultDoc); + if( resultDoc instanceof String ) + return (String) resultDoc; + else + return mapper.writeValueAsString(resultDoc); - } + } - public void finish() { - streamsProcessor.cleanUp(); - } + public void finish() { + streamsProcessor.cleanUp(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java deleted file mode 100644 index 7692763..0000000 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.pig; - -import org.apache.pig.builtin.PigStorage; - -/** - * It would be nice if streams persisters could be used for input / output - * within the pig runtime. - */ -public class StreamsStorage extends PigStorage { - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java index 4db38fd..a48a5e8 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java @@ -21,46 +21,47 @@ package org.apache.streams.pig.test; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; + import org.slf4j.Logger; import java.util.LinkedList; import java.util.List; /** - * Used to Test Pig processor wrapper with arguments to prepare method + * Used to Test Pig processor wrapper with arguments to prepare method. */ public class AppendStringProcessor implements StreamsProcessor { - public final static String STREAMS_ID = "AppendStringProcessor"; + public final static String STREAMS_ID = "AppendStringProcessor"; - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(AppendStringProcessor.class); + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(AppendStringProcessor.class); - String append; + String append; - public AppendStringProcessor() { - } + public AppendStringProcessor() { + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> resultSet; - resultSet = new LinkedList<StreamsDatum>(); - String value = (String) entry.getDocument()+ new String(append); - resultSet.add(new StreamsDatum(value)); - return resultSet; - } + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + List<StreamsDatum> resultSet; + resultSet = new LinkedList<StreamsDatum>(); + String value = (String) entry.getDocument()+ new String(append); + resultSet.add(new StreamsDatum(value)); + return resultSet; + } - @Override - public void prepare(Object configurationObject) { - append = ((String[]) configurationObject)[0]; - } + @Override + public void prepare(Object configurationObject) { + append = ((String[]) configurationObject)[0]; + } - @Override - public void cleanUp() { - LOGGER.info("Processor clean up"); - } + @Override + public void cleanUp() { + LOGGER.info("Processor clean up"); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java index 2b687b1..5336007 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java @@ -21,46 +21,47 @@ package org.apache.streams.pig.test; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; + import org.slf4j.Logger; import java.util.LinkedList; import java.util.List; /** - * Used to Test Pig processor wrapper when multiple datums are returned + * Used to Test Pig processor wrapper when multiple datums are returned. */ public class CopyThriceProcessor implements StreamsProcessor { - public final static String STREAMS_ID = "CopyThriceProcessor"; + public final static String STREAMS_ID = "CopyThriceProcessor"; - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CopyThriceProcessor.class); + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CopyThriceProcessor.class); - List<StreamsDatum> result; + List<StreamsDatum> result; - public CopyThriceProcessor() { - } + public CopyThriceProcessor() { + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - this.result = new LinkedList<StreamsDatum>(); - result.add(entry); - result.add(entry); - result.add(entry); - return result; - } + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + this.result = new LinkedList<StreamsDatum>(); + result.add(entry); + result.add(entry); + result.add(entry); + return result; + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - } + } - @Override - public void cleanUp() { - LOGGER.info("Processor clean up"); - } + @Override + public void cleanUp() { + LOGGER.info("Processor clean up"); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java index 5528a38..07d3b6f 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java @@ -21,44 +21,45 @@ package org.apache.streams.pig.test; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; + import org.slf4j.Logger; import java.util.LinkedList; import java.util.List; /** - * Used to Test Pig processor wrapper - datum passthrough + * Used to Test Pig processor wrapper - datum passthrough. */ public class DoNothingProcessor implements StreamsProcessor { - public final static String STREAMS_ID = "DoNothingProcessor"; + public final static String STREAMS_ID = "DoNothingProcessor"; - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DoNothingProcessor.class); + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DoNothingProcessor.class); - List<StreamsDatum> result; + List<StreamsDatum> result; - public DoNothingProcessor() { - } + public DoNothingProcessor() { + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - this.result = new LinkedList<StreamsDatum>(); - result.add(entry); - return result; - } + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + this.result = new LinkedList<StreamsDatum>(); + result.add(entry); + return result; + } - @Override - public void prepare(Object configurationObject) { - LOGGER.info("Processor prepare"); - } + @Override + public void prepare(Object configurationObject) { + LOGGER.info("Processor prepare"); + } - @Override - public void cleanUp() { - LOGGER.info("Processor clean up"); - } + @Override + public void cleanUp() { + LOGGER.info("Processor clean up"); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java index 5dad52c..a983cc7 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java @@ -19,13 +19,14 @@ package org.apache.streams.pig.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import org.apache.pig.pigunit.PigTest; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; import org.apache.streams.twitter.converter.TwitterJsonRetweetActivityConverter; import org.apache.streams.twitter.pojo.Retweet; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import org.apache.pig.pigunit.PigTest; import org.apache.tools.ant.util.StringUtils; import org.junit.Test; @@ -34,23 +35,23 @@ import org.junit.Test; */ public class PigConverterTest { - @Test - public void testPigConverter() throws Exception { + @Test + public void testPigConverter() throws Exception { - String[] input = { - "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"create d_at\":\"Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1 700796190/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contr ibutors\":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retwe et_count\":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\" https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}" - }; + String[] input = { + "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"created_at\":\ "Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/170079619 0/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contributors\ ":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count \":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https:// si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}" + }; - String doc = (String) StringUtils.split(input[0], '\t').get(3); - ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(new TwitterDateTimeFormat().getFormat())); - String outdoc = mapper.writeValueAsString(new TwitterJsonRetweetActivityConverter().toActivityList(mapper.readValue(doc, Retweet.class)).get(0)); + String doc = (String) StringUtils.split(input[0], '\t').get(3); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(new TwitterDateTimeFormat().getFormat())); + String outdoc = mapper.writeValueAsString(new TwitterJsonRetweetActivityConverter().toActivityList(mapper.readValue(doc, Retweet.class)).get(0)); - String[] output = new String[1]; - output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + outdoc + ")"; + String[] output = new String[1]; + output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + outdoc + ")"; - PigTest test; - test = new PigTest("src/test/resources/pigconvertertest.pig"); - test.assertOutput("in", input, "out", output); + PigTest test; + test = new PigTest("src/test/resources/pigconvertertest.pig"); + test.assertOutput("in", input, "out", output); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java index 80b17b4..1cb7252 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java @@ -19,74 +19,74 @@ package org.apache.streams.pig.test; -import org.apache.pig.pigunit.PigTest; import org.apache.streams.core.StreamsDatum; -import org.apache.streams.twitter.converter.TwitterJsonTweetActivityConverter; + +import org.apache.pig.pigunit.PigTest; import org.apache.tools.ant.util.StringUtils; import org.junit.Test; import java.util.List; /** - * These are tests for StreamsProcessDatumExec + * These are tests for StreamsProcessDatumExec. */ public class PigProcessDatumTest { - @Test - public void testPigDoNothingSingleDatum() throws Exception { - String[] args = {}; + @Test + public void testPigDoNothingSingleDatum() throws Exception { + String[] args = {}; - String[] input = { - "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}", - }; + String[] input = { + "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}", + }; - DoNothingProcessor processor = new DoNothingProcessor(); + DoNothingProcessor processor = new DoNothingProcessor(); - String doc = (String) StringUtils.split(input[0], '\t').get(3); - StreamsDatum inputDatum = new StreamsDatum(doc); - inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); + String doc = (String) StringUtils.split(input[0], '\t').get(3); + StreamsDatum inputDatum = new StreamsDatum(doc); + inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); - processor.prepare(null); + processor.prepare(null); - StreamsDatum resultDatum = processor.process(inputDatum).get(0); - String resultDocument = (String) resultDatum.getDocument(); + StreamsDatum resultDatum = processor.process(inputDatum).get(0); + String resultDocument = (String) resultDatum.getDocument(); - String[] output = new String[1]; - output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultDocument + ")"; + String[] output = new String[1]; + output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultDocument + ")"; - PigTest test; - test = new PigTest("src/test/resources/pigprocessdatumtest.pig", args); - test.assertOutput("in", input, "out", output); + PigTest test; + test = new PigTest("src/test/resources/pigprocessdatumtest.pig", args); + test.assertOutput("in", input, "out", output); - } + } - @Test - public void testPigCopyThriceSingleDatum() throws Exception { - String[] args = {}; + @Test + public void testPigCopyThriceSingleDatum() throws Exception { + String[] args = {}; - String[] input = { - "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}", - }; + String[] input = { + "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}", + }; - CopyThriceProcessor processor = new CopyThriceProcessor(); + CopyThriceProcessor processor = new CopyThriceProcessor(); - String doc = (String) StringUtils.split(input[0], '\t').get(3); - StreamsDatum inputDatum = new StreamsDatum(doc); - inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); + String doc = (String) StringUtils.split(input[0], '\t').get(3); + StreamsDatum inputDatum = new StreamsDatum(doc); + inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); - processor.prepare(null); + processor.prepare(null); - List<StreamsDatum> resultSet = processor.process(inputDatum); + List<StreamsDatum> resultSet = processor.process(inputDatum); - String[] output = new String[resultSet.size()]; + String[] output = new String[resultSet.size()]; - for( int i = 0; i < output.length; i++ ) { - output[i] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultSet.get(i).getDocument() + ")"; - } + for( int i = 0; i < output.length; i++ ) { + output[i] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultSet.get(i).getDocument() + ")"; + } - PigTest test; - test = new PigTest("src/test/resources/pigprocessdatumcopytest.pig", args); - test.assertOutput("in", input, "out", output); + PigTest test; + test = new PigTest("src/test/resources/pigprocessdatumcopytest.pig", args); + test.assertOutput("in", input, "out", output); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java index dd30eb1..2832fdc 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java @@ -19,94 +19,95 @@ package org.apache.streams.pig.test; -import org.apache.pig.pigunit.PigTest; import org.apache.streams.core.StreamsDatum; + +import org.apache.pig.pigunit.PigTest; import org.apache.tools.ant.util.StringUtils; import org.junit.Test; /** - * These are tests for StreamsProcessDocumentExec + * These are tests for StreamsProcessDocumentExec. */ public class PigProcessDocumentTest { - @Test - public void testPigProcessEmptyDocument() throws Exception { + @Test + public void testPigProcessEmptyDocument() throws Exception { - String[] input = { - "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{}" - }; + String[] input = { + "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{}" + }; - DoNothingProcessor processor = new DoNothingProcessor(); + DoNothingProcessor processor = new DoNothingProcessor(); - String doc = (String) StringUtils.split(input[0], '\t').get(3); - StreamsDatum inputDatum = new StreamsDatum(doc); - inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); + String doc = (String) StringUtils.split(input[0], '\t').get(3); + StreamsDatum inputDatum = new StreamsDatum(doc); + inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); - processor.prepare(null); + processor.prepare(null); - StreamsDatum resultDatum = processor.process(inputDatum).get(0); - String resultDocument = (String) resultDatum.getDocument(); + StreamsDatum resultDatum = processor.process(inputDatum).get(0); + String resultDocument = (String) resultDatum.getDocument(); - String[] output = new String[1]; - output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")"; + String[] output = new String[1]; + output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")"; - PigTest test; - test = new PigTest("src/test/resources/pigprocessdocumenttest.pig"); - test.assertOutput("in", input, "out", output); + PigTest test; + test = new PigTest("src/test/resources/pigprocessdocumenttest.pig"); + test.assertOutput("in", input, "out", output); - } + } - @Test - public void testPigProcessJsonDocument() throws Exception { + @Test + public void testPigProcessJsonDocument() throws Exception { - String[] input = { - "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}" - }; + String[] input = { + "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}" + }; - DoNothingProcessor processor = new DoNothingProcessor(); + DoNothingProcessor processor = new DoNothingProcessor(); - String doc = (String) StringUtils.split(input[0], '\t').get(3); - StreamsDatum inputDatum = new StreamsDatum(doc); - inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); + String doc = (String) StringUtils.split(input[0], '\t').get(3); + StreamsDatum inputDatum = new StreamsDatum(doc); + inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); - processor.prepare(null); + processor.prepare(null); - StreamsDatum resultDatum = processor.process(inputDatum).get(0); - String resultDocument = (String) resultDatum.getDocument(); + StreamsDatum resultDatum = processor.process(inputDatum).get(0); + String resultDocument = (String) resultDatum.getDocument(); - String[] output = new String[1]; - output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")"; + String[] output = new String[1]; + output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")"; - PigTest test; - test = new PigTest("src/test/resources/pigprocessdocumenttest.pig"); - test.assertOutput("in", input, "out", output); + PigTest test; + test = new PigTest("src/test/resources/pigprocessdocumenttest.pig"); + test.assertOutput("in", input, "out", output); - } + } - @Test - public void testPigProcessAppendDocument() throws Exception { + @Test + public void testPigProcessAppendDocument() throws Exception { - String[] input = { - "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\thowdy" - }; + String[] input = { + "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\thowdy" + }; - AppendStringProcessor processor = new AppendStringProcessor(); + AppendStringProcessor processor = new AppendStringProcessor(); - String doc = (String) StringUtils.split(input[0], '\t').get(3); - StreamsDatum inputDatum = new StreamsDatum(doc); - inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); + String doc = (String) StringUtils.split(input[0], '\t').get(3); + StreamsDatum inputDatum = new StreamsDatum(doc); + inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0)); - processor.prepare(new String[]{"doody"}); + processor.prepare(new String[]{"doody"}); - StreamsDatum resultDatum = processor.process(inputDatum).get(0); - String resultDocument = (String) resultDatum.getDocument(); + StreamsDatum resultDatum = processor.process(inputDatum).get(0); + String resultDocument = (String) resultDatum.getDocument(); - String[] output = new String[1]; - output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")"; + String[] output = new String[1]; + output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")"; - PigTest test; - test = new PigTest("src/test/resources/pigprocessdocumentappendtest.pig"); - test.assertOutput("in", input, "out", output); + PigTest test; + test = new PigTest("src/test/resources/pigprocessdocumentappendtest.pig"); + test.assertOutput("in", input, "out", output); - } + } }
