Fix TriggerExampleTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77aa0938 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77aa0938 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77aa0938 Branch: refs/heads/master Commit: 77aa0938f75f9f3d18a4fa79a5ffe6159167f4d5 Parents: 810ffeb Author: Kenneth Knowles <[email protected]> Authored: Wed Jun 8 16:22:34 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 9 14:41:09 2016 -0700 ---------------------------------------------------------------------- .../examples/cookbook/TriggerExampleTest.java | 61 +++++++++++++------- 1 file changed, 41 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77aa0938/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java index fe75d14..cddce7f 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java @@ -34,6 +34,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import com.google.api.services.bigquery.model.TableRow; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.joda.time.Duration; import org.joda.time.Instant; @@ -44,7 +46,9 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; /** * Unit Tests for {@link TriggerExample}. @@ -70,21 +74,27 @@ public class TriggerExampleTest { + "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0" + ",,,,,0,,,,,0", new Instant(1))); - private static final TableRow OUT_ROW_1 = new TableRow() - .set("trigger_type", "default") - .set("freeway", "5").set("total_flow", 30) - .set("number_of_records", 1) - .set("isFirst", true).set("isLast", true) - .set("timing", "ON_TIME") - .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)"); - - private static final TableRow OUT_ROW_2 = new TableRow() - .set("trigger_type", "default") - .set("freeway", "110").set("total_flow", 90) - .set("number_of_records", 2) - .set("isFirst", true).set("isLast", true) - .set("timing", "ON_TIME") - .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"); + private static final TableRow OUT_ROW_1 = + new TableRow() + .set("trigger_type", "default") + .set("freeway", "5") + .set("total_flow", 30) + .set("number_of_records", 1) + .set("isFirst", true) + .set("isLast", true) + .set("timing", "ON_TIME") + .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)"); + + private static final TableRow OUT_ROW_2 = + new TableRow() + .set("trigger_type", "default") + .set("freeway", "110") + .set("total_flow", 90) + .set("number_of_records", 2) + .set("isFirst", true) + .set("isLast", true) + .set("timing", "ON_TIME") + .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"); @Test public void testExtractTotalFlow() throws Exception { @@ -112,15 +122,26 @@ public class TriggerExampleTest { .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1)))) .apply(new TotalFlow("default")); - PCollection<TableRow> results = totalFlow.apply(ParDo.of(new FormatResults())); + PCollection<String> results = totalFlow.apply(ParDo.of(new FormatResults())); - - PAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2); + PAssert.that(results) + .containsInAnyOrder(canonicalFormat(OUT_ROW_1), canonicalFormat(OUT_ROW_2)); pipeline.run(); } - static class FormatResults extends DoFn<TableRow, TableRow> { + // Sort the fields and toString() the values, since TableRow has a bit of a dynamically + // typed API and equals()/hashCode() are not appropriate for matching in tests + static String canonicalFormat(TableRow row) { + List<String> entries = Lists.newArrayListWithCapacity(row.size()); + for (Map.Entry<String, Object> entry : row.entrySet()) { + entries.add(entry.getKey() + ":" + entry.getValue()); + } + Collections.sort(entries); + return Joiner.on(",").join(entries); + } + + static class FormatResults extends DoFn<TableRow, String> { @Override public void processElement(ProcessContext c) throws Exception { TableRow element = c.element(); @@ -133,7 +154,7 @@ public class TriggerExampleTest { .set("isLast", element.get("isLast")) .set("timing", element.get("timing")) .set("window", element.get("window")); - c.output(row); + c.output(canonicalFormat(row)); } } }
