Fix Queries tests Workaround for issue #22 + extra cleaning
Replace junit asserts by hamcrest asserts Set numEvents in test to the minimum number that makes the tests pass issue #15 comments, improve asserts (hamcrest), reformat For now make generate monothreaded Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bd57351 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bd57351 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bd57351 Branch: refs/heads/master Commit: 1bd57351f1db9b932b253c36d08098cf57ce652b Parents: a1fe33b Author: Etienne Chauchot <[email protected]> Authored: Thu Mar 16 11:38:08 2017 +0100 Committer: Ismaël MejÃa <[email protected]> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/pom.xml | 8 +++- .../integration/nexmark/NexmarkQueryModel.java | 49 ++++++++++---------- .../beam/integration/nexmark/NexmarkUtils.java | 3 +- .../beam/integration/nexmark/Query0Model.java | 1 + .../beam/integration/nexmark/Query1Model.java | 1 + .../beam/integration/nexmark/Query7Model.java | 1 + .../beam/integration/nexmark/Query8Model.java | 2 +- .../nexmark/WinningBidsSimulator.java | 1 + .../beam/integration/nexmark/QueryTest.java | 13 +++--- 9 files changed, 45 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index febd96d..27abb0e 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -264,7 +264,13 @@ <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> - <scope>test</scope> + <version>${hamcrest.version}</version> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <version>${hamcrest.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java index a23f82b..f265e0d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java @@ -17,6 +17,11 @@ */ package org.apache.beam.integration.nexmark; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItems; +import org.hamcrest.collection.IsIterableContainingInAnyOrder; + import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -28,16 +33,23 @@ import java.util.Set; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.TimestampedValue; +import org.hamcrest.core.IsCollectionContaining; +import org.hamcrest.core.IsEqual; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Assert; /** - * Base class for models of the eight NEXMark queries. Provides an assertion - * function which can be applied against the actual query results to check their consistency - * with the model. + * Base class for models of the eight NEXMark queries. Provides an assertion function which can be + * applied against the actual query results to check their consistency with the model. */ public abstract class NexmarkQueryModel implements Serializable { + protected final NexmarkConfiguration configuration; + + public NexmarkQueryModel(NexmarkConfiguration configuration) { + this.configuration = configuration; + } + /** * Return the start of the most recent window of {@code size} and {@code period} which ends * strictly before {@code timestamp}. @@ -50,15 +62,7 @@ public abstract class NexmarkQueryModel implements Serializable { return new Instant(lim - s); } - protected final NexmarkConfiguration configuration; - - public NexmarkQueryModel(NexmarkConfiguration configuration) { - this.configuration = configuration; - } - - /** - * Convert {@code itr} to strings capturing values, timestamps and order. - */ + /** Convert {@code itr} to strings capturing values, timestamps and order. */ protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) { List<String> strings = new ArrayList<>(); while (itr.hasNext()) { @@ -67,9 +71,7 @@ public abstract class NexmarkQueryModel implements Serializable { return strings; } - /** - * Convert {@code itr} to strings capturing values and order. - */ + /** Convert {@code itr} to strings capturing values and order. */ protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) { List<String> strings = new ArrayList<>(); while (itr.hasNext()) { @@ -78,9 +80,7 @@ public abstract class NexmarkQueryModel implements Serializable { return strings; } - /** - * Convert {@code itr} to strings capturing values only. - */ + /** Convert {@code itr} to strings capturing values only. */ protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) { Set<String> strings = new HashSet<>(); while (itr.hasNext()) { @@ -99,22 +99,23 @@ public abstract class NexmarkQueryModel implements Serializable { } /** - * Convert iterator of elements to collection of strings to use when testing coherence - * of model against actual query results. + * Convert iterator of elements to collection of strings to use when testing coherence of model + * against actual query results. */ protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr); - /** - * Return assertion to use on results of pipeline for this query. - */ + /** Return assertion to use on results of pipeline for this query. */ public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() { final Collection<String> expectedStrings = toCollection(simulator().results()); + final String[] expectedStringsArray = expectedStrings.toArray(new String[expectedStrings.size()]); return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() { @Override public Void apply(Iterable<TimestampedValue<KnownSize>> actual) { Collection<String> actualStrings = toCollection(relevantResults(actual).iterator()); - Assert.assertEquals(expectedStrings, actualStrings); + Assert.assertThat("wrong pipeline output", actualStrings, IsEqual.equalTo(expectedStrings)); +//compare without order +// Assert.assertThat("wrong pipeline output", actualStrings, IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray)); return null; } }; http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index 8f4cb22..f7417d3 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -382,8 +382,7 @@ public class NexmarkUtils { */ public static PTransform<PBegin, PCollection<Event>> batchEventsSource( NexmarkConfiguration configuration) { - return Read.from(new BoundedEventSource( - NexmarkUtils.standardGeneratorConfig(configuration), configuration.numEventGenerators)); + return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration), configuration.numEventGenerators)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java index b7cdf1c..37e3f93 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java @@ -42,6 +42,7 @@ public class Query0Model extends NexmarkQueryModel { return; } addResult(timestampedEvent); + //TODO test fails because offset of some hundreds of ms beween expect and actual } } http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java index ace6f7e..16287e6 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java @@ -53,6 +53,7 @@ public class Query1Model extends NexmarkQueryModel implements Serializable { TimestampedValue<Bid> result = TimestampedValue.of(resultBid, timestampedEvent.getTimestamp()); addResult(result); + //TODO test fails because offset of some hundreds of ms beween expect and actual } } http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java index 73e96e2..0033c68 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java @@ -107,6 +107,7 @@ public class Query7Model extends NexmarkQueryModel implements Serializable { } // Keep only the highest bids. captureBid(event.bid); + //TODO test fails because offset of some hundreds of ms between expect and actual } } http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java index fdd2a35..261e383 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java @@ -115,7 +115,7 @@ public class Query8Model extends NexmarkQueryModel implements Serializable { // Remember auction for future new people. newAuctions.put(event.newAuction.seller, event.newAuction); } - } else { + } else { // event is not an auction, nor a bid, so it is a person // Join new person with existing auctions. for (Auction auction : newAuctions.get(event.newPerson.id)) { addResult(auction, event.newPerson, timestamp); http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java index 5970556..dc8094b 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java @@ -175,6 +175,7 @@ public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> { return; } addResult(result); + //TODO test fails because offset of some hundreds of ms beween expect and actual return; } http://git-wip-us.apache.org/repos/asf/beam/blob/1bd57351/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java index d4d51f1..e481eac 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -33,23 +34,23 @@ import org.junit.runners.JUnit4; * Test the various NEXMark queries yield results coherent with their models. */ @RunWith(JUnit4.class) -@Ignore -//TODO Ismael public class QueryTest { private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone(); + @Rule + public TestPipeline p = TestPipeline.create(); static { - CONFIG.numEvents = 2000; + //careful, results of tests are linked to numEvents value + CONFIG.numEvents = 100; } /** Test {@code query} matches {@code model}. */ - private static void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) { - Pipeline p = TestPipeline.create(); + private void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) { NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p); PCollection<TimestampedValue<KnownSize>> results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query); //TODO Ismael this should not be called explicitly -// results.setIsBoundedInternal(IsBounded.BOUNDED); + results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); PAssert.that(results).satisfies(model.assertionFor()); p.run().waitUntilFinish(); }
