/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.integration.nexmark.queries;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;

import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Abstract base class for simulator of a query.
 *
 * <p>A subclass implements {@link #run} to consume input via {@link #nextInput} and to report
 * output via {@link #addResult} / {@link #addIntermediateResult}. Callers pull the simulated
 * output lazily through {@link #results} or {@link #resultsPerWindow}; each of those iterators
 * invokes {@link #run} as often as needed to produce the next element.
 *
 * @param <InputT> Type of input elements.
 * @param <OutputT> Type of output elements.
 */
public abstract class AbstractSimulator<InputT, OutputT> {
  /** Window size for action bucket sampling. */
  private static final Duration WINDOW_SIZE = Duration.standardMinutes(1);

  /** Input event stream we should draw from. */
  private final Iterator<TimestampedValue<InputT>> input;

  /** Set to true when no more results. */
  private boolean isDone;

  /**
   * Results which have not yet been returned by the {@link #results} iterator.
   */
  private final List<TimestampedValue<OutputT>> pendingResults;

  /**
   * Current window timestamp (ms since epoch).
   */
  private long currentWindow;

  /**
   * Number of (possibly intermediate) results for the current window.
   */
  private long currentCount;

  /**
   * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
   * iterator.
   */
  private final List<Long> pendingCounts;

  /**
   * Create a simulator drawing its events from {@code input}.
   *
   * @param input source of timestamped input elements, consumed by {@link #nextInput}
   */
  public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
    this.input = input;
    isDone = false;
    pendingResults = new ArrayList<>();
    currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
    currentCount = 0;
    pendingCounts = new ArrayList<>();
  }

  /**
   * Called by implementors of {@link #run}: Fetch the next input element.
   *
   * @return the next input element, or {@literal null} if the input is exhausted
   */
  @Nullable
  TimestampedValue<InputT> nextInput() {
    if (!input.hasNext()) {
      return null;
    }
    TimestampedValue<InputT> timestampedInput = input.next();
    NexmarkUtils.info("input: %s", timestampedInput);
    return timestampedInput;
  }

  /**
   * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of
   * recording the expected activity of the query over time. The result is only counted against
   * its window; it is not queued for {@link #results}.
   */
  void addIntermediateResult(TimestampedValue<OutputT> result) {
    NexmarkUtils.info("intermediate result: %s", result);
    updateCounts(result.getTimestamp());
  }

  /**
   * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
   * semantic correctness. The result is queued for {@link #results} and counted against its
   * window.
   */
  void addResult(TimestampedValue<OutputT> result) {
    NexmarkUtils.info("result: %s", result);
    pendingResults.add(result);
    updateCounts(result.getTimestamp());
  }

  /**
   * Update window and counts.
   *
   * <p>When {@code timestamp} falls into a later window than the current one, the count of the
   * window being left is queued (unless no window has been entered yet) and counting restarts
   * for the new window.
   */
  private void updateCounts(Instant timestamp) {
    long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
    if (window > currentWindow) {
      // The initial TIMESTAMP_MIN_VALUE marks "no window seen yet": nothing to flush for it.
      if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
        pendingCounts.add(currentCount);
      }
      currentCount = 0;
      currentWindow = window;
    }
    currentCount++;
  }

  /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
  void allDone() {
    isDone = true;
  }

  /**
   * Overridden by derived classes to do the next increment of work. Each call should
   * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
   * or {@link #allDone}. It is ok for a single call to emit more than one result via
   * {@link #addResult}. It is ok for a single call to run the entire simulation, though the
   * {@link #results} and {@link #resultsPerWindow} iterators will then not yield anything
   * until that call returns.
   */
  protected abstract void run();

  /**
   * Return iterator over all expected timestamped results. The underlying simulator state is
   * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
   *
   * <p>The returned iterator's {@code next()} throws {@link NoSuchElementException} once the
   * simulation is done and all results have been returned; {@code remove()} is unsupported.
   */
  public Iterator<TimestampedValue<OutputT>> results() {
    return new Iterator<TimestampedValue<OutputT>>() {
      @Override
      public boolean hasNext() {
        // Keep stepping the simulation until it yields a result or declares itself done.
        while (true) {
          if (!pendingResults.isEmpty()) {
            return true;
          }
          if (isDone) {
            return false;
          }
          run();
        }
      }

      @Override
      public TimestampedValue<OutputT> next() {
        // Drive the simulation here too, so next() is usable without a preceding hasNext().
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return pendingResults.remove(0);
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  /**
   * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
   * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be
   * called.
   *
   * <p>The returned iterator's {@code next()} throws {@link NoSuchElementException} once the
   * simulation is done and all counts have been returned; {@code remove()} is unsupported.
   */
  public Iterator<Long> resultsPerWindow() {
    return new Iterator<Long>() {
      @Override
      public boolean hasNext() {
        while (true) {
          if (!pendingCounts.isEmpty()) {
            return true;
          }
          if (isDone) {
            if (currentCount > 0) {
              // Flush the count of the final, still-open window exactly once.
              pendingCounts.add(currentCount);
              currentCount = 0;
              currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
              return true;
            } else {
              return false;
            }
          }
          run();
        }
      }

      @Override
      public Long next() {
        // Drive the simulation here too, so next() is usable without a preceding hasNext().
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return pendingCounts.remove(0);
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java deleted file mode 100644 index 8b74282..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.Monitor; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; - -/** - * Base class for the eight 'NEXMark' queries. Supplies some fragments common to - * multiple queries. - */ -public abstract class NexmarkQuery - extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> { - public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions"); - public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids"); - static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person"); - - /** Predicate to detect a new person event. */ - private static final SerializableFunction<Event, Boolean> IS_NEW_PERSON = - new SerializableFunction<Event, Boolean>() { - @Override - public Boolean apply(Event event) { - return event.newPerson != null; - } - }; - - /** DoFn to convert a new person event to a person. 
*/ - private static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().newPerson); - } - }; - - /** Predicate to detect a new auction event. */ - private static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION = - new SerializableFunction<Event, Boolean>() { - @Override - public Boolean apply(Event event) { - return event.newAuction != null; - } - }; - - /** DoFn to convert a new auction event to an auction. */ - private static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().newAuction); - } - }; - - /** Predicate to detect a new bid event. */ - private static final SerializableFunction<Event, Boolean> IS_BID = - new SerializableFunction<Event, Boolean>() { - @Override - public Boolean apply(Event event) { - return event.bid != null; - } - }; - - /** DoFn to convert a bid event to a bid. */ - private static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().bid); - } - }; - - /** Transform to key each person by their id. */ - static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID = - ParDo.of(new DoFn<Person, KV<Long, Person>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().id, c.element())); - } - }); - - /** Transform to key each auction by its id. */ - static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID = - ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().id, c.element())); - } - }); - - /** Transform to key each auction by its seller id. 
*/ - static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER = - ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().seller, c.element())); - } - }); - - /** Transform to key each bid by it's auction id. */ - static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION = - ParDo.of(new DoFn<Bid, KV<Long, Bid>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().auction, c.element())); - } - }); - - /** Transform to project the auction id from each bid. */ - static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION = - ParDo.of(new DoFn<Bid, Long>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().auction); - } - }); - - /** Transform to project the price from each bid. */ - static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE = - ParDo.of(new DoFn<Bid, Long>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().price); - } - }); - - /** Transform to emit each event with the timestamp embedded within it. */ - public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA = - ParDo.of(new DoFn<Event, Event>() { - @ProcessElement - public void processElement(ProcessContext c) { - Event e = c.element(); - if (e.bid != null) { - c.outputWithTimestamp(e, new Instant(e.bid.dateTime)); - } else if (e.newPerson != null) { - c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime)); - } else if (e.newAuction != null) { - c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime)); - } - } - }); - - /** - * Transform to filter for just the new auction events. 
- */ - public static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS = - new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") { - @Override - public PCollection<Auction> expand(PCollection<Event> input) { - return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION)) - .apply("AsAuction", ParDo.of(AS_AUCTION)); - } - }; - - /** - * Transform to filter for just the new person events. - */ - public static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS = - new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") { - @Override - public PCollection<Person> expand(PCollection<Event> input) { - return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON)) - .apply("AsPerson", ParDo.of(AS_PERSON)); - } - }; - - /** - * Transform to filter for just the bid events. - */ - public static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS = - new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") { - @Override - public PCollection<Bid> expand(PCollection<Event> input) { - return input.apply("IsBid", Filter.by(IS_BID)) - .apply("AsBid", ParDo.of(AS_BID)); - } - }; - - final NexmarkConfiguration configuration; - public final Monitor<Event> eventMonitor; - public final Monitor<KnownSize> resultMonitor; - private final Monitor<Event> endOfStreamMonitor; - private final Counter fatalCounter; - - NexmarkQuery(NexmarkConfiguration configuration, String name) { - super(name); - this.configuration = configuration; - if (configuration.debug) { - eventMonitor = new Monitor<>(name + ".Events", "event"); - resultMonitor = new Monitor<>(name + ".Results", "result"); - endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end"); - fatalCounter = Metrics.counter(name , "fatal"); - } else { - eventMonitor = null; - resultMonitor = null; - endOfStreamMonitor = null; - fatalCounter = null; - } - } - - /** - * Implement the actual query. 
All we know about the result is it has a known encoded size. - */ - protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events); - - @Override - public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) { - - if (configuration.debug) { - events = - events - // Monitor events as they go by. - .apply(name + ".Monitor", eventMonitor.getTransform()) - // Count each type of event. - .apply(name + ".Snoop", NexmarkUtils.snoop(name)); - } - - if (configuration.cpuDelayMs > 0) { - // Slow down by pegging one core at 100%. - events = events.apply(name + ".CpuDelay", - NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs)); - } - - if (configuration.diskBusyBytes > 0) { - // Slow down by forcing bytes to durable store. - events = events.apply(name + ".DiskBusy", - NexmarkUtils.<Event>diskBusy(configuration.diskBusyBytes)); - } - - // Run the query. - PCollection<KnownSize> queryResults = applyPrim(events); - - if (configuration.debug) { - // Monitor results as they go by. - queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform()); - } - - // Timestamp the query results. 
- return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java deleted file mode 100644 index bfa668b..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.integration.nexmark.queries;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.beam.integration.nexmark.NexmarkConfiguration;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TimestampedValue;


import org.hamcrest.core.IsEqual;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;

/**
 * Common base for the models of the NEXMark queries. A model pairs a simulator of the query with
 * an assertion (see {@link #assertionFor}) that compares actual pipeline output against the
 * simulated output.
 */
public abstract class NexmarkQueryModel implements Serializable {
  public final NexmarkConfiguration configuration;

  NexmarkQueryModel(NexmarkConfiguration configuration) {
    this.configuration = configuration;
  }

  /**
   * Return the start of the most recent window of {@code size} and {@code period} which ends
   * strictly before {@code timestamp}.
   */
  static Instant windowStart(Duration size, Duration period, Instant timestamp) {
    long timestampMs = timestamp.getMillis();
    // Round the timestamp down to its period boundary; the window ends there.
    long windowEndMs = timestampMs - timestampMs % period.getMillis();
    return new Instant(windowEndMs - size.getMillis());
  }

  /** Render every element of {@code itr}, value and timestamp, keeping iteration order. */
  static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
    List<String> rendered = new ArrayList<>();
    for (; itr.hasNext(); ) {
      TimestampedValue<T> element = itr.next();
      rendered.add(element.toString());
    }
    return rendered;
  }

  /** Render the value of every element of {@code itr}, keeping iteration order. */
  static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
    List<String> rendered = new ArrayList<>();
    for (; itr.hasNext(); ) {
      T value = itr.next().getValue();
      rendered.add(value.toString());
    }
    return rendered;
  }

  /** Render the value of every element of {@code itr} into a set, discarding order. */
  static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
    Set<String> rendered = new HashSet<>();
    for (; itr.hasNext(); ) {
      T value = itr.next().getValue();
      rendered.add(value.toString());
    }
    return rendered;
  }

  /** Return simulator for query. */
  public abstract AbstractSimulator<?, ?> simulator();

  /**
   * Return the part of {@code results} the model should be compared against. This base
   * implementation returns {@code results} unchanged.
   */
  Iterable<TimestampedValue<KnownSize>> relevantResults(
      Iterable<TimestampedValue<KnownSize>> results) {
    return results;
  }

  /**
   * Turn the elements of {@code itr} into the collection of strings used when comparing the
   * model's output with actual query results.
   */
  protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);

  /**
   * Return assertion to use on results of pipeline for this query. The expected strings are
   * computed from the simulator when this method is called; the returned function asserts that
   * the relevant actual results render to an equal collection.
   */
  public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
    final Collection<String> simulated = toCollection(simulator().results());

    return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
      @Override
      public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
        Iterator<TimestampedValue<KnownSize>> relevant = relevantResults(actual).iterator();
        Collection<String> observed = toCollection(relevant);
        Assert.assertThat("wrong pipeline output", observed, IsEqual.equalTo(simulated));
        return null;
      }
    };
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.integration.nexmark.queries;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.beam.integration.nexmark.NexmarkConfiguration;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/**
 * Query 0: events pass through unchanged, except that each one is encoded and decoded again
 * with the input collection's coder, so that the cost of the chosen coder shows up in
 * measurements.
 */
public class Query0 extends NexmarkQuery {
  public Query0(NexmarkConfiguration configuration) {
    super(configuration, "Query0");
  }

  /**
   * Encode every event to bytes with the coder of {@code events}, add the encoded length to the
   * "bytes" counter, and emit the event decoded from those bytes.
   */
  private PCollection<Event> applyTyped(PCollection<Event> events) {
    final Coder<Event> coder = events.getCoder();

    DoFn<Event, Event> roundTripFn = new DoFn<Event, Event>() {
      private final Counter bytesMetric =
          Metrics.counter(name , "bytes");

      @ProcessElement
      public void processElement(ProcessContext c) throws CoderException, IOException {
        ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
        coder.encode(c.element(), encodedStream, Coder.Context.OUTER);
        byte[] encoded = encodedStream.toByteArray();
        bytesMetric.inc((long) encoded.length);
        c.output(coder.decode(new ByteArrayInputStream(encoded), Coder.Context.OUTER));
      }
    };

    return events.apply(name + ".Serialize", ParDo.of(roundTripFn));
  }

  @Override
  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
    PCollection<Event> roundTripped = applyTyped(events);
    return NexmarkUtils.castToKnownSize(name, roundTripped);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.integration.nexmark.queries;

import java.util.Collection;
import java.util.Iterator;

import org.apache.beam.integration.nexmark.NexmarkConfiguration;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.sdk.values.TimestampedValue;

/**
 * Model of {@link Query0}: every input event is expected, unchanged, as a result.
 */
public class Query0Model extends NexmarkQueryModel {
  /**
   * Simulator for query 0. Each step forwards one input event as a result, or finishes the
   * simulation once the input is exhausted.
   */
  private static class Simulator extends AbstractSimulator<Event, Event> {
    public Simulator(NexmarkConfiguration configuration) {
      super(NexmarkUtils.standardEventIterator(configuration));
    }

    @Override
    protected void run() {
      TimestampedValue<Event> next = nextInput();
      if (next != null) {
        addResult(next);
      } else {
        allDone();
      }
    }
  }

  public Query0Model(NexmarkConfiguration configuration) {
    super(configuration);
  }

  @Override
  public AbstractSimulator<?, ?> simulator() {
    return new Simulator(configuration);
  }

  /** Results are compared by value, timestamp and order. */
  @Override
  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
    return toValueTimestampOrder(itr);
  }
}
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java deleted file mode 100644 index 8d90b70..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; - -/** - * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros. - * In CQL syntax: - * - * <pre> - * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime) - * FROM bid [ROWS UNBOUNDED]; - * </pre> - * - * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily - * slowed down. 
- */ -public class Query1 extends NexmarkQuery { - public Query1(NexmarkConfiguration configuration) { - super(configuration, "Query1"); - } - - private PCollection<Bid> applyTyped(PCollection<Event> events) { - return events - // Only want the bid events. - .apply(JUST_BIDS) - - // Map the conversion function over all bids. - .apply(name + ".ToEuros", - ParDo.of(new DoFn<Bid, Bid>() { - @ProcessElement - public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(new Bid( - bid.auction, bid.bidder, (bid.price * 89) / 100, bid.dateTime, bid.extra)); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java deleted file mode 100644 index 378d01e..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.Serializable; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Done; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterEach; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import 
org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Query "10", 'Log to sharded files' (Not in original suite.) - * - * <p>Every windowSizeSec, save all events from the last period into 2*maxWorkers log files. - */ -public class Query10 extends NexmarkQuery { - private static final Logger LOG = LoggerFactory.getLogger(Query10.class); - private static final int CHANNEL_BUFFER = 8 << 20; // 8MB - private static final int NUM_SHARDS_PER_WORKER = 5; - private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10); - - /** - * Capture everything we need to know about the records in a single output file. - */ - private static class OutputFile implements Serializable { - /** Maximum possible timestamp of records in file. */ - private final Instant maxTimestamp; - /** Shard within window. */ - private final String shard; - /** Index of file in all files in shard. */ - private final long index; - /** Timing of records in this file. */ - private final PaneInfo.Timing timing; - /** Path to file containing records, or {@literal null} if no output required. */ - @Nullable - private final String filename; - - public OutputFile( - Instant maxTimestamp, - String shard, - long index, - PaneInfo.Timing timing, - @Nullable String filename) { - this.maxTimestamp = maxTimestamp; - this.shard = shard; - this.index = index; - this.timing = timing; - this.filename = filename; - } - - @Override - public String toString() { - return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename); - } - } - - /** - * GCS uri prefix for all log and 'finished' files. If null they won't be written. 
- */ - @Nullable - private String outputPath; - - /** - * Maximum number of workers, used to determine log sharding factor. - */ - private int maxNumWorkers; - - public Query10(NexmarkConfiguration configuration) { - super(configuration, "Query10"); - } - - public void setOutputPath(@Nullable String outputPath) { - this.outputPath = outputPath; - } - - public void setMaxNumWorkers(int maxNumWorkers) { - this.maxNumWorkers = maxNumWorkers; - } - - /** - * Return channel for writing bytes to GCS. - */ - private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) - throws IOException { - //TODO - // Fix after PR: right now this is a specific Google added use case - // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way. - throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory"); - } - - /** Return a short string to describe {@code timing}. */ - private String timingToString(PaneInfo.Timing timing) { - switch (timing) { - case EARLY: - return "E"; - case ON_TIME: - return "O"; - case LATE: - return "L"; - } - throw new RuntimeException(); // cases are exhaustive - } - - /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */ - private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) { - @Nullable String filename = - outputPath == null - ? null - : String.format("%s/LOG-%s-%s-%03d-%s-%x", - outputPath, window.maxTimestamp(), shard, pane.getIndex(), - timingToString(pane.getTiming()), - ThreadLocalRandom.current().nextLong()); - return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(), - pane.getTiming(), filename); - } - - /** - * Return path to which we should write the index for {@code window}, or {@literal null} - * if no output required. 
- */ - @Nullable - private String indexPathFor(BoundedWindow window) { - if (outputPath == null) { - return null; - } - return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp()); - } - - private PCollection<Done> applyTyped(PCollection<Event> events) { - final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER; - - return events - .apply(name + ".ShardEvents", - ParDo.of(new DoFn<Event, KV<String, Event>>() { - private final Counter lateCounter = Metrics.counter(name , "actuallyLateEvent"); - private final Counter onTimeCounter = Metrics.counter(name , "onTimeCounter"); - - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().hasAnnotation("LATE")) { - lateCounter.inc(); - LOG.info("Observed late: %s", c.element()); - } else { - onTimeCounter.inc(); - } - int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards); - String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards); - c.output(KV.of(shard, c.element())); - } - })) - .apply(name + ".WindowEvents", - Window.<KV<String, Event>>into( - FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering(AfterEach.inOrder( - Repeatedly - .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents), - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(LATE_BATCHING_PERIOD))))) - .discardingFiredPanes() - // Use a 1 day allowed lateness so that any forgotten hold will stall the - // pipeline for that period and be very noticeable. 
- .withAllowedLateness(Duration.standardDays(1))) - .apply(name + ".GroupByKey", GroupByKey.<String, Event>create()) - .apply(name + ".CheckForLateEvents", - ParDo.of(new DoFn<KV<String, Iterable<Event>>, - KV<String, Iterable<Event>>>() { - private final Counter earlyCounter = Metrics.counter(name , "earlyShard"); - private final Counter onTimeCounter = Metrics.counter(name , "onTimeShard"); - private final Counter lateCounter = Metrics.counter(name , "lateShard"); - private final Counter unexpectedLatePaneCounter = - Metrics.counter(name , "ERROR_unexpectedLatePane"); - private final Counter unexpectedOnTimeElementCounter = - Metrics.counter(name , "ERROR_unexpectedOnTimeElement"); - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - int numLate = 0; - int numOnTime = 0; - for (Event event : c.element().getValue()) { - if (event.hasAnnotation("LATE")) { - numLate++; - } else { - numOnTime++; - } - } - String shard = c.element().getKey(); - LOG.info(String.format( - "%s with timestamp %s has %d actually late and %d on-time " - + "elements in pane %s for window %s", - shard, c.timestamp(), numLate, numOnTime, c.pane(), - window.maxTimestamp())); - if (c.pane().getTiming() == PaneInfo.Timing.LATE) { - if (numLate == 0) { - LOG.error( - "ERROR! No late events in late pane for %s", shard); - unexpectedLatePaneCounter.inc(); - } - if (numOnTime > 0) { - LOG.error( - "ERROR! Have %d on-time events in late pane for %s", - numOnTime, shard); - unexpectedOnTimeElementCounter.inc(); - } - lateCounter.inc(); - } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) { - if (numOnTime + numLate < configuration.maxLogEvents) { - LOG.error( - "ERROR! 
Only have %d events in early pane for %s", - numOnTime + numLate, shard); - } - earlyCounter.inc(); - } else { - onTimeCounter.inc(); - } - c.output(c.element()); - } - })) - .apply(name + ".UploadEvents", - ParDo.of(new DoFn<KV<String, Iterable<Event>>, - KV<Void, OutputFile>>() { - private final Counter savedFileCounter = Metrics.counter(name , "savedFile"); - private final Counter writtenRecordsCounter = Metrics.counter(name , "writtenRecords"); - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) - throws IOException { - String shard = c.element().getKey(); - GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); - OutputFile outputFile = outputFileFor(window, shard, c.pane()); - LOG.info(String.format( - "Writing %s with record timestamp %s, window timestamp %s, pane %s", - shard, c.timestamp(), window.maxTimestamp(), c.pane())); - if (outputFile.filename != null) { - LOG.info("Beginning write to '%s'", outputFile.filename); - int n = 0; - try (OutputStream output = - Channels.newOutputStream(openWritableGcsFile(options, outputFile - .filename))) { - for (Event event : c.element().getValue()) { - Event.CODER.encode(event, output, Coder.Context.OUTER); - writtenRecordsCounter.inc(); - if (++n % 10000 == 0) { - LOG.info("So far written %d records to '%s'", n, - outputFile.filename); - } - } - } - LOG.info("Written all %d records to '%s'", n, outputFile.filename); - } - savedFileCounter.inc(); - c.output(KV.<Void, OutputFile>of(null, outputFile)); - } - })) - // Clear fancy triggering from above. - .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into( - FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering(AfterWatermark.pastEndOfWindow()) - // We expect no late data here, but we'll assume the worst so we can detect any. 
- .withAllowedLateness(Duration.standardDays(1)) - .discardingFiredPanes()) - // this GroupByKey allows to have one file per window - .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create()) - .apply(name + ".Index", - ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() { - private final Counter unexpectedLateCounter = - Metrics.counter(name , "ERROR_unexpectedLate"); - private final Counter unexpectedEarlyCounter = - Metrics.counter(name , "ERROR_unexpectedEarly"); - private final Counter unexpectedIndexCounter = - Metrics.counter(name , "ERROR_unexpectedIndex"); - private final Counter finalizedCounter = Metrics.counter(name , "indexed"); - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) - throws IOException { - if (c.pane().getTiming() == Timing.LATE) { - unexpectedLateCounter.inc(); - LOG.error("ERROR! Unexpected LATE pane: %s", c.pane()); - } else if (c.pane().getTiming() == Timing.EARLY) { - unexpectedEarlyCounter.inc(); - LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane()); - } else if (c.pane().getTiming() == Timing.ON_TIME - && c.pane().getIndex() != 0) { - unexpectedIndexCounter.inc(); - LOG.error("ERROR! 
Unexpected ON_TIME pane index: %s", c.pane()); - } else { - GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); - LOG.info( - "Index with record timestamp %s, window timestamp %s, pane %s", - c.timestamp(), window.maxTimestamp(), c.pane()); - - @Nullable String filename = indexPathFor(window); - if (filename != null) { - LOG.info("Beginning write to '%s'", filename); - int n = 0; - try (OutputStream output = - Channels.newOutputStream( - openWritableGcsFile(options, filename))) { - for (OutputFile outputFile : c.element().getValue()) { - output.write(outputFile.toString().getBytes("UTF-8")); - n++; - } - } - LOG.info("Written all %d lines to '%s'", n, filename); - } - c.output( - new Done("written for timestamp " + window.maxTimestamp())); - finalizedCounter.inc(); - } - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java deleted file mode 100644 index 6db9bcf..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.BidsPerSession; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -/** - * Query "11", 'User sessions' (Not in original suite.) - * - * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap. - * However limit the session to at most {@code maxLogEvents}. Emit the number of - * bids per session. 
- */ -public class Query11 extends NexmarkQuery { - public Query11(NexmarkConfiguration configuration) { - super(configuration, "Query11"); - } - - private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) { - PCollection<Long> bidders = events.apply(JUST_BIDS).apply(name + ".Rekey", - ParDo.of(new DoFn<Bid, Long>() { - - @ProcessElement public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(bid.bidder); - } - })); - - PCollection<Long> biddersWindowed = bidders.apply( - Window.<Long>into( - Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering( - Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))) - .discardingFiredPanes() - .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2))); - return biddersWindowed.apply(Count.<Long>perElement()) - .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { - - @ProcessElement public void processElement(ProcessContext c) { - c.output(new BidsPerSession(c.element().getKey(), c.element().getValue())); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java deleted file mode 100644 index 20f45fb..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.BidsPerSession; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -/** - * Query "12", 'Processing time windows' (Not in original suite.) - * - * <p>Group bids by the same user into processing time windows of windowSize. Emit the count - * of bids per window. 
- */ -public class Query12 extends NexmarkQuery { - public Query12(NexmarkConfiguration configuration) { - super(configuration, "Query12"); - } - - private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) { - return events - .apply(JUST_BIDS) - .apply(ParDo.of(new DoFn<Bid, Long>() { - @ProcessElement - public void processElement(ProcessContext c){ - c.output(c.element().bidder); - } - })) - .apply(Window.<Long>into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf( - Duration.standardSeconds(configuration.windowSizeSec)))) - .discardingFiredPanes() - .withAllowedLateness(Duration.ZERO)) - .apply(Count.<Long>perElement()) - .apply(name + ".ToResult", - ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output( - new BidsPerSession(c.element().getKey(), c.element().getValue())); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java deleted file mode 100644 index f07db80..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.values.TimestampedValue; - -/** - * A direct implementation of {@link Query1}. - */ -public class Query1Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 1. - */ - private static class Simulator extends AbstractSimulator<Event, Bid> { - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - } - - @Override - protected void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - allDone(); - return; - } - Event event = timestampedEvent.getValue(); - if (event.bid == null) { - // Ignore non-bid events. 
- return; - } - Bid bid = event.bid; - Bid resultBid = - new Bid(bid.auction, bid.bidder, bid.price * 89 / 100, bid.dateTime, bid.extra); - TimestampedValue<Bid> result = - TimestampedValue.of(resultBid, timestampedEvent.getTimestamp()); - addResult(result); - } - } - - public Query1Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValueTimestampOrder(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java deleted file mode 100644 index a365b97..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.AuctionPrice; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; - -/** - * Query 2, 'Filtering. Find bids with specific auction ids and show their bid price. - * In CQL syntax: - * - * <pre> - * SELECT Rstream(auction, price) - * FROM Bid [NOW] - * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087; - * </pre> - * - * <p>As written that query will only yield a few hundred results over event streams of - * arbitrary size. To make it more interesting we instead choose bids for every - * {@code auctionSkip}'th auction. - */ -public class Query2 extends NexmarkQuery { - public Query2(NexmarkConfiguration configuration) { - super(configuration, "Query2"); - } - - private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) { - return events - // Only want the bid events. - .apply(JUST_BIDS) - - // Select just the bids for the auctions we care about. - .apply(Filter.by(new SerializableFunction<Bid, Boolean>() { - @Override - public Boolean apply(Bid bid) { - return bid.auction % configuration.auctionSkip == 0; - } - })) - - // Project just auction id and price. 
- .apply(name + ".Project", - ParDo.of(new DoFn<Bid, AuctionPrice>() { - @ProcessElement - public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(new AuctionPrice(bid.auction, bid.price)); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java deleted file mode 100644 index e00992f..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.queries; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.AuctionPrice; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.values.TimestampedValue; - -/** - * A direct implementation of {@link Query2}. - */ -public class Query2Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 2. - */ - private class Simulator extends AbstractSimulator<Event, AuctionPrice> { - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - } - - @Override - protected void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - allDone(); - return; - } - Event event = timestampedEvent.getValue(); - if (event.bid == null) { - // Ignore non bid events. - return; - } - Bid bid = event.bid; - if (bid.auction % configuration.auctionSkip != 0) { - // Ignore bids for auctions we don't care about. 
- return; - } - AuctionPrice auctionPrice = new AuctionPrice(bid.auction, bid.price); - TimestampedValue<AuctionPrice> result = - TimestampedValue.of(auctionPrice, timestampedEvent.getTimestamp()); - addResult(result); - } - } - - public Query2Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValueTimestampOrder(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java deleted file mode 100644 index f2b66d7..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.queries; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.integration.nexmark.model.NameCityStateId; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.state.StateSpec; -import org.apache.beam.sdk.state.StateSpecs; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.state.Timer; -import org.apache.beam.sdk.state.TimerSpec; -import org.apache.beam.sdk.state.TimerSpecs; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Query 3, 'Local Item Suggestion'. Who is selling in OR, ID or CA in category 10, and for what - * auction ids? 
In CQL syntax: - * - * <pre> - * SELECT Istream(P.name, P.city, P.state, A.id) - * FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED] - * WHERE A.seller = P.id AND (P.state = `OR' OR P.state = `ID' OR P.state = `CA') AND A.category - * = 10; - * </pre> - * - * <p>We'll implement this query to allow 'new auction' events to come before the 'new person' - * events for the auction seller. Those auctions will be stored until the matching person is seen. - * Then all subsequent auctions for a person will use the stored person record. - * - * <p>A real system would use an external system to maintain the id-to-person association. - */ -public class Query3 extends NexmarkQuery { - - private static final Logger LOG = LoggerFactory.getLogger(Query3.class); - private final JoinDoFn joinDoFn; - - public Query3(NexmarkConfiguration configuration) { - super(configuration, "Query3"); - joinDoFn = new JoinDoFn(name, configuration.maxAuctionsWaitingTime); - } - - private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) { - int numEventsInPane = 30; - - PCollection<Event> eventsWindowed = - events.apply( - Window.<Event>into(new GlobalWindows()) - .triggering(Repeatedly.forever((AfterPane.elementCountAtLeast(numEventsInPane)))) - .discardingFiredPanes() - .withAllowedLateness(Duration.ZERO)); - PCollection<KV<Long, Auction>> auctionsBySellerId = - eventsWindowed - // Only want the new auction events. - .apply(JUST_NEW_AUCTIONS) - - // We only want auctions in category 10. - .apply( - name + ".InCategory", - Filter.by( - new SerializableFunction<Auction, Boolean>() { - - @Override - public Boolean apply(Auction auction) { - return auction.category == 10; - } - })) - - // Key auctions by their seller id. - .apply("AuctionBySeller", AUCTION_BY_SELLER); - - PCollection<KV<Long, Person>> personsById = - eventsWindowed - // Only want the new people events. - .apply(JUST_NEW_PERSONS) - - // We only want people in OR, ID, CA. 
- .apply( - name + ".InState", - Filter.by( - new SerializableFunction<Person, Boolean>() { - - @Override - public Boolean apply(Person person) { - return person.state.equals("OR") - || person.state.equals("ID") - || person.state.equals("CA"); - } - })) - - // Key people by their id. - .apply("PersonById", PERSON_BY_ID); - - return - // Join auctions and people. - // concatenate KeyedPCollections - KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId) - .and(PERSON_TAG, personsById) - // group auctions and persons by personId - .apply(CoGroupByKey.<Long>create()) - .apply(name + ".Join", ParDo.of(joinDoFn)) - - // Project what we want. - .apply( - name + ".Project", - ParDo.of( - new DoFn<KV<Auction, Person>, NameCityStateId>() { - - @ProcessElement - public void processElement(ProcessContext c) { - Auction auction = c.element().getKey(); - Person person = c.element().getValue(); - c.output( - new NameCityStateId(person.name, person.city, person.state, auction.id)); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } - - /** - * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair at - * a time. - * - * <p>We know a person may submit any number of auctions. Thus new person event must have the - * person record stored in persistent state in order to match future auctions by that person. - * - * <p>However we know that each auction is associated with at most one person, so only need to - * store auction records in persistent state until we have seen the corresponding person record. - * And of course may have already seen that record. 
- */ - private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> { - - private final int maxAuctionsWaitingTime; - private static final String AUCTIONS = "auctions"; - private static final String PERSON = "person"; - - @StateId(PERSON) - private static final StateSpec<ValueState<Person>> personSpec = - StateSpecs.value(Person.CODER); - - private static final String PERSON_STATE_EXPIRING = "personStateExpiring"; - - @StateId(AUCTIONS) - private final StateSpec<ValueState<List<Auction>>> auctionsSpec = - StateSpecs.value(ListCoder.of(Auction.CODER)); - - @TimerId(PERSON_STATE_EXPIRING) - private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - // Used to refer the metrics namespace - private final String name; - - private final Counter newAuctionCounter; - private final Counter newPersonCounter; - private final Counter newNewOutputCounter; - private final Counter newOldOutputCounter; - private final Counter oldNewOutputCounter; - private final Counter fatalCounter; - - private JoinDoFn(String name, int maxAuctionsWaitingTime) { - this.name = name; - this.maxAuctionsWaitingTime = maxAuctionsWaitingTime; - newAuctionCounter = Metrics.counter(name, "newAuction"); - newPersonCounter = Metrics.counter(name, "newPerson"); - newNewOutputCounter = Metrics.counter(name, "newNewOutput"); - newOldOutputCounter = Metrics.counter(name, "newOldOutput"); - oldNewOutputCounter = Metrics.counter(name, "oldNewOutput"); - fatalCounter = Metrics.counter(name , "fatal"); - } - - @ProcessElement - public void processElement( - ProcessContext c, - @TimerId(PERSON_STATE_EXPIRING) Timer timer, - @StateId(PERSON) ValueState<Person> personState, - @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) { - // We would *almost* implement this by rewindowing into the global window and - // running a combiner over the result. The combiner's accumulator would be the - // state we use below. 
However, combiners cannot emit intermediate results, thus - // we need to wait for the pending ReduceFn API. - - Person existingPerson = personState.read(); - if (existingPerson != null) { - // We've already seen the new person event for this person id. - // We can join with any new auctions on-the-fly without needing any - // additional persistent state. - for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) { - newAuctionCounter.inc(); - newOldOutputCounter.inc(); - c.output(KV.of(newAuction, existingPerson)); - } - return; - } - - Person theNewPerson = null; - for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) { - if (theNewPerson == null) { - theNewPerson = newPerson; - } else { - if (theNewPerson.equals(newPerson)) { - LOG.error("Duplicate person {}", theNewPerson); - } else { - LOG.error("Conflicting persons {} and {}", theNewPerson, newPerson); - } - fatalCounter.inc(); - continue; - } - newPersonCounter.inc(); - // We've now seen the person for this person id so can flush any - // pending auctions for the same seller id (an auction is done by only one seller). - List<Auction> pendingAuctions = auctionsState.read(); - if (pendingAuctions != null) { - for (Auction pendingAuction : pendingAuctions) { - oldNewOutputCounter.inc(); - c.output(KV.of(pendingAuction, newPerson)); - } - auctionsState.clear(); - } - // Also deal with any new auctions. - for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) { - newAuctionCounter.inc(); - newNewOutputCounter.inc(); - c.output(KV.of(newAuction, newPerson)); - } - // Remember this person for any future auctions. - personState.write(newPerson); - //set a time out to clear this state - Instant firingTime = new Instant(newPerson.dateTime) - .plus(Duration.standardSeconds(maxAuctionsWaitingTime)); - timer.set(firingTime); - } - if (theNewPerson != null) { - return; - } - - // We'll need to remember the auctions until we see the corresponding - // new person event. 
- List<Auction> pendingAuctions = auctionsState.read(); - if (pendingAuctions == null) { - pendingAuctions = new ArrayList<>(); - } - for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) { - newAuctionCounter.inc(); - pendingAuctions.add(newAuction); - } - auctionsState.write(pendingAuctions); - } - - @OnTimer(PERSON_STATE_EXPIRING) - public void onTimerCallback( - OnTimerContext context, - @StateId(PERSON) ValueState<Person> personState) { - personState.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java deleted file mode 100644 index f415709..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.queries; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; - -import java.io.Serializable; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.NameCityStateId; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; - -/** - * A direct implementation of {@link Query3}. - */ -public class Query3Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 3. - */ - private static class Simulator extends AbstractSimulator<Event, NameCityStateId> { - /** Auctions, indexed by seller id. */ - private final Multimap<Long, Auction> newAuctions; - - /** Persons, indexed by id. */ - private final Map<Long, Person> newPersons; - - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - newPersons = new HashMap<>(); - newAuctions = ArrayListMultimap.create(); - } - - /** - * Capture new result. - */ - private void addResult(Auction auction, Person person, Instant timestamp) { - TimestampedValue<NameCityStateId> result = TimestampedValue.of( - new NameCityStateId(person.name, person.city, person.state, auction.id), timestamp); - addResult(result); - } - - @Override - protected void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - allDone(); - return; - } - Event event = timestampedEvent.getValue(); - if (event.bid != null) { - // Ignore bid events. 
- return; - } - - Instant timestamp = timestampedEvent.getTimestamp(); - - if (event.newAuction != null) { - // Only want auctions in category 10. - if (event.newAuction.category == 10) { - // Join new auction with existing person, if any. - Person person = newPersons.get(event.newAuction.seller); - if (person != null) { - addResult(event.newAuction, person, timestamp); - } else { - // Remember auction for future new person event. - newAuctions.put(event.newAuction.seller, event.newAuction); - } - } - } else { - // Only want people in OR, ID or CA. - if (event.newPerson.state.equals("OR") || event.newPerson.state.equals("ID") - || event.newPerson.state.equals("CA")) { - // Join new person with existing auctions. - for (Auction auction : newAuctions.get(event.newPerson.id)) { - addResult(auction, event.newPerson, timestamp); - } - // We'll never need these auctions again. - newAuctions.removeAll(event.newPerson.id); - // Remember person for future auctions. - newPersons.put(event.newPerson.id, event.newPerson); - } - } - } - } - - public Query3Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValue(itr); - } -}
