http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/lib/SortTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/lib/SortTest.java b/crunch/src/test/java/org/apache/crunch/lib/SortTest.java index 8d2838f..981426b 100644 --- a/crunch/src/test/java/org/apache/crunch/lib/SortTest.java +++ b/crunch/src/test/java/org/apache/crunch/lib/SortTest.java @@ -20,17 +20,17 @@ package org.apache.crunch.lib; import static org.apache.crunch.lib.Sort.ColumnOrder.by; import static org.apache.crunch.lib.Sort.Order.ASCENDING; import static org.apache.crunch.lib.Sort.Order.DESCENDING; +import static org.apache.crunch.test.StringWrapper.wrap; import static org.junit.Assert.assertEquals; import java.io.IOException; import java.io.Serializable; import java.util.Arrays; - -import org.junit.Ignore; -import org.junit.Test; +import java.util.List; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; @@ -42,113 +42,163 @@ import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.lib.Sort.ColumnOrder; import org.apache.crunch.lib.Sort.Order; import org.apache.crunch.test.FileHelper; +import org.apache.crunch.test.StringWrapper; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.Lists; public class SortTest implements Serializable { - + @Test public void testWritableSortAsc() throws Exception { - runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), - Order.ASCENDING, "A\tand this text as well"); + runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), Order.ASCENDING, + "A\tand this text as well"); } @Test public void testWritableSortDesc() throws Exception { - runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), - Order.DESCENDING, "B\tthis doc has some text"); + runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), Order.DESCENDING, + "B\tthis doc has some text"); } - + @Test public void testWritableSortAscDesc() throws Exception { - runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), - by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text"); + runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), "A", "this doc has this text"); } @Test public void testWritableSortSecondDescFirstDesc() throws Exception { - runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), - by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text"); + runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), by(2, DESCENDING), + by(1, ASCENDING), "A", "this doc has this text"); } @Test public void testWritableSortTripleAscDescAsc() throws Exception { - runTriple(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), - by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc"); + runTriple(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc"); } @Test public void testWritableSortQuadAscDescAscDesc() throws Exception { - runQuad(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), - by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has"); + runQuad(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has"); } @Test public void testWritableSortTupleNAscDesc() throws Exception { - runTupleN(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), - new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING)}, new String[] { "A", "this doc has this text" }); + runTupleN(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), new ColumnOrder[] { + by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" }); } @Test public void testWritableSortTable() throws Exception { - runTable(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), - "A"); + runTable(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), "A"); } - + @Test public void testAvroSortAsc() throws Exception { - runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), - Order.ASCENDING, "A\tand this text as well"); + runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), Order.ASCENDING, + "A\tand this text as well"); } - + @Test public void testAvroSortDesc() throws Exception { - runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), - Order.DESCENDING, "B\tthis doc has some text"); + runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), Order.DESCENDING, + "B\tthis doc has some text"); } - + @Test public void testAvroSortPairAscAsc() throws Exception { - runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), - by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text"); + runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), "A", "this doc has this text"); } - + @Test @Ignore("Avro sorting only works in field order at the moment") public void testAvroSortPairSecondAscFirstDesc() throws Exception { - runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), - by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text"); + runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), by(2, DESCENDING), + by(1, ASCENDING), "A", "this doc has this text"); } - + @Test public void testAvroSortTripleAscDescAsc() throws Exception { - runTriple(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), - by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc"); + runTriple(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc"); } @Test public void testAvroSortQuadAscDescAscDesc() throws Exception { - runQuad(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), - by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has"); + runQuad(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has"); } @Test public void testAvroSortTupleNAscDesc() throws Exception { runTupleN(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), - new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" }); + new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", + "this doc has this text" }); + } + + @Test + public void testAvroReflectSortPair() throws IOException { + Pipeline pipeline = new MRPipeline(SortTest.class); + PCollection<Pair<String, StringWrapper>> sorted = pipeline + .readTextFile(FileHelper.createTempCopyOf("set2.txt")) + .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() { + + @Override + public Pair<String, StringWrapper> map(String input) { + return Pair.of(input, wrap(input)); + } + }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class))).sort(true); + + List<Pair<String, StringWrapper>> expected = Lists.newArrayList(); + expected.add(Pair.of("a", wrap("a"))); + expected.add(Pair.of("c", wrap("c"))); + expected.add(Pair.of("d", wrap("d"))); + + assertEquals(expected, Lists.newArrayList(sorted.materialize())); + } + + @Test + public void testAvroReflectSortTable() throws IOException { + Pipeline pipeline = new MRPipeline(SortTest.class); + PTable<String, StringWrapper> unsorted = pipeline.readTextFile( + FileHelper.createTempCopyOf("set2.txt")).parallelDo( + new MapFn<String, Pair<String, StringWrapper>>() { + + @Override + public Pair<String, StringWrapper> map(String input) { + return Pair.of(input, wrap(input)); + } + }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class))); + + PTable<String, StringWrapper> sorted = Sort.sort(unsorted); + + List<Pair<String, StringWrapper>> expected = Lists.newArrayList(); + expected.add(Pair.of("a", wrap("a"))); + expected.add(Pair.of("c", wrap("c"))); + expected.add(Pair.of("d", wrap("d"))); + + assertEquals(expected, Lists.newArrayList(sorted.materialize())); } - + @Test public void testAvroSortTable() throws Exception { runTable(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), "A"); } - private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, - Order order, String firstLine) throws IOException { + private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine) + throws IOException { String inputPath = FileHelper.createTempCopyOf("docs.txt"); - + PCollection<String> input = pipeline.readTextFile(inputPath); // following turns the input from Writables to required type family PCollection<String> input2 = input.parallelDo(new DoFn<String, String>() { @@ -159,24 +209,24 @@ public class SortTest implements Serializable { }, typeFamily.strings()); PCollection<String> sorted = Sort.sort(input2, order); Iterable<String> lines = sorted.materialize(); - + assertEquals(firstLine, lines.iterator().next()); pipeline.done(); // TODO: finally } - - private void runPair(Pipeline pipeline, PTypeFamily typeFamily, - ColumnOrder first, ColumnOrder second, String firstField, String secondField) throws IOException { + + private void runPair(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, + ColumnOrder second, String firstField, String secondField) throws IOException { String inputPath = FileHelper.createTempCopyOf("docs.txt"); - + PCollection<String> input = pipeline.readTextFile(inputPath); PCollection<Pair<String, String>> kv = input.parallelDo( - new DoFn<String, Pair<String, String>>() { - @Override - public void process(String input, Emitter<Pair<String, String>> emitter) { - String[] split = input.split("[\t]+"); - emitter.emit(Pair.of(split[0], split[1])); - } - }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings())); + new DoFn<String, Pair<String, String>>() { + @Override + public void process(String input, Emitter<Pair<String, String>> emitter) { + String[] split = input.split("[\t]+"); + emitter.emit(Pair.of(split[0], split[1])); + } + }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings())); PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second); Iterable<Pair<String, String>> lines = sorted.materialize(); Pair<String, String> l = lines.iterator().next(); @@ -184,21 +234,22 @@ public class SortTest implements Serializable { assertEquals(secondField, l.second()); pipeline.done(); } - - private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, - ColumnOrder first, ColumnOrder second, ColumnOrder third, String firstField, String secondField, String thirdField) throws IOException { + + private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, + ColumnOrder second, ColumnOrder third, String firstField, String secondField, + String thirdField) throws IOException { String inputPath = FileHelper.createTempCopyOf("docs.txt"); - + PCollection<String> input = pipeline.readTextFile(inputPath); PCollection<Tuple3<String, String, String>> kv = input.parallelDo( - new DoFn<String, Tuple3<String, String, String>>() { - @Override - public void process(String input, Emitter<Tuple3<String, String, String>> emitter) { - String[] split = input.split("[\t ]+"); - int len = split.length; - emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len])); - } - }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings())); + new DoFn<String, Tuple3<String, String, String>>() { + @Override + public void process(String input, Emitter<Tuple3<String, String, String>> emitter) { + String[] split = input.split("[\t ]+"); + int len = split.length; + emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len])); + } + }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings())); PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third); Iterable<Tuple3<String, String, String>> lines = sorted.materialize(); Tuple3<String, String, String> l = lines.iterator().next(); @@ -207,23 +258,25 @@ public class SortTest implements Serializable { assertEquals(thirdField, l.third()); pipeline.done(); } - - private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, - ColumnOrder first, ColumnOrder second, ColumnOrder third, ColumnOrder fourth, - String firstField, String secondField, String thirdField, String fourthField) throws IOException { + + private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, + ColumnOrder second, ColumnOrder third, ColumnOrder fourth, String firstField, + String secondField, String thirdField, String fourthField) throws IOException { String inputPath = FileHelper.createTempCopyOf("docs.txt"); - + PCollection<String> input = pipeline.readTextFile(inputPath); PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo( - new DoFn<String, Tuple4<String, String, String, String>>() { - @Override - public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) { - String[] split = input.split("[\t ]+"); - int len = split.length; - emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len])); - } - }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), typeFamily.strings())); - PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, third, fourth); + new DoFn<String, Tuple4<String, String, String, String>>() { + @Override + public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) { + String[] split = input.split("[\t ]+"); + int len = split.length; + emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len])); + } + }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), + typeFamily.strings())); + PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, + third, fourth); Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize(); Tuple4<String, String, String, String> l = lines.iterator().next(); assertEquals(firstField, l.first()); @@ -232,46 +285,44 @@ public class SortTest implements Serializable { assertEquals(fourthField, l.fourth()); pipeline.done(); } - - private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, - ColumnOrder[] orders, String[] fields) throws IOException { + + private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder[] orders, + String[] fields) throws IOException { String inputPath = FileHelper.createTempCopyOf("docs.txt"); - + PCollection<String> input = pipeline.readTextFile(inputPath); PType[] types = new PType[orders.length]; Arrays.fill(types, typeFamily.strings()); - PCollection<TupleN> kv = input.parallelDo( - new DoFn<String, TupleN>() { - @Override - public void process(String input, Emitter<TupleN> emitter) { - String[] split = input.split("[\t]+"); - emitter.emit(new TupleN(split)); - } + PCollection<TupleN> kv = input.parallelDo(new DoFn<String, TupleN>() { + @Override + public void process(String input, Emitter<TupleN> emitter) { + String[] split = input.split("[\t]+"); + emitter.emit(new TupleN(split)); + } }, typeFamily.tuples(types)); PCollection<TupleN> sorted = Sort.sortTuples(kv, orders); Iterable<TupleN> lines = sorted.materialize(); TupleN l = lines.iterator().next(); int i = 0; for (String field : fields) { - assertEquals(field, l.get(i++)); + assertEquals(field, l.get(i++)); } pipeline.done(); } - private void runTable(Pipeline pipeline, PTypeFamily typeFamily, - String firstKey) throws IOException { + private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey) + throws IOException { String inputPath = FileHelper.createTempCopyOf("docs.txt"); - + PCollection<String> input = pipeline.readTextFile(inputPath); - PTable<String, String> table = input.parallelDo( - new DoFn<String, Pair<String, String>>() { - @Override - public void process(String input, Emitter<Pair<String, String>> emitter) { - String[] split = input.split("[\t]+"); - emitter.emit(Pair.of(split[0], split[1])); - } - }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings())); - + PTable<String, String> table = input.parallelDo(new DoFn<String, Pair<String, String>>() { + @Override + public void process(String input, Emitter<Pair<String, String>> emitter) { + String[] split = input.split("[\t]+"); + emitter.emit(Pair.of(split[0], split[1])); + } + }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings())); + PTable<String, String> sorted = Sort.sort(table); Iterable<Pair<String, String>> lines = sorted.materialize(); Pair<String, String> l = lines.iterator().next();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/test/StringWrapper.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/test/StringWrapper.java b/crunch/src/test/java/org/apache/crunch/test/StringWrapper.java new file mode 100644 index 0000000..5b3c4c4 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/test/StringWrapper.java @@ -0,0 +1,85 @@ +package org.apache.crunch.test; + +import org.apache.crunch.MapFn; + +/** + * Simple String wrapper for testing with Avro reflection. + */ +public class StringWrapper implements Comparable<StringWrapper> { + + public static class StringToStringWrapperMapFn extends MapFn<String, StringWrapper> { + + @Override + public StringWrapper map(String input) { + return wrap(input); + } + + } + + public static class StringWrapperToStringMapFn extends MapFn<StringWrapper, String> { + + @Override + public String map(StringWrapper input) { + return input.getValue(); + } + + } + + private String value; + + public StringWrapper() { + this(""); + } + + public StringWrapper(String value) { + this.value = value; + } + + @Override + public int compareTo(StringWrapper o) { + return this.value.compareTo(o.value); + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((value == null) ? 0 : value.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + StringWrapper other = (StringWrapper) obj; + if (value == null) { + if (other.value != null) + return false; + } else if (!value.equals(other.value)) + return false; + return true; + } + + @Override + public String toString() { + return "StringWrapper [value=" + value + "]"; + } + + public static StringWrapper wrap(String value) { + return new StringWrapper(value); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java index fa9da1a..9917685 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java @@ -18,13 +18,16 @@ package org.apache.crunch.types.avro; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; - -import org.junit.Test; +import static org.junit.Assert.assertTrue; import org.apache.crunch.Pair; import org.apache.crunch.test.Person; +import org.apache.crunch.test.StringWrapper; +import org.junit.Test; + import com.google.common.collect.Lists; public class AvroTableTypeTest { @@ -49,4 +52,19 @@ public class AvroTableTypeTest { assertNotSame(person, detachedPair.second()); } + @Test + public void testIsReflect_ContainsReflectKey() { + assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).isReflect()); + } + + @Test + public void testIsReflect_ContainsReflectValue() { + assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect()); + } + + @Test + public void testReflect_NoReflectKeyOrValue() { + assertFalse(Avros.tableOf(Avros.ints(), Avros.ints()).isReflect()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java index 2bebca1..2a80a5e 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java @@ -25,66 +25,113 @@ import static org.junit.Assert.assertTrue; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; +import org.apache.crunch.test.Person; +import org.apache.crunch.test.StringWrapper; import org.junit.Test; -import org.apache.crunch.test.Person; import com.google.common.collect.Lists; public class AvroTypeTest { - @Test - public void testIsSpecific_SpecificData() { - assertTrue(Avros.records(Person.class).isSpecific()); - } - - @Test - public void testIsGeneric_SpecificData() { - assertFalse(Avros.records(Person.class).isGeneric()); - } - - @Test - public void testIsSpecific_GenericData() { - assertFalse(Avros.generics(Person.SCHEMA$).isSpecific()); - } - - @Test - public void testIsGeneric_GenericData() { - assertTrue(Avros.generics(Person.SCHEMA$).isGeneric()); - } - - @Test - public void testIsSpecific_NonAvroClass() { - assertFalse(Avros.ints().isSpecific()); - } - - @Test - public void testIsGeneric_NonAvroClass() { - assertFalse(Avros.ints().isGeneric()); - } - - @Test - public void testIsSpecific_SpecificAvroTable() { - assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)) - .isSpecific()); - } - - @Test - public void testIsGeneric_SpecificAvroTable() { - assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class)) - .isGeneric()); - } - - @Test - public void testIsSpecific_GenericAvroTable() { - assertFalse(Avros.tableOf(Avros.strings(), - Avros.generics(Person.SCHEMA$)).isSpecific()); - } - - @Test - public void testIsGeneric_GenericAvroTable() { - assertTrue(Avros.tableOf(Avros.strings(), - Avros.generics(Person.SCHEMA$)).isGeneric()); - } + @Test + public void testIsSpecific_SpecificData() { + assertTrue(Avros.records(Person.class).isSpecific()); + } + + @Test + public void testIsGeneric_SpecificData() { + assertFalse(Avros.records(Person.class).isGeneric()); + } + + @Test + public void testIsSpecific_GenericData() { + assertFalse(Avros.generics(Person.SCHEMA$).isSpecific()); + } + + @Test + public void testIsGeneric_GenericData() { + assertTrue(Avros.generics(Person.SCHEMA$).isGeneric()); + } + + @Test + public void testIsSpecific_NonAvroClass() { + assertFalse(Avros.ints().isSpecific()); + } + + @Test + public void testIsGeneric_NonAvroClass() { + assertFalse(Avros.ints().isGeneric()); + } + + @Test + public void testIsSpecific_SpecificAvroTable() { + assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isSpecific()); + } + + @Test + public void testIsGeneric_SpecificAvroTable() { + assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isGeneric()); + } + + @Test + public void testIsSpecific_GenericAvroTable() { + assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isSpecific()); + } + + @Test + public void testIsGeneric_GenericAvroTable() { + assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isGeneric()); + } + + @Test + public void testIsReflect_GenericType() { + assertFalse(Avros.generics(Person.SCHEMA$).isReflect()); + } + + @Test + public void testIsReflect_SpecificType() { + assertFalse(Avros.records(Person.class).isReflect()); + } + + @Test + public void testIsReflect_ReflectSimpleType() { + assertTrue(Avros.reflects(StringWrapper.class).isReflect()); + } + + @Test + public void testIsReflect_NonReflectSubType() { + assertFalse(Avros.pairs(Avros.ints(), Avros.ints()).isReflect()); + } + + @Test + public void testIsReflect_ReflectSubType() { + assertTrue(Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect()); + } + + @Test + public void testIsReflect_TableOfNonReflectTypes() { + assertFalse(Avros.tableOf(Avros.ints(), Avros.strings()).isReflect()); + } + + @Test + public void testIsReflect_TableWithReflectKey() { + assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).isReflect()); + } + + @Test + public void testIsReflect_TableWithReflectValue() { + assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect()); + } + + @Test + public void testReflect_CollectionContainingReflectValue() { + assertTrue(Avros.collections(Avros.reflects(StringWrapper.class)).isReflect()); + } + + @Test + public void testReflect_CollectionNotContainingReflectValue() { + assertFalse(Avros.collections(Avros.generics(Person.SCHEMA$)).isReflect()); + } @Test public void testGetDetachedValue_AlreadyMappedAvroType() { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java index d9a2735..c71207b 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java @@ -24,14 +24,14 @@ import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.util.Utf8; -import org.apache.hadoop.io.LongWritable; -import org.junit.Test; - import org.apache.crunch.Pair; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; @@ -39,6 +39,10 @@ import org.apache.crunch.TupleN; import org.apache.crunch.test.Person; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -52,41 +56,44 @@ public class AvrosTest { Void n = null; testInputOutputFn(Avros.nulls(), n, n); } - + @Test public void testStrings() throws Exception { String s = "abc"; Utf8 w = new Utf8(s); testInputOutputFn(Avros.strings(), s, w); } - + @Test public void testInts() throws Exception { int j = 55; testInputOutputFn(Avros.ints(), j, j); } + @Test public void testLongs() throws Exception { long j = Long.MAX_VALUE; testInputOutputFn(Avros.longs(), j, j); } + @Test public void testFloats() throws Exception { float j = Float.MIN_VALUE; testInputOutputFn(Avros.floats(), j, j); } + @Test public void testDoubles() throws Exception { double j = Double.MIN_VALUE; testInputOutputFn(Avros.doubles(), j, j); } - + @Test public void testBooleans() throws Exception { boolean j = true; testInputOutputFn(Avros.booleans(), j, j); } - + @Test public void testBytes() throws Exception { byte[] bytes = new byte[] { 17, 26, -98 }; @@ -99,22 +106,21 @@ public class AvrosTest { Collection<String> j = Lists.newArrayList(); j.add("a"); j.add("b"); - Schema collectionSchema = Schema.createArray( - Schema.createUnion(ImmutableList.of( - Avros.strings().getSchema(), Schema.create(Type.NULL)))); + Schema collectionSchema = Schema.createArray(Schema.createUnion(ImmutableList.of(Avros + .strings().getSchema(), Schema.create(Type.NULL)))); GenericData.Array<Utf8> w = new GenericData.Array<Utf8>(2, collectionSchema); w.add(new Utf8("a")); w.add(new Utf8("b")); testInputOutputFn(Avros.collections(Avros.strings()), j, w); } - + @Test public void testNestedTables() throws Exception { PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs()); String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString(); assertNotNull(schema); } - + @Test public void testPairs() throws Exception { AvroType<Pair<String, String>> at = Avros.pairs(Avros.strings(), Avros.strings()); @@ -124,15 +130,15 @@ public class AvrosTest { w.put(1, new Utf8("b")); testInputOutputFn(at, j, w); } - + @Test public void testPairEquals() throws Exception { - AvroType<Pair<Long, ByteBuffer>> at1 = Avros.pairs(Avros.longs(), Avros.bytes()); - AvroType<Pair<Long, ByteBuffer>> at2 = Avros.pairs(Avros.longs(), Avros.bytes()); - assertEquals(at1, at2); - assertEquals(at1.hashCode(), at2.hashCode()); + AvroType<Pair<Long, ByteBuffer>> at1 = Avros.pairs(Avros.longs(), Avros.bytes()); + AvroType<Pair<Long, ByteBuffer>> at2 = Avros.pairs(Avros.longs(), Avros.bytes()); + assertEquals(at1, at2); + assertEquals(at1.hashCode(), at2.hashCode()); } - + @Test @SuppressWarnings("rawtypes") public void testTriples() throws Exception { @@ -144,7 +150,7 @@ public class AvrosTest { w.put(2, new Utf8("c")); testInputOutputFn(at, j, w); } - + @Test @SuppressWarnings("rawtypes") public void testQuads() throws Exception { @@ -157,7 +163,7 @@ public class AvrosTest { w.put(3, new Utf8("d")); testInputOutputFn(at, j, w); } - + @Test @SuppressWarnings("rawtypes") public void testTupleN() throws Exception { @@ -171,9 +177,9 @@ public class AvrosTest { w.put(3, new Utf8("d")); w.put(4, new Utf8("e")); testInputOutputFn(at, j, w); - + } - + @Test @SuppressWarnings("rawtypes") public void testWritables() throws Exception { @@ -181,7 +187,7 @@ public class AvrosTest { LongWritable lw = new LongWritable(1729L); assertEquals(lw, at.getInputMapFn().map(at.getOutputMapFn().map(lw))); } - + @Test @SuppressWarnings("rawtypes") public void testTableOf() throws Exception { @@ -193,18 +199,18 @@ public class AvrosTest { // TODO update this after resolving the o.a.a.m.Pair.equals issue initialize(at); assertEquals(j, at.getInputMapFn().map(w)); - org.apache.avro.mapred.Pair converted = - (org.apache.avro.mapred.Pair) at.getOutputMapFn().map(j); + org.apache.avro.mapred.Pair converted = (org.apache.avro.mapred.Pair) at.getOutputMapFn() + .map(j); assertEquals(w.key(), converted.key()); assertEquals(w.value(), converted.value()); } - + private static void initialize(PType ptype) { ptype.getInputMapFn().initialize(); ptype.getOutputMapFn().initialize(); } - - @SuppressWarnings({"unchecked", "rawtypes"}) + + @SuppressWarnings({ "unchecked", "rawtypes" }) protected static void testInputOutputFn(PType ptype, Object java, Object avro) { initialize(ptype); assertEquals(java, ptype.getInputMapFn().map(avro)); @@ -221,4 +227,67 @@ public class AvrosTest { assertFalse(Avros.isPrimitive(Avros.reflects(Person.class))); } + @Test + public void testPairs_Generic() { + Schema schema = ReflectData.get().getSchema(IntWritable.class); + + GenericData.Record recordA = new GenericData.Record(schema); + GenericData.Record recordB = new GenericData.Record(schema); + + AvroType<Pair<Record, Record>> pairType = Avros.pairs(Avros.generics(schema), + Avros.generics(schema)); + Pair<Record, Record> pair = Pair.of(recordA, recordB); + pairType.getOutputMapFn().initialize(); + pairType.getInputMapFn().initialize(); + Object mapped = pairType.getOutputMapFn().map(pair); + Pair<Record, Record> doubleMappedPair = pairType.getInputMapFn().map(mapped); + + assertEquals(pair, doubleMappedPair); + mapped.hashCode(); + } + + @Test + public void testPairs_Reflect() { + IntWritable intWritableA = new IntWritable(1); + IntWritable intWritableB = new IntWritable(2); + + AvroType<Pair<IntWritable, IntWritable>> pairType = Avros.pairs( + Avros.reflects(IntWritable.class), Avros.reflects(IntWritable.class)); + Pair<IntWritable, IntWritable> pair = Pair.of(intWritableA, intWritableB); + pairType.getOutputMapFn().initialize(); + pairType.getInputMapFn().initialize(); + Object mapped = pairType.getOutputMapFn().map(pair); + + Pair<IntWritable, IntWritable> doubleMappedPair = pairType.getInputMapFn().map(mapped); + + assertEquals(pair, doubleMappedPair); + } + + @Test + public void testPairs_Specific() { + Person personA = new Person(); + Person personB = new Person(); + + personA.setAge(1); + personA.setName("A"); + personA.setSiblingnames(Collections.<CharSequence> emptyList()); + + personB.setAge(2); + personB.setName("B"); + personB.setSiblingnames(Collections.<CharSequence> emptyList()); + + AvroType<Pair<Person, Person>> pairType = Avros.pairs(Avros.records(Person.class), + Avros.records(Person.class)); + + Pair<Person, Person> pair = Pair.of(personA, personB); + pairType.getOutputMapFn().initialize(); + pairType.getInputMapFn().initialize(); + + Object mapped = pairType.getOutputMapFn().map(pair); + Pair<Person, Person> doubleMappedPair = pairType.getInputMapFn().map(mapped); + + assertEquals(pair, doubleMappedPair); + + } + }
