Move WinningBids into the queries package
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a39cb800 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a39cb800 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a39cb800 Branch: refs/heads/master Commit: a39cb80009f569e1c8ba82ee9c67a7c5dbe3d16f Parents: a6dbdfa Author: Ismaël Mejía <[email protected]> Authored: Sun Apr 30 17:44:07 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:28 2017 +0200 ---------------------------------------------------------------------- .../integration/nexmark/AbstractSimulator.java | 210 ---------- .../beam/integration/nexmark/NexmarkQuery.java | 267 ------------- .../integration/nexmark/NexmarkQueryModel.java | 122 ------ .../beam/integration/nexmark/NexmarkRunner.java | 2 + .../beam/integration/nexmark/WinningBids.java | 377 ------------------ .../nexmark/WinningBidsSimulator.java | 205 ---------- .../integration/nexmark/model/AuctionBid.java | 3 +- .../nexmark/queries/AbstractSimulator.java | 211 +++++++++++ .../nexmark/queries/NexmarkQuery.java | 270 +++++++++++++ .../nexmark/queries/NexmarkQueryModel.java | 123 ++++++ .../integration/nexmark/queries/Query0.java | 1 - .../nexmark/queries/Query0Model.java | 4 +- .../integration/nexmark/queries/Query1.java | 1 - .../integration/nexmark/queries/Query10.java | 1 - .../integration/nexmark/queries/Query11.java | 1 - .../integration/nexmark/queries/Query12.java | 1 - .../nexmark/queries/Query1Model.java | 2 - .../integration/nexmark/queries/Query2.java | 1 - .../nexmark/queries/Query2Model.java | 2 - .../integration/nexmark/queries/Query3.java | 1 - .../nexmark/queries/Query3Model.java | 2 - .../integration/nexmark/queries/Query4.java | 2 - .../nexmark/queries/Query4Model.java | 3 - .../integration/nexmark/queries/Query5.java | 1 - .../nexmark/queries/Query5Model.java | 2 - .../integration/nexmark/queries/Query6.java | 2 - .../nexmark/queries/Query6Model.java | 3 - 
.../integration/nexmark/queries/Query7.java | 1 - .../nexmark/queries/Query7Model.java | 2 - .../integration/nexmark/queries/Query8.java | 1 - .../nexmark/queries/Query8Model.java | 2 - .../integration/nexmark/queries/Query9.java | 2 - .../nexmark/queries/Query9Model.java | 3 - .../nexmark/queries/WinningBids.java | 379 +++++++++++++++++++ .../nexmark/queries/WinningBidsSimulator.java | 207 ++++++++++ .../integration/nexmark/queries/QueryTest.java | 2 - 36 files changed, 1194 insertions(+), 1225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java deleted file mode 100644 index b012842..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.integration.nexmark; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import javax.annotation.Nullable; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Abstract base class for simulator of a query. - * - * @param <InputT> Type of input elements. - * @param <OutputT> Type of output elements. - */ -public abstract class AbstractSimulator<InputT, OutputT> { - /** Window size for action bucket sampling. */ - public static final Duration WINDOW_SIZE = Duration.standardMinutes(1); - - /** Input event stream we should draw from. */ - private final Iterator<TimestampedValue<InputT>> input; - - /** Set to true when no more results. */ - private boolean isDone; - - /** - * Results which have not yet been returned by the {@link #results} iterator. - */ - private final List<TimestampedValue<OutputT>> pendingResults; - - /** - * Current window timestamp (ms since epoch). - */ - private long currentWindow; - - /** - * Number of (possibly intermediate) results for the current window. - */ - private long currentCount; - - /** - * Result counts per window which have not yet been returned by the {@link #resultsPerWindow} - * iterator. - */ - private final List<Long> pendingCounts; - - public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) { - this.input = input; - isDone = false; - pendingResults = new ArrayList<>(); - currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); - currentCount = 0; - pendingCounts = new ArrayList<>(); - } - - /** Called by implementors of {@link #run}: Fetch the next input element. 
*/ - @Nullable - protected TimestampedValue<InputT> nextInput() { - if (!input.hasNext()) { - return null; - } - TimestampedValue<InputT> timestampedInput = input.next(); - NexmarkUtils.info("input: %s", timestampedInput); - return timestampedInput; - } - - /** - * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of - * recording the expected activity of the query over time. - */ - protected void addIntermediateResult(TimestampedValue<OutputT> result) { - NexmarkUtils.info("intermediate result: %s", result); - updateCounts(result.getTimestamp()); - } - - /** - * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking - * semantic correctness. - */ - protected void addResult(TimestampedValue<OutputT> result) { - NexmarkUtils.info("result: %s", result); - pendingResults.add(result); - updateCounts(result.getTimestamp()); - } - - /** - * Update window and counts. - */ - private void updateCounts(Instant timestamp) { - long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis(); - if (window > currentWindow) { - if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { - pendingCounts.add(currentCount); - } - currentCount = 0; - currentWindow = window; - } - currentCount++; - } - - /** Called by implementors of {@link #run}: Record that no more results will be emitted. */ - protected void allDone() { - isDone = true; - } - - /** - * Overridden by derived classes to do the next increment of work. Each call should - * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult} - * or {@link #allDone}. It is ok for a single call to emit more than one result via - * {@link #addResult}. It is ok for a single call to run the entire simulation, though - * this will prevent the {@link #results} and {@link #resultsPerWindow} iterators to - * stall. 
- */ - protected abstract void run(); - - /** - * Return iterator over all expected timestamped results. The underlying simulator state is - * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called. - */ - public Iterator<TimestampedValue<OutputT>> results() { - return new Iterator<TimestampedValue<OutputT>>() { - @Override - public boolean hasNext() { - while (true) { - if (!pendingResults.isEmpty()) { - return true; - } - if (isDone) { - return false; - } - run(); - } - } - - @Override - public TimestampedValue<OutputT> next() { - TimestampedValue<OutputT> result = pendingResults.get(0); - pendingResults.remove(0); - return result; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - /** - * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying - * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be - * called. - */ - public Iterator<Long> resultsPerWindow() { - return new Iterator<Long>() { - @Override - public boolean hasNext() { - while (true) { - if (!pendingCounts.isEmpty()) { - return true; - } - if (isDone) { - if (currentCount > 0) { - pendingCounts.add(currentCount); - currentCount = 0; - currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); - return true; - } else { - return false; - } - } - run(); - } - } - - @Override - public Long next() { - Long result = pendingCounts.get(0); - pendingCounts.remove(0); - return result; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java deleted file mode 100644 index ab1c305..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; - -/** - * Base class for the eight 'NEXMark' queries. Supplies some fragments common to - * multiple queries. - */ -public abstract class NexmarkQuery - extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> { - protected static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions"); - protected static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids"); - protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person"); - - /** Predicate to detect a new person event. */ - protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON = - new SerializableFunction<Event, Boolean>() { - @Override - public Boolean apply(Event event) { - return event.newPerson != null; - } - }; - - /** DoFn to convert a new person event to a person. */ - protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().newPerson); - } - }; - - /** Predicate to detect a new auction event. 
*/ - protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION = - new SerializableFunction<Event, Boolean>() { - @Override - public Boolean apply(Event event) { - return event.newAuction != null; - } - }; - - /** DoFn to convert a new auction event to an auction. */ - protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().newAuction); - } - }; - - /** Predicate to detect a new bid event. */ - protected static final SerializableFunction<Event, Boolean> IS_BID = - new SerializableFunction<Event, Boolean>() { - @Override - public Boolean apply(Event event) { - return event.bid != null; - } - }; - - /** DoFn to convert a bid event to a bid. */ - protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().bid); - } - }; - - /** Transform to key each person by their id. */ - protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID = - ParDo.of(new DoFn<Person, KV<Long, Person>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().id, c.element())); - } - }); - - /** Transform to key each auction by its id. */ - protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID = - ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().id, c.element())); - } - }); - - /** Transform to key each auction by its seller id. */ - protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER = - ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().seller, c.element())); - } - }); - - /** Transform to key each bid by it's auction id. 
*/ - protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION = - ParDo.of(new DoFn<Bid, KV<Long, Bid>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().auction, c.element())); - } - }); - - /** Transform to project the auction id from each bid. */ - protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION = - ParDo.of(new DoFn<Bid, Long>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().auction); - } - }); - - /** Transform to project the price from each bid. */ - protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE = - ParDo.of(new DoFn<Bid, Long>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().price); - } - }); - - /** Transform to emit each event with the timestamp embedded within it. */ - public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA = - ParDo.of(new DoFn<Event, Event>() { - @ProcessElement - public void processElement(ProcessContext c) { - Event e = c.element(); - if (e.bid != null) { - c.outputWithTimestamp(e, new Instant(e.bid.dateTime)); - } else if (e.newPerson != null) { - c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime)); - } else if (e.newAuction != null) { - c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime)); - } - } - }); - - /** - * Transform to filter for just the new auction events. - */ - protected static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS = - new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") { - @Override - public PCollection<Auction> expand(PCollection<Event> input) { - return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION)) - .apply("AsAuction", ParDo.of(AS_AUCTION)); - } - }; - - /** - * Transform to filter for just the new person events. 
- */ - protected static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS = - new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") { - @Override - public PCollection<Person> expand(PCollection<Event> input) { - return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON)) - .apply("AsPerson", ParDo.of(AS_PERSON)); - } - }; - - /** - * Transform to filter for just the bid events. - */ - protected static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS = - new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") { - @Override - public PCollection<Bid> expand(PCollection<Event> input) { - return input.apply("IsBid", Filter.by(IS_BID)) - .apply("AsBid", ParDo.of(AS_BID)); - } - }; - - protected final NexmarkConfiguration configuration; - public final Monitor<Event> eventMonitor; - public final Monitor<KnownSize> resultMonitor; - public final Monitor<Event> endOfStreamMonitor; - protected final Counter fatalCounter; - - protected NexmarkQuery(NexmarkConfiguration configuration, String name) { - super(name); - this.configuration = configuration; - if (configuration.debug) { - eventMonitor = new Monitor<>(name + ".Events", "event"); - resultMonitor = new Monitor<>(name + ".Results", "result"); - endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end"); - fatalCounter = Metrics.counter(name , "fatal"); - } else { - eventMonitor = null; - resultMonitor = null; - endOfStreamMonitor = null; - fatalCounter = null; - } - } - - /** - * Implement the actual query. All we know about the result is it has a known encoded size. - */ - protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events); - - @Override - public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) { - - if (configuration.debug) { - events = - events - // Monitor events as they go by. - .apply(name + ".Monitor", eventMonitor.getTransform()) - // Count each type of event. 
- .apply(name + ".Snoop", NexmarkUtils.snoop(name)); - } - - if (configuration.cpuDelayMs > 0) { - // Slow down by pegging one core at 100%. - events = events.apply(name + ".CpuDelay", - NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs)); - } - - if (configuration.diskBusyBytes > 0) { - // Slow down by forcing bytes to durable store. - events = events.apply(name + ".DiskBusy", - NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes)); - } - - // Run the query. - PCollection<KnownSize> queryResults = applyPrim(events); - - if (configuration.debug) { - // Monitor results as they go by. - queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform()); - } - - // Timestamp the query results. - return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java deleted file mode 100644 index b2b1826..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.TimestampedValue; - - -import org.hamcrest.core.IsEqual; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Assert; - -/** - * Base class for models of the eight NEXMark queries. Provides an assertion function which can be - * applied against the actual query results to check their consistency with the model. - */ -public abstract class NexmarkQueryModel implements Serializable { - protected final NexmarkConfiguration configuration; - - public NexmarkQueryModel(NexmarkConfiguration configuration) { - this.configuration = configuration; - } - - /** - * Return the start of the most recent window of {@code size} and {@code period} which ends - * strictly before {@code timestamp}. - */ - public static Instant windowStart(Duration size, Duration period, Instant timestamp) { - long ts = timestamp.getMillis(); - long p = period.getMillis(); - long lim = ts - ts % p; - long s = size.getMillis(); - return new Instant(lim - s); - } - - /** Convert {@code itr} to strings capturing values, timestamps and order. 
*/ - protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) { - List<String> strings = new ArrayList<>(); - while (itr.hasNext()) { - strings.add(itr.next().toString()); - } - return strings; - } - - /** Convert {@code itr} to strings capturing values and order. */ - protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) { - List<String> strings = new ArrayList<>(); - while (itr.hasNext()) { - strings.add(itr.next().getValue().toString()); - } - return strings; - } - - /** Convert {@code itr} to strings capturing values only. */ - protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) { - Set<String> strings = new HashSet<>(); - while (itr.hasNext()) { - strings.add(itr.next().getValue().toString()); - } - return strings; - } - - /** Return simulator for query. */ - protected abstract AbstractSimulator<?, ?> simulator(); - - /** Return sub-sequence of results which are significant for model. */ - protected Iterable<TimestampedValue<KnownSize>> relevantResults( - Iterable<TimestampedValue<KnownSize>> results) { - return results; - } - - /** - * Convert iterator of elements to collection of strings to use when testing coherence of model - * against actual query results. - */ - protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr); - - /** Return assertion to use on results of pipeline for this query. 
*/ - public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() { - final Collection<String> expectedStrings = toCollection(simulator().results()); - final String[] expectedStringsArray = - expectedStrings.toArray(new String[expectedStrings.size()]); - - return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() { - @Override - public Void apply(Iterable<TimestampedValue<KnownSize>> actual) { - Collection<String> actualStrings = toCollection(relevantResults(actual).iterator()); - Assert.assertThat("wrong pipeline output", actualStrings, - IsEqual.equalTo(expectedStrings)); -//compare without order -// Assert.assertThat("wrong pipeline output", actualStrings, -// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray)); - return null; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index ebfd196..a3c4d33 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -38,6 +38,8 @@ import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.integration.nexmark.model.Person; +import org.apache.beam.integration.nexmark.queries.NexmarkQuery; +import org.apache.beam.integration.nexmark.queries.NexmarkQueryModel; import org.apache.beam.integration.nexmark.queries.Query0; import org.apache.beam.integration.nexmark.queries.Query0Model; import 
org.apache.beam.integration.nexmark.queries.Query1; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java deleted file mode 100644 index 3815b9d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import static com.google.common.base.Preconditions.checkState; - -import com.fasterxml.jackson.annotation.JsonCreator; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.sources.GeneratorConfig; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Instant; - -/** - * A transform to find the winning bid for each closed auction. 
In pseudo CQL syntax: - * - * <pre>{@code - * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime) - * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] - * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME - * GROUP BY A.id - * }</pre> - * - * <p>We will also check that the winning bid is above the auction reserve. Note that - * we ignore the auction opening bid value since it has no impact on which bid eventually wins, - * if any. - * - * <p>Our implementation will use a custom windowing function in order to bring bids and - * auctions together without requiring global state. - */ -public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> { - /** Windows for open auctions and bids. */ - private static class AuctionOrBidWindow extends IntervalWindow implements Serializable { - /** Id of auction this window is for. */ - public final long auction; - - /** - * True if this window represents an actual auction, and thus has a start/end - * time matching that of the auction. False if this window represents a bid, and - * thus has an unbounded start/end time. - */ - public final boolean isAuctionWindow; - - /** For avro only. */ - private AuctionOrBidWindow() { - super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE); - auction = 0; - isAuctionWindow = false; - } - - private AuctionOrBidWindow( - Instant start, Instant end, long auctionId, boolean isAuctionWindow) { - super(start, end); - this.auction = auctionId; - this.isAuctionWindow = isAuctionWindow; - } - - /** Return an auction window for {@code auction}. */ - public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) { - AuctionOrBidWindow result = - new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true); - return result; - } - - /** - * Return a bid window for {@code bid}. It should later be merged into - * the corresponding auction window. 
However, it is possible this bid is for an already - * expired auction, or for an auction which the system has not yet seen. So we - * give the bid a bit of wiggle room in its interval. - */ - public static AuctionOrBidWindow forBid( - long expectedAuctionDurationMs, Instant timestamp, Bid bid) { - // At this point we don't know which auctions are still valid, and the bid may - // be for an auction which won't start until some unknown time in the future - // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid). - // A real system would atomically reconcile bids and auctions by a separate mechanism. - // If we give bids an unbounded window it is possible a bid for an auction which - // has already expired would cause the system watermark to stall, since that window - // would never be retired. - // Instead, we will just give the bid a finite window which expires at - // the upper bound of auctions assuming the auction starts at the same time as the bid, - // and assuming the system is running at its lowest event rate (as per interEventDelayUs). - AuctionOrBidWindow result = new AuctionOrBidWindow( - timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false); - return result; - } - - /** Is this an auction window? */ - public boolean isAuctionWindow() { - return isAuctionWindow; - } - - @Override - public String toString() { - return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", - start(), end(), auction, isAuctionWindow); - } - } - - /** - * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long. 
- */ - private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> { - private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder(); - private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder(); - private static final Coder<Long> ID_CODER = VarLongCoder.of(); - private static final Coder<Integer> INT_CODER = VarIntCoder.of(); - - @JsonCreator - public static AuctionOrBidWindowCoder of() { - return INSTANCE; - } - - @Override - public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context) - throws IOException, CoderException { - SUPER_CODER.encode(window, outStream, Coder.Context.NESTED); - ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED); - INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED); - } - - @Override - public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context) - throws IOException, CoderException { - IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED); - long auction = ID_CODER.decode(inStream, Coder.Context.NESTED); - boolean isAuctionWindow = - INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true; - return new AuctionOrBidWindow( - superWindow.start(), superWindow.end(), auction, isAuctionWindow); - } - - @Override public void verifyDeterministic() throws NonDeterministicException {} - } - - /** Assign events to auction windows and merges them intelligently. */ - private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> { - /** Expected duration of auctions in ms. 
*/ - private final long expectedAuctionDurationMs; - - public AuctionOrBidWindowFn(long expectedAuctionDurationMs) { - this.expectedAuctionDurationMs = expectedAuctionDurationMs; - } - - @Override - public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) { - Event event = c.element(); - if (event.newAuction != null) { - // Assign auctions to an auction window which expires at the auction's close. - return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); - } else if (event.bid != null) { - // Assign bids to a temporary bid window which will later be merged into the appropriate - // auction window. - return Arrays.asList( - AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid)); - } else { - // Don't assign people to any window. They will thus be dropped. - return Arrays.asList(); - } - } - - @Override - public void mergeWindows(MergeContext c) throws Exception { - // Split and index the auction and bid windows by auction id. - Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>(); - Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>(); - for (AuctionOrBidWindow window : c.windows()) { - if (window.isAuctionWindow()) { - idToTrueAuctionWindow.put(window.auction, window); - } else { - List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction); - if (bidWindows == null) { - bidWindows = new ArrayList<>(); - idToBidAuctionWindows.put(window.auction, bidWindows); - } - bidWindows.add(window); - } - } - - // Merge all 'bid' windows into their corresponding 'auction' window, provided the - // auction has not expired. 
- for (long auction : idToTrueAuctionWindow.keySet()) { - AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction); - List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction); - if (bidWindows != null) { - List<AuctionOrBidWindow> toBeMerged = new ArrayList<>(); - for (AuctionOrBidWindow bidWindow : bidWindows) { - if (bidWindow.start().isBefore(auctionWindow.end())) { - toBeMerged.add(bidWindow); - } - // else: This bid window will remain until its expire time, at which point it - // will expire without ever contributing to an output. - } - if (!toBeMerged.isEmpty()) { - toBeMerged.add(auctionWindow); - c.merge(toBeMerged, auctionWindow); - } - } - } - } - - @Override - public boolean isCompatible(WindowFn<?, ?> other) { - return other instanceof AuctionOrBidWindowFn; - } - - @Override - public Coder<AuctionOrBidWindow> windowCoder() { - return AuctionOrBidWindowCoder.of(); - } - - @Override - public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() { - throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs"); - } - - /** - * Below we will GBK auctions and bids on their auction ids. Then we will reduce those - * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at - * least one valid bid. We would like those output pairs to have a timestamp of the auction's - * expiry (since that's the earliest we know for sure we have the correct winner). We would - * also like to make that winning results are available to following stages at the auction's - * expiry. - * - * <p>Each result of the GBK will have a timestamp of the min of the result of this object's - * assignOutputTime over all records which end up in one of its iterables. Thus we get the - * desired behavior if we ignore each record's timestamp and always return the auction window's - * 'maxTimestamp', which will correspond to the auction's expiry. 
- * - * <p>In contrast, if this object's assignOutputTime were to return 'inputTimestamp' - * (the usual implementation), then each GBK record will take as its timestamp the minimum of - * the timestamps of all bids and auctions within it, which will always be the auction's - * timestamp. An auction which expires well into the future would thus hold up the watermark - * of the GBK results until that auction expired. That in turn would hold up all winning pairs. - */ - @Override - public Instant getOutputTime( - Instant inputTimestamp, AuctionOrBidWindow window) { - return window.maxTimestamp(); - } - } - - private final AuctionOrBidWindowFn auctionOrBidWindowFn; - - public WinningBids(String name, NexmarkConfiguration configuration) { - super(name); - // What's the expected auction time (when the system is running at the lowest event rate). - long[] interEventDelayUs = configuration.rateShape.interEventDelayUs( - configuration.firstEventRate, configuration.nextEventRate, - configuration.rateUnit, configuration.numEventGenerators); - long longestDelayUs = 0; - for (int i = 0; i < interEventDelayUs.length; i++) { - longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]); - } - // Adjust for proportion of auction events amongst all events. - longestDelayUs = - (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR) - / GeneratorConfig.AUCTION_PROPORTION; - // Adjust for number of in-flight auctions. - longestDelayUs = longestDelayUs * configuration.numInFlightAuctions; - long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000; - NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs); - auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs); - } - - @Override - public PCollection<AuctionBid> expand(PCollection<Event> events) { - // Window auctions and bids into custom auction windows. New people events will be discarded. 
- // This will allow us to bring bids and auctions together irrespective of how long - // each auction is open for. - events = events.apply("Window", Window.into(auctionOrBidWindowFn)); - - // Key auctions by their id. - PCollection<KV<Long, Auction>> auctionsById = - events.apply(NexmarkQuery.JUST_NEW_AUCTIONS) - .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID); - - // Key bids by their auction id. - PCollection<KV<Long, Bid>> bidsByAuctionId = - events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION); - - // Find the highest price valid bid for each closed auction. - return - // Join auctions and bids. - KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById) - .and(NexmarkQuery.BID_TAG, bidsByAuctionId) - .apply(CoGroupByKey.<Long>create()) - // Filter and select. - .apply(name + ".Join", - ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() { - private final Counter noAuctionCounter = Metrics.counter(name, "noAuction"); - private final Counter underReserveCounter = Metrics.counter(name, "underReserve"); - private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids"); - - @ProcessElement - public void processElement(ProcessContext c) { - Auction auction = - c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null); - if (auction == null) { - // We have bids without a matching auction. Give up. - noAuctionCounter.inc(); - return; - } - // Find the current winning bid for auction. - // The earliest bid with the maximum price above the reserve wins. - Bid bestBid = null; - for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) { - // Bids too late for their auction will have been - // filtered out by the window merge function. - checkState(bid.dateTime < auction.expires); - if (bid.price < auction.reserve) { - // Bid price is below auction reserve. 
- underReserveCounter.inc(); - continue; - } - - if (bestBid == null - || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) { - bestBid = bid; - } - } - if (bestBid == null) { - // We don't have any valid bids for auction. - noValidBidsCounter.inc(); - return; - } - c.output(new AuctionBid(auction, bestBid)); - } - } - )); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java deleted file mode 100644 index e7f51b7..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import javax.annotation.Nullable; - -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; - -/** - * A simulator of the {@code WinningBids} query. - */ -public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> { - /** Auctions currently still open, indexed by auction id. */ - private final Map<Long, Auction> openAuctions; - - /** The ids of auctions known to be closed. */ - private final Set<Long> closedAuctions; - - /** Current best valid bids for open auctions, indexed by auction id. */ - private final Map<Long, Bid> bestBids; - - /** Bids for auctions we havn't seen yet. */ - private final List<Bid> bidsWithoutAuctions; - - /** - * Timestamp of last new auction or bid event (ms since epoch). - */ - private long lastTimestamp; - - public WinningBidsSimulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - openAuctions = new TreeMap<>(); - closedAuctions = new TreeSet<>(); - bestBids = new TreeMap<>(); - bidsWithoutAuctions = new ArrayList<>(); - lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); - } - - /** - * Try to account for {@code bid} in state. Return true if bid has now been - * accounted for by {@code bestBids}. - */ - private boolean captureBestBid(Bid bid, boolean shouldLog) { - if (closedAuctions.contains(bid.auction)) { - // Ignore bids for known, closed auctions. 
- if (shouldLog) { - NexmarkUtils.info("closed auction: %s", bid); - } - return true; - } - Auction auction = openAuctions.get(bid.auction); - if (auction == null) { - // We don't have an auction for this bid yet, so can't determine if it is - // winning or not. - if (shouldLog) { - NexmarkUtils.info("pending auction: %s", bid); - } - return false; - } - if (bid.price < auction.reserve) { - // Bid price is too low. - if (shouldLog) { - NexmarkUtils.info("below reserve: %s", bid); - } - return true; - } - Bid existingBid = bestBids.get(bid.auction); - if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) { - // We've found a (new) best bid for a known auction. - bestBids.put(bid.auction, bid); - if (shouldLog) { - NexmarkUtils.info("new winning bid: %s", bid); - } - } else { - if (shouldLog) { - NexmarkUtils.info("ignoring low bid: %s", bid); - } - } - return true; - } - - /** - * Try to match bids without auctions to auctions. - */ - private void flushBidsWithoutAuctions() { - Iterator<Bid> itr = bidsWithoutAuctions.iterator(); - while (itr.hasNext()) { - Bid bid = itr.next(); - if (captureBestBid(bid, false)) { - NexmarkUtils.info("bid now accounted for: %s", bid); - itr.remove(); - } - } - } - - /** - * Return the next winning bid for an expired auction relative to {@code timestamp}. - * Return null if no more winning bids, in which case all expired auctions will - * have been removed from our state. Retire auctions in order of expire time. 
- */ - @Nullable - private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) { - Map<Long, List<Long>> toBeRetired = new TreeMap<>(); - for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) { - if (entry.getValue().expires <= timestamp) { - List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires); - if (idsAtTime == null) { - idsAtTime = new ArrayList<>(); - toBeRetired.put(entry.getValue().expires, idsAtTime); - } - idsAtTime.add(entry.getKey()); - } - } - for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) { - for (long id : entry.getValue()) { - Auction auction = openAuctions.get(id); - NexmarkUtils.info("retiring auction: %s", auction); - openAuctions.remove(id); - Bid bestBid = bestBids.get(id); - if (bestBid != null) { - TimestampedValue<AuctionBid> result = - TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires)); - NexmarkUtils.info("winning: %s", result); - return result; - } - } - } - return null; - } - - @Override - protected void run() { - if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { - // We may have finally seen the auction a bid was intended for. - flushBidsWithoutAuctions(); - TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp); - if (result != null) { - addResult(result); - return; - } - } - - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - // No more events. Flush any still open auctions. - TimestampedValue<AuctionBid> result = - nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); - if (result == null) { - // We are done. - allDone(); - return; - } - addResult(result); - //TODO test fails because offset of some hundreds of ms beween expect and actual - return; - } - - Event event = timestampedEvent.getValue(); - if (event.newPerson != null) { - // Ignore new person events. 
- return; - } - - lastTimestamp = timestampedEvent.getTimestamp().getMillis(); - if (event.newAuction != null) { - // Add this new open auction to our state. - openAuctions.put(event.newAuction.id, event.newAuction); - } else { - if (!captureBestBid(event.bid, true)) { - // We don't know what to do with this bid yet. - NexmarkUtils.info("bid not yet accounted for: %s", event.bid); - bidsWithoutAuctions.add(event.bid); - } - } - // Keep looking for winning bids. - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java index 7f6b7c9..b1d9ec2 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java @@ -24,13 +24,12 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.WinningBids; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; /** - * Result of {@link WinningBids} transform. + * Result of {@link org.apache.beam.integration.nexmark.queries.WinningBids} transform. 
*/ public class AuctionBid implements KnownSize, Serializable { public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() { http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java new file mode 100644 index 0000000..270b5c3 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark.queries; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import javax.annotation.Nullable; + +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Abstract base class for simulator of a query. + * + * @param <InputT> Type of input elements. + * @param <OutputT> Type of output elements. + */ +public abstract class AbstractSimulator<InputT, OutputT> { + /** Window size for action bucket sampling. */ + public static final Duration WINDOW_SIZE = Duration.standardMinutes(1); + + /** Input event stream we should draw from. */ + private final Iterator<TimestampedValue<InputT>> input; + + /** Set to true when no more results. */ + private boolean isDone; + + /** + * Results which have not yet been returned by the {@link #results} iterator. + */ + private final List<TimestampedValue<OutputT>> pendingResults; + + /** + * Current window timestamp (ms since epoch). + */ + private long currentWindow; + + /** + * Number of (possibly intermediate) results for the current window. + */ + private long currentCount; + + /** + * Result counts per window which have not yet been returned by the {@link #resultsPerWindow} + * iterator. + */ + private final List<Long> pendingCounts; + + public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) { + this.input = input; + isDone = false; + pendingResults = new ArrayList<>(); + currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + currentCount = 0; + pendingCounts = new ArrayList<>(); + } + + /** Called by implementors of {@link #run}: Fetch the next input element. 
*/ + @Nullable + protected TimestampedValue<InputT> nextInput() { + if (!input.hasNext()) { + return null; + } + TimestampedValue<InputT> timestampedInput = input.next(); + NexmarkUtils.info("input: %s", timestampedInput); + return timestampedInput; + } + + /** + * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of + * recording the expected activity of the query over time. + */ + protected void addIntermediateResult(TimestampedValue<OutputT> result) { + NexmarkUtils.info("intermediate result: %s", result); + updateCounts(result.getTimestamp()); + } + + /** + * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking + * semantic correctness. + */ + protected void addResult(TimestampedValue<OutputT> result) { + NexmarkUtils.info("result: %s", result); + pendingResults.add(result); + updateCounts(result.getTimestamp()); + } + + /** + * Update window and counts. + */ + private void updateCounts(Instant timestamp) { + long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis(); + if (window > currentWindow) { + if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { + pendingCounts.add(currentCount); + } + currentCount = 0; + currentWindow = window; + } + currentCount++; + } + + /** Called by implementors of {@link #run}: Record that no more results will be emitted. */ + protected void allDone() { + isDone = true; + } + + /** + * Overridden by derived classes to do the next increment of work. Each call should + * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult} + * or {@link #allDone}. It is ok for a single call to emit more than one result via + * {@link #addResult}. It is ok for a single call to run the entire simulation, though + * this will prevent the {@link #results} and {@link #resultsPerWindow} iterators to + * stall. 
+ */ + protected abstract void run(); + + /** + * Return iterator over all expected timestamped results. The underlying simulator state is + * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called. + */ + public Iterator<TimestampedValue<OutputT>> results() { + return new Iterator<TimestampedValue<OutputT>>() { + @Override + public boolean hasNext() { + while (true) { + if (!pendingResults.isEmpty()) { + return true; + } + if (isDone) { + return false; + } + run(); + } + } + + @Override + public TimestampedValue<OutputT> next() { + TimestampedValue<OutputT> result = pendingResults.get(0); + pendingResults.remove(0); + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + /** + * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying + * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be + * called. + */ + public Iterator<Long> resultsPerWindow() { + return new Iterator<Long>() { + @Override + public boolean hasNext() { + while (true) { + if (!pendingCounts.isEmpty()) { + return true; + } + if (isDone) { + if (currentCount > 0) { + pendingCounts.add(currentCount); + currentCount = 0; + currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + return true; + } else { + return false; + } + } + run(); + } + } + + @Override + public Long next() { + Long result = pendingCounts.get(0); + pendingCounts.remove(0); + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java new file mode 100644 index 0000000..0796ce5 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.integration.nexmark.queries; + +import org.apache.beam.integration.nexmark.Monitor; +import org.apache.beam.integration.nexmark.NexmarkConfiguration; +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.integration.nexmark.model.KnownSize; +import org.apache.beam.integration.nexmark.model.Person; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * Base class for the eight 'NEXMark' queries. Supplies some fragments common to + * multiple queries. + */ +public abstract class NexmarkQuery + extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> { + public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions"); + public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids"); + protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person"); + + /** Predicate to detect a new person event. */ + protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON = + new SerializableFunction<Event, Boolean>() { + @Override + public Boolean apply(Event event) { + return event.newPerson != null; + } + }; + + /** DoFn to convert a new person event to a person. 
*/ + protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().newPerson); + } + }; + + /** Predicate to detect a new auction event. */ + protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION = + new SerializableFunction<Event, Boolean>() { + @Override + public Boolean apply(Event event) { + return event.newAuction != null; + } + }; + + /** DoFn to convert a new auction event to an auction. */ + protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().newAuction); + } + }; + + /** Predicate to detect a new bid event. */ + protected static final SerializableFunction<Event, Boolean> IS_BID = + new SerializableFunction<Event, Boolean>() { + @Override + public Boolean apply(Event event) { + return event.bid != null; + } + }; + + /** DoFn to convert a bid event to a bid. */ + protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().bid); + } + }; + + /** Transform to key each person by their id. */ + protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID = + ParDo.of(new DoFn<Person, KV<Long, Person>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().id, c.element())); + } + }); + + /** Transform to key each auction by its id. */ + protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID = + ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().id, c.element())); + } + }); + + /** Transform to key each auction by its seller id. 
*/ + protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER = + ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().seller, c.element())); + } + }); + + /** Transform to key each bid by it's auction id. */ + protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION = + ParDo.of(new DoFn<Bid, KV<Long, Bid>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().auction, c.element())); + } + }); + + /** Transform to project the auction id from each bid. */ + protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION = + ParDo.of(new DoFn<Bid, Long>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().auction); + } + }); + + /** Transform to project the price from each bid. */ + protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE = + ParDo.of(new DoFn<Bid, Long>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().price); + } + }); + + /** Transform to emit each event with the timestamp embedded within it. */ + public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA = + ParDo.of(new DoFn<Event, Event>() { + @ProcessElement + public void processElement(ProcessContext c) { + Event e = c.element(); + if (e.bid != null) { + c.outputWithTimestamp(e, new Instant(e.bid.dateTime)); + } else if (e.newPerson != null) { + c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime)); + } else if (e.newAuction != null) { + c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime)); + } + } + }); + + /** + * Transform to filter for just the new auction events. 
+ */ + public static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS = + new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") { + @Override + public PCollection<Auction> expand(PCollection<Event> input) { + return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION)) + .apply("AsAuction", ParDo.of(AS_AUCTION)); + } + }; + + /** + * Transform to filter for just the new person events. + */ + public static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS = + new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") { + @Override + public PCollection<Person> expand(PCollection<Event> input) { + return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON)) + .apply("AsPerson", ParDo.of(AS_PERSON)); + } + }; + + /** + * Transform to filter for just the bid events. + */ + public static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS = + new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") { + @Override + public PCollection<Bid> expand(PCollection<Event> input) { + return input.apply("IsBid", Filter.by(IS_BID)) + .apply("AsBid", ParDo.of(AS_BID)); + } + }; + + protected final NexmarkConfiguration configuration; + public final Monitor<Event> eventMonitor; + public final Monitor<KnownSize> resultMonitor; + public final Monitor<Event> endOfStreamMonitor; + protected final Counter fatalCounter; + + protected NexmarkQuery(NexmarkConfiguration configuration, String name) { + super(name); + this.configuration = configuration; + if (configuration.debug) { + eventMonitor = new Monitor<>(name + ".Events", "event"); + resultMonitor = new Monitor<>(name + ".Results", "result"); + endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end"); + fatalCounter = Metrics.counter(name , "fatal"); + } else { + eventMonitor = null; + resultMonitor = null; + endOfStreamMonitor = null; + fatalCounter = null; + } + } + + /** + * Implement the actual query. 
All we know about the result is it has a known encoded size. + */ + protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events); + + @Override + public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) { + + if (configuration.debug) { + events = + events + // Monitor events as they go by. + .apply(name + ".Monitor", eventMonitor.getTransform()) + // Count each type of event. + .apply(name + ".Snoop", NexmarkUtils.snoop(name)); + } + + if (configuration.cpuDelayMs > 0) { + // Slow down by pegging one core at 100%. + events = events.apply(name + ".CpuDelay", + NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs)); + } + + if (configuration.diskBusyBytes > 0) { + // Slow down by forcing bytes to durable store. + events = events.apply(name + ".DiskBusy", + NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes)); + } + + // Run the query. + PCollection<KnownSize> queryResults = applyPrim(events); + + if (configuration.debug) { + // Monitor results as they go by. + queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform()); + } + + // Timestamp the query results. 
+ return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java new file mode 100644 index 0000000..1ad9099 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.integration.nexmark.queries; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.beam.integration.nexmark.NexmarkConfiguration; +import org.apache.beam.integration.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.TimestampedValue; + + +import org.hamcrest.core.IsEqual; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Assert; + +/** + * Base class for models of the eight NEXMark queries. Provides an assertion function which can be + * applied against the actual query results to check their consistency with the model. + */ +public abstract class NexmarkQueryModel implements Serializable { + public final NexmarkConfiguration configuration; + + public NexmarkQueryModel(NexmarkConfiguration configuration) { + this.configuration = configuration; + } + + /** + * Return the start of the most recent window of {@code size} and {@code period} which ends + * strictly before {@code timestamp}. + */ + public static Instant windowStart(Duration size, Duration period, Instant timestamp) { + long ts = timestamp.getMillis(); + long p = period.getMillis(); + long lim = ts - ts % p; + long s = size.getMillis(); + return new Instant(lim - s); + } + + /** Convert {@code itr} to strings capturing values, timestamps and order. */ + protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) { + List<String> strings = new ArrayList<>(); + while (itr.hasNext()) { + strings.add(itr.next().toString()); + } + return strings; + } + + /** Convert {@code itr} to strings capturing values and order. 
*/ + protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) { + List<String> strings = new ArrayList<>(); + while (itr.hasNext()) { + strings.add(itr.next().getValue().toString()); + } + return strings; + } + + /** Convert {@code itr} to strings capturing values only. */ + protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) { + Set<String> strings = new HashSet<>(); + while (itr.hasNext()) { + strings.add(itr.next().getValue().toString()); + } + return strings; + } + + /** Return simulator for query. */ + public abstract AbstractSimulator<?, ?> simulator(); + + /** Return sub-sequence of results which are significant for model. */ + protected Iterable<TimestampedValue<KnownSize>> relevantResults( + Iterable<TimestampedValue<KnownSize>> results) { + return results; + } + + /** + * Convert iterator of elements to collection of strings to use when testing coherence of model + * against actual query results. + */ + protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr); + + /** Return assertion to use on results of pipeline for this query. 
*/ + public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() { + final Collection<String> expectedStrings = toCollection(simulator().results()); + final String[] expectedStringsArray = + expectedStrings.toArray(new String[expectedStrings.size()]); + + return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() { + @Override + public Void apply(Iterable<TimestampedValue<KnownSize>> actual) { + Collection<String> actualStrings = toCollection(relevantResults(actual).iterator()); + Assert.assertThat("wrong pipeline output", actualStrings, + IsEqual.equalTo(expectedStrings)); +//compare without order +// Assert.assertThat("wrong pipeline output", actualStrings, +// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray)); + return null; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java index 84696c4..00a49a8 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java @@ -22,7 +22,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.integration.nexmark.model.KnownSize; 
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java index 991b1d4..6fb6613 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java @@ -20,9 +20,7 @@ package org.apache.beam.integration.nexmark.queries; import java.util.Collection; import java.util.Iterator; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.sdk.values.TimestampedValue; @@ -56,7 +54,7 @@ public class Query0Model extends NexmarkQueryModel { } @Override - protected AbstractSimulator<?, ?> simulator() { + public AbstractSimulator<?, ?> simulator() { return new Simulator(configuration); } http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java index 0be77ce..8d90b70 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java +++ 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java @@ -18,7 +18,6 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.Event; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java index d9b3557..c919691 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java @@ -28,7 +28,6 @@ import java.nio.channels.WritableByteChannel; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Done; import org.apache.beam.integration.nexmark.model.Event; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java index 
a8a61ae..fd936a9 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java @@ -18,7 +18,6 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.BidsPerSession; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java index a5db504..20f45fb 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java @@ -18,7 +18,6 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.BidsPerSession; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java index 58037d3..0388687 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java @@ -21,9 +21,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.Iterator; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.Event; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java index 4c8f878..a365b97 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java @@ -18,7 +18,6 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.AuctionPrice; import org.apache.beam.integration.nexmark.model.Bid; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java 
---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java index f578e4c..e00992f 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java @@ -21,9 +21,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.Iterator; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.AuctionPrice; import org.apache.beam.integration.nexmark.model.Bid;
