http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java new file mode 100644 index 0000000..6f4ad56 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.queries; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Abstract base class for simulator of a query. + * + * @param <InputT> Type of input elements. + * @param <OutputT> Type of output elements. 
+ */ +public abstract class AbstractSimulator<InputT, OutputT> { + /** Window size for action bucket sampling. */ + private static final Duration WINDOW_SIZE = Duration.standardMinutes(1); + + /** Input event stream we should draw from. */ + private final Iterator<TimestampedValue<InputT>> input; + + /** Set to true when no more results. */ + private boolean isDone; + + /** + * Results which have not yet been returned by the {@link #results} iterator. + */ + private final List<TimestampedValue<OutputT>> pendingResults; + + /** + * Current window timestamp (ms since epoch). + */ + private long currentWindow; + + /** + * Number of (possibly intermediate) results for the current window. + */ + private long currentCount; + + /** + * Result counts per window which have not yet been returned by the {@link #resultsPerWindow} + * iterator. + */ + private final List<Long> pendingCounts; + + public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) { + this.input = input; + isDone = false; + pendingResults = new ArrayList<>(); + currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + currentCount = 0; + pendingCounts = new ArrayList<>(); + } + + /** Called by implementors of {@link #run}: Fetch the next input element. */ + @Nullable + TimestampedValue<InputT> nextInput() { + if (!input.hasNext()) { + return null; + } + TimestampedValue<InputT> timestampedInput = input.next(); + NexmarkUtils.info("input: %s", timestampedInput); + return timestampedInput; + } + + /** + * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of + * recording the expected activity of the query over time. + */ + void addIntermediateResult(TimestampedValue<OutputT> result) { + NexmarkUtils.info("intermediate result: %s", result); + updateCounts(result.getTimestamp()); + } + + /** + * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking + * semantic correctness. 
+ */ + void addResult(TimestampedValue<OutputT> result) { + NexmarkUtils.info("result: %s", result); + pendingResults.add(result); + updateCounts(result.getTimestamp()); + } + + /** + * Update window and counts. + */ + private void updateCounts(Instant timestamp) { + long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis(); + if (window > currentWindow) { + if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { + pendingCounts.add(currentCount); + } + currentCount = 0; + currentWindow = window; + } + currentCount++; + } + + /** Called by implementors of {@link #run}: Record that no more results will be emitted. */ + void allDone() { + isDone = true; + } + + /** + * Overridden by derived classes to do the next increment of work. Each call should + * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult} + * or {@link #allDone}. It is ok for a single call to emit more than one result via + * {@link #addResult}. It is ok for a single call to run the entire simulation, though + * the {@link #results} and {@link #resultsPerWindow} iterators will then stall until the + * whole simulation has run. + */ + protected abstract void run(); + + /** + * Return iterator over all expected timestamped results. The underlying simulator state is + * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called. 
+ */ + public Iterator<TimestampedValue<OutputT>> results() { + return new Iterator<TimestampedValue<OutputT>>() { + @Override + public boolean hasNext() { + while (true) { + if (!pendingResults.isEmpty()) { + return true; + } + if (isDone) { + return false; + } + run(); + } + } + + @Override + public TimestampedValue<OutputT> next() { + TimestampedValue<OutputT> result = pendingResults.get(0); + pendingResults.remove(0); + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + /** + * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying + * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be + * called. + */ + public Iterator<Long> resultsPerWindow() { + return new Iterator<Long>() { + @Override + public boolean hasNext() { + while (true) { + if (!pendingCounts.isEmpty()) { + return true; + } + if (isDone) { + if (currentCount > 0) { + pendingCounts.add(currentCount); + currentCount = 0; + currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + return true; + } else { + return false; + } + } + run(); + } + } + + @Override + public Long next() { + Long result = pendingCounts.get(0); + pendingCounts.remove(0); + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java new file mode 100644 index 0000000..d070058 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.nexmark.Monitor; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * Base class for the eight 'NEXMark' queries. Supplies some fragments common to + * multiple queries. + */ +public abstract class NexmarkQuery + extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> { + public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions"); + public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids"); + static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person"); + + /** Predicate to detect a new person event. */ + private static final SerializableFunction<Event, Boolean> IS_NEW_PERSON = + new SerializableFunction<Event, Boolean>() { + @Override + public Boolean apply(Event event) { + return event.newPerson != null; + } + }; + + /** DoFn to convert a new person event to a person. 
*/ + private static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().newPerson); + } + }; + + /** Predicate to detect a new auction event. */ + private static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION = + new SerializableFunction<Event, Boolean>() { + @Override + public Boolean apply(Event event) { + return event.newAuction != null; + } + }; + + /** DoFn to convert a new auction event to an auction. */ + private static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().newAuction); + } + }; + + /** Predicate to detect a new bid event. */ + private static final SerializableFunction<Event, Boolean> IS_BID = + new SerializableFunction<Event, Boolean>() { + @Override + public Boolean apply(Event event) { + return event.bid != null; + } + }; + + /** DoFn to convert a bid event to a bid. */ + private static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().bid); + } + }; + + /** Transform to key each person by their id. */ + static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID = + ParDo.of(new DoFn<Person, KV<Long, Person>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().id, c.element())); + } + }); + + /** Transform to key each auction by its id. */ + static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID = + ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().id, c.element())); + } + }); + + /** Transform to key each auction by its seller id. 
*/ + static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER = + ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().seller, c.element())); + } + }); + + /** Transform to key each bid by its auction id. */ + static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION = + ParDo.of(new DoFn<Bid, KV<Long, Bid>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().auction, c.element())); + } + }); + + /** Transform to project the auction id from each bid. */ + static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION = + ParDo.of(new DoFn<Bid, Long>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().auction); + } + }); + + /** Transform to project the price from each bid. */ + static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE = + ParDo.of(new DoFn<Bid, Long>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().price); + } + }); + + /** Transform to emit each event with the timestamp embedded within it. */ + public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA = + ParDo.of(new DoFn<Event, Event>() { + @ProcessElement + public void processElement(ProcessContext c) { + Event e = c.element(); + if (e.bid != null) { + c.outputWithTimestamp(e, new Instant(e.bid.dateTime)); + } else if (e.newPerson != null) { + c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime)); + } else if (e.newAuction != null) { + c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime)); + } + } + }); + + /** + * Transform to filter for just the new auction events. 
+ */ + public static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS = + new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") { + @Override + public PCollection<Auction> expand(PCollection<Event> input) { + return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION)) + .apply("AsAuction", ParDo.of(AS_AUCTION)); + } + }; + + /** + * Transform to filter for just the new person events. + */ + public static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS = + new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") { + @Override + public PCollection<Person> expand(PCollection<Event> input) { + return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON)) + .apply("AsPerson", ParDo.of(AS_PERSON)); + } + }; + + /** + * Transform to filter for just the bid events. + */ + public static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS = + new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") { + @Override + public PCollection<Bid> expand(PCollection<Event> input) { + return input.apply("IsBid", Filter.by(IS_BID)) + .apply("AsBid", ParDo.of(AS_BID)); + } + }; + + final NexmarkConfiguration configuration; + public final Monitor<Event> eventMonitor; + public final Monitor<KnownSize> resultMonitor; + private final Monitor<Event> endOfStreamMonitor; + private final Counter fatalCounter; + + NexmarkQuery(NexmarkConfiguration configuration, String name) { + super(name); + this.configuration = configuration; + if (configuration.debug) { + eventMonitor = new Monitor<>(name + ".Events", "event"); + resultMonitor = new Monitor<>(name + ".Results", "result"); + endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end"); + fatalCounter = Metrics.counter(name , "fatal"); + } else { + eventMonitor = null; + resultMonitor = null; + endOfStreamMonitor = null; + fatalCounter = null; + } + } + + /** + * Implement the actual query. 
All we know about the result is it has a known encoded size. + */ + protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events); + + @Override + public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) { + + if (configuration.debug) { + events = + events + // Monitor events as they go by. + .apply(name + ".Monitor", eventMonitor.getTransform()) + // Count each type of event. + .apply(name + ".Snoop", NexmarkUtils.snoop(name)); + } + + if (configuration.cpuDelayMs > 0) { + // Slow down by pegging one core at 100%. + events = events.apply(name + ".CpuDelay", + NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs)); + } + + if (configuration.diskBusyBytes > 0) { + // Slow down by forcing bytes to durable store. + events = events.apply(name + ".DiskBusy", + NexmarkUtils.<Event>diskBusy(configuration.diskBusyBytes)); + } + + // Run the query. + PCollection<KnownSize> queryResults = applyPrim(events); + + if (configuration.debug) { + // Monitor results as they go by. + queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform()); + } + + // Timestamp the query results. + return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java new file mode 100644 index 0000000..1f093a0 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.TimestampedValue; + + +import org.hamcrest.core.IsEqual; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Assert; + +/** + * Base class for models of the eight NEXMark queries. Provides an assertion function which can be + * applied against the actual query results to check their consistency with the model. + */ +public abstract class NexmarkQueryModel implements Serializable { + public final NexmarkConfiguration configuration; + + NexmarkQueryModel(NexmarkConfiguration configuration) { + this.configuration = configuration; + } + + /** + * Return the start of the most recent window of {@code size} and {@code period} which ends + * strictly before {@code timestamp}. 
+ */ + static Instant windowStart(Duration size, Duration period, Instant timestamp) { + long ts = timestamp.getMillis(); + long p = period.getMillis(); + long lim = ts - ts % p; + long s = size.getMillis(); + return new Instant(lim - s); + } + + /** Convert {@code itr} to strings capturing values, timestamps and order. */ + static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) { + List<String> strings = new ArrayList<>(); + while (itr.hasNext()) { + strings.add(itr.next().toString()); + } + return strings; + } + + /** Convert {@code itr} to strings capturing values and order. */ + static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) { + List<String> strings = new ArrayList<>(); + while (itr.hasNext()) { + strings.add(itr.next().getValue().toString()); + } + return strings; + } + + /** Convert {@code itr} to strings capturing values only. */ + static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) { + Set<String> strings = new HashSet<>(); + while (itr.hasNext()) { + strings.add(itr.next().getValue().toString()); + } + return strings; + } + + /** Return simulator for query. */ + public abstract AbstractSimulator<?, ?> simulator(); + + /** Return sub-sequence of results which are significant for model. */ + Iterable<TimestampedValue<KnownSize>> relevantResults( + Iterable<TimestampedValue<KnownSize>> results) { + return results; + } + + /** + * Convert iterator of elements to collection of strings to use when testing coherence of model + * against actual query results. + */ + protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr); + + /** Return assertion to use on results of pipeline for this query. 
*/ + public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() { + final Collection<String> expectedStrings = toCollection(simulator().results()); + + return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() { + @Override + public Void apply(Iterable<TimestampedValue<KnownSize>> actual) { + Collection<String> actualStrings = toCollection(relevantResults(actual).iterator()); + Assert.assertThat("wrong pipeline output", actualStrings, + IsEqual.equalTo(expectedStrings)); + return null; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java new file mode 100644 index 0000000..68bf78e --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * Query 0: Pass events through unchanged. However, force them to do a round trip through + * serialization so that we measure the impact of the choice of coders. + */ +public class Query0 extends NexmarkQuery { + public Query0(NexmarkConfiguration configuration) { + super(configuration, "Query0"); + } + + private PCollection<Event> applyTyped(PCollection<Event> events) { + final Coder<Event> coder = events.getCoder(); + return events + // Force round trip through coder. 
+ .apply(name + ".Serialize", + ParDo.of(new DoFn<Event, Event>() { + private final Counter bytesMetric = + Metrics.counter(name , "bytes"); + + @ProcessElement + public void processElement(ProcessContext c) throws CoderException, IOException { + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + coder.encode(c.element(), outStream, Coder.Context.OUTER); + byte[] byteArray = outStream.toByteArray(); + bytesMetric.inc((long) byteArray.length); + ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray); + Event event = coder.decode(inStream, Coder.Context.OUTER); + c.output(event); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java new file mode 100644 index 0000000..0e73a21 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.util.Collection; +import java.util.Iterator; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.values.TimestampedValue; + +/** + * A direct implementation of {@link Query0}. + */ +public class Query0Model extends NexmarkQueryModel { + /** + * Simulator for query 0. + */ + private static class Simulator extends AbstractSimulator<Event, Event> { + public Simulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + } + + @Override + protected void run() { + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + allDone(); + return; + } + addResult(timestampedEvent); + } + } + + public Query0Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValueTimestampOrder(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java new file mode 100644 index 0000000..810cd87 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros. + * In CQL syntax: + * + * <pre> + * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime) + * FROM bid [ROWS UNBOUNDED]; + * </pre> + * + * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily + * slowed down. 
+ */ +public class Query1 extends NexmarkQuery { + public Query1(NexmarkConfiguration configuration) { + super(configuration, "Query1"); + } + + private PCollection<Bid> applyTyped(PCollection<Event> events) { + return events + // Only want the bid events. + .apply(JUST_BIDS) + + // Map the conversion function over all bids. + .apply(name + ".ToEuros", + ParDo.of(new DoFn<Bid, Bid>() { + @ProcessElement + public void processElement(ProcessContext c) { + Bid bid = c.element(); + c.output(new Bid( + bid.auction, bid.bidder, (bid.price * 89) / 100, bid.dateTime, bid.extra)); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java new file mode 100644 index 0000000..1c4e443 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Done; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import 
org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Query "10", 'Log to sharded files' (Not in original suite.) + * + * <p>Every windowSizeSec, save all events from the last period into 2*maxWorkers log files. + */ +public class Query10 extends NexmarkQuery { + private static final Logger LOG = LoggerFactory.getLogger(Query10.class); + private static final int CHANNEL_BUFFER = 8 << 20; // 8MB + private static final int NUM_SHARDS_PER_WORKER = 5; + private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10); + + /** + * Capture everything we need to know about the records in a single output file. + */ + private static class OutputFile implements Serializable { + /** Maximum possible timestamp of records in file. */ + private final Instant maxTimestamp; + /** Shard within window. */ + private final String shard; + /** Index of file in all files in shard. */ + private final long index; + /** Timing of records in this file. */ + private final PaneInfo.Timing timing; + /** Path to file containing records, or {@literal null} if no output required. */ + @Nullable + private final String filename; + + public OutputFile( + Instant maxTimestamp, + String shard, + long index, + PaneInfo.Timing timing, + @Nullable String filename) { + this.maxTimestamp = maxTimestamp; + this.shard = shard; + this.index = index; + this.timing = timing; + this.filename = filename; + } + + @Override + public String toString() { + return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename); + } + } + + /** + * GCS uri prefix for all log and 'finished' files. If null they won't be written. + */ + @Nullable + private String outputPath; + + /** + * Maximum number of workers, used to determine log sharding factor. 
+ */ + private int maxNumWorkers; + + public Query10(NexmarkConfiguration configuration) { + super(configuration, "Query10"); + } + + public void setOutputPath(@Nullable String outputPath) { + this.outputPath = outputPath; + } + + public void setMaxNumWorkers(int maxNumWorkers) { + this.maxNumWorkers = maxNumWorkers; + } + + /** + * Return channel for writing bytes to GCS. + */ + private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) + throws IOException { + //TODO + // Fix after PR: right now this is a specific Google added use case + // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way. + throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory"); + } + + /** Return a short string to describe {@code timing}. */ + private String timingToString(PaneInfo.Timing timing) { + switch (timing) { + case EARLY: + return "E"; + case ON_TIME: + return "O"; + case LATE: + return "L"; + } + throw new RuntimeException(); // cases are exhaustive + } + + /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */ + private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) { + @Nullable String filename = + outputPath == null + ? null + : String.format("%s/LOG-%s-%s-%03d-%s-%x", + outputPath, window.maxTimestamp(), shard, pane.getIndex(), + timingToString(pane.getTiming()), + ThreadLocalRandom.current().nextLong()); + return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(), + pane.getTiming(), filename); + } + + /** + * Return path to which we should write the index for {@code window}, or {@literal null} + * if no output required. 
+ */ + @Nullable + private String indexPathFor(BoundedWindow window) { + if (outputPath == null) { + return null; + } + return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp()); + } + + private PCollection<Done> applyTyped(PCollection<Event> events) { + final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER; + + return events + .apply(name + ".ShardEvents", + ParDo.of(new DoFn<Event, KV<String, Event>>() { + private final Counter lateCounter = Metrics.counter(name , "actuallyLateEvent"); + private final Counter onTimeCounter = Metrics.counter(name , "onTimeCounter"); + + @ProcessElement + public void processElement(ProcessContext c) { + if (c.element().hasAnnotation("LATE")) { + lateCounter.inc(); + LOG.info("Observed late: %s", c.element()); + } else { + onTimeCounter.inc(); + } + int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards); + String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards); + c.output(KV.of(shard, c.element())); + } + })) + .apply(name + ".WindowEvents", + Window.<KV<String, Event>>into( + FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) + .triggering(AfterEach.inOrder( + Repeatedly + .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever( + AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(LATE_BATCHING_PERIOD))))) + .discardingFiredPanes() + // Use a 1 day allowed lateness so that any forgotten hold will stall the + // pipeline for that period and be very noticeable. 
+ .withAllowedLateness(Duration.standardDays(1))) + .apply(name + ".GroupByKey", GroupByKey.<String, Event>create()) + .apply(name + ".CheckForLateEvents", + ParDo.of(new DoFn<KV<String, Iterable<Event>>, + KV<String, Iterable<Event>>>() { + private final Counter earlyCounter = Metrics.counter(name , "earlyShard"); + private final Counter onTimeCounter = Metrics.counter(name , "onTimeShard"); + private final Counter lateCounter = Metrics.counter(name , "lateShard"); + private final Counter unexpectedLatePaneCounter = + Metrics.counter(name , "ERROR_unexpectedLatePane"); + private final Counter unexpectedOnTimeElementCounter = + Metrics.counter(name , "ERROR_unexpectedOnTimeElement"); + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + int numLate = 0; + int numOnTime = 0; + for (Event event : c.element().getValue()) { + if (event.hasAnnotation("LATE")) { + numLate++; + } else { + numOnTime++; + } + } + String shard = c.element().getKey(); + LOG.info(String.format( + "%s with timestamp %s has %d actually late and %d on-time " + + "elements in pane %s for window %s", + shard, c.timestamp(), numLate, numOnTime, c.pane(), + window.maxTimestamp())); + if (c.pane().getTiming() == PaneInfo.Timing.LATE) { + if (numLate == 0) { + LOG.error( + "ERROR! No late events in late pane for %s", shard); + unexpectedLatePaneCounter.inc(); + } + if (numOnTime > 0) { + LOG.error( + "ERROR! Have %d on-time events in late pane for %s", + numOnTime, shard); + unexpectedOnTimeElementCounter.inc(); + } + lateCounter.inc(); + } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) { + if (numOnTime + numLate < configuration.maxLogEvents) { + LOG.error( + "ERROR! 
Only have %d events in early pane for %s", + numOnTime + numLate, shard); + } + earlyCounter.inc(); + } else { + onTimeCounter.inc(); + } + c.output(c.element()); + } + })) + .apply(name + ".UploadEvents", + ParDo.of(new DoFn<KV<String, Iterable<Event>>, + KV<Void, OutputFile>>() { + private final Counter savedFileCounter = Metrics.counter(name , "savedFile"); + private final Counter writtenRecordsCounter = Metrics.counter(name , "writtenRecords"); + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) + throws IOException { + String shard = c.element().getKey(); + GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); + OutputFile outputFile = outputFileFor(window, shard, c.pane()); + LOG.info(String.format( + "Writing %s with record timestamp %s, window timestamp %s, pane %s", + shard, c.timestamp(), window.maxTimestamp(), c.pane())); + if (outputFile.filename != null) { + LOG.info("Beginning write to '%s'", outputFile.filename); + int n = 0; + try (OutputStream output = + Channels.newOutputStream(openWritableGcsFile(options, outputFile + .filename))) { + for (Event event : c.element().getValue()) { + Event.CODER.encode(event, output, Coder.Context.OUTER); + writtenRecordsCounter.inc(); + if (++n % 10000 == 0) { + LOG.info("So far written %d records to '%s'", n, + outputFile.filename); + } + } + } + LOG.info("Written all %d records to '%s'", n, outputFile.filename); + } + savedFileCounter.inc(); + c.output(KV.<Void, OutputFile>of(null, outputFile)); + } + })) + // Clear fancy triggering from above. + .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into( + FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) + .triggering(AfterWatermark.pastEndOfWindow()) + // We expect no late data here, but we'll assume the worst so we can detect any. 
+ .withAllowedLateness(Duration.standardDays(1)) + .discardingFiredPanes()) + // this GroupByKey allows to have one file per window + .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create()) + .apply(name + ".Index", + ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() { + private final Counter unexpectedLateCounter = + Metrics.counter(name , "ERROR_unexpectedLate"); + private final Counter unexpectedEarlyCounter = + Metrics.counter(name , "ERROR_unexpectedEarly"); + private final Counter unexpectedIndexCounter = + Metrics.counter(name , "ERROR_unexpectedIndex"); + private final Counter finalizedCounter = Metrics.counter(name , "indexed"); + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) + throws IOException { + if (c.pane().getTiming() == Timing.LATE) { + unexpectedLateCounter.inc(); + LOG.error("ERROR! Unexpected LATE pane: %s", c.pane()); + } else if (c.pane().getTiming() == Timing.EARLY) { + unexpectedEarlyCounter.inc(); + LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane()); + } else if (c.pane().getTiming() == Timing.ON_TIME + && c.pane().getIndex() != 0) { + unexpectedIndexCounter.inc(); + LOG.error("ERROR! 
Unexpected ON_TIME pane index: %s", c.pane()); + } else { + GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); + LOG.info( + "Index with record timestamp %s, window timestamp %s, pane %s", + c.timestamp(), window.maxTimestamp(), c.pane()); + + @Nullable String filename = indexPathFor(window); + if (filename != null) { + LOG.info("Beginning write to '%s'", filename); + int n = 0; + try (OutputStream output = + Channels.newOutputStream( + openWritableGcsFile(options, filename))) { + for (OutputFile outputFile : c.element().getValue()) { + output.write(outputFile.toString().getBytes("UTF-8")); + n++; + } + } + LOG.info("Written all %d lines to '%s'", n, filename); + } + c.output( + new Done("written for timestamp " + window.maxTimestamp())); + finalizedCounter.inc(); + } + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java new file mode 100644 index 0000000..47e7c00 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.BidsPerSession; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Query "11", 'User sessions' (Not in original suite.) + * + * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap. + * However limit the session to at most {@code maxLogEvents}. Emit the number of + * bids per session. 
+ */ +public class Query11 extends NexmarkQuery { + public Query11(NexmarkConfiguration configuration) { + super(configuration, "Query11"); + } + + private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) { + PCollection<Long> bidders = events.apply(JUST_BIDS).apply(name + ".Rekey", + ParDo.of(new DoFn<Bid, Long>() { + + @ProcessElement public void processElement(ProcessContext c) { + Bid bid = c.element(); + c.output(bid.bidder); + } + })); + + PCollection<Long> biddersWindowed = bidders.apply( + Window.<Long>into( + Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec))) + .triggering( + Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))) + .discardingFiredPanes() + .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2))); + return biddersWindowed.apply(Count.<Long>perElement()) + .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { + + @ProcessElement public void processElement(ProcessContext c) { + c.output(new BidsPerSession(c.element().getKey(), c.element().getValue())); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java new file mode 100644 index 0000000..0f4b232 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.BidsPerSession; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Query "12", 'Processing time windows' (Not in original suite.) + * + * <p>Group bids by the same user into processing time windows of windowSize. Emit the count + * of bids per window. 
+ */ +public class Query12 extends NexmarkQuery { + public Query12(NexmarkConfiguration configuration) { + super(configuration, "Query12"); + } + + private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) { + return events + .apply(JUST_BIDS) + .apply(ParDo.of(new DoFn<Bid, Long>() { + @ProcessElement + public void processElement(ProcessContext c){ + c.output(c.element().bidder); + } + })) + .apply(Window.<Long>into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf( + Duration.standardSeconds(configuration.windowSizeSec)))) + .discardingFiredPanes() + .withAllowedLateness(Duration.ZERO)) + .apply(Count.<Long>perElement()) + .apply(name + ".ToResult", + ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output( + new BidsPerSession(c.element().getKey(), c.element().getValue())); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java new file mode 100644 index 0000000..76c182a --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.values.TimestampedValue; + +/** + * A direct implementation of {@link Query1}. + */ +public class Query1Model extends NexmarkQueryModel implements Serializable { + /** + * Simulator for query 1. + */ + private static class Simulator extends AbstractSimulator<Event, Bid> { + public Simulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + } + + @Override + protected void run() { + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + allDone(); + return; + } + Event event = timestampedEvent.getValue(); + if (event.bid == null) { + // Ignore non-bid events. 
+ return; + } + Bid bid = event.bid; + Bid resultBid = + new Bid(bid.auction, bid.bidder, bid.price * 89 / 100, bid.dateTime, bid.extra); + TimestampedValue<Bid> result = + TimestampedValue.of(resultBid, timestampedEvent.getTimestamp()); + addResult(result); + } + } + + public Query1Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValueTimestampOrder(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java new file mode 100644 index 0000000..c5ab992 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.AuctionPrice; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; + +/** + * Query 2, 'Filtering. Find bids with specific auction ids and show their bid price. + * In CQL syntax: + * + * <pre> + * SELECT Rstream(auction, price) + * FROM Bid [NOW] + * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087; + * </pre> + * + * <p>As written that query will only yield a few hundred results over event streams of + * arbitrary size. To make it more interesting we instead choose bids for every + * {@code auctionSkip}'th auction. + */ +public class Query2 extends NexmarkQuery { + public Query2(NexmarkConfiguration configuration) { + super(configuration, "Query2"); + } + + private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) { + return events + // Only want the bid events. + .apply(JUST_BIDS) + + // Select just the bids for the auctions we care about. + .apply(Filter.by(new SerializableFunction<Bid, Boolean>() { + @Override + public Boolean apply(Bid bid) { + return bid.auction % configuration.auctionSkip == 0; + } + })) + + // Project just auction id and price. 
+ .apply(name + ".Project", + ParDo.of(new DoFn<Bid, AuctionPrice>() { + @ProcessElement + public void processElement(ProcessContext c) { + Bid bid = c.element(); + c.output(new AuctionPrice(bid.auction, bid.price)); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java new file mode 100644 index 0000000..33a1f8d --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.AuctionPrice; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.values.TimestampedValue; + +/** + * A direct implementation of {@link Query2}. + */ +public class Query2Model extends NexmarkQueryModel implements Serializable { + /** + * Simulator for query 2. + */ + private class Simulator extends AbstractSimulator<Event, AuctionPrice> { + public Simulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + } + + @Override + protected void run() { + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + allDone(); + return; + } + Event event = timestampedEvent.getValue(); + if (event.bid == null) { + // Ignore non bid events. + return; + } + Bid bid = event.bid; + if (bid.auction % configuration.auctionSkip != 0) { + // Ignore bids for auctions we don't care about. 
+ return; + } + AuctionPrice auctionPrice = new AuctionPrice(bid.auction, bid.price); + TimestampedValue<AuctionPrice> result = + TimestampedValue.of(auctionPrice, timestampedEvent.getTimestamp()); + addResult(result); + } + } + + public Query2Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValueTimestampOrder(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java new file mode 100644 index 0000000..6f8d72d --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.nexmark.queries; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.nexmark.model.NameCityStateId; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Query 3, 'Local Item Suggestion'. Who is selling in OR, ID or CA in category 10, and for what + * auction ids? 
In CQL syntax:
 *
 * <pre>
 * SELECT Istream(P.name, P.city, P.state, A.id)
 * FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
 * WHERE A.seller = P.id AND (P.state = `OR' OR P.state = `ID' OR P.state = `CA') AND A.category
 * = 10;
 * </pre>
 *
 * <p>This implementation tolerates 'new auction' events that arrive before the 'new person' event
 * of their seller: such auctions are buffered in state until the matching person is seen. Once a
 * person record has been stored, later auctions of that seller are joined against it directly
 * (until the stored record is cleared by the expiry timer).
 *
 * <p>A real system would use an external system to maintain the id-to-person association.
 */
public class Query3 extends NexmarkQuery {

  private static final Logger LOG = LoggerFactory.getLogger(Query3.class);

  /** Stateful join of the filtered auctions with the filtered persons, keyed by person id. */
  private final JoinDoFn joinDoFn;

  public Query3(NexmarkConfiguration configuration) {
    super(configuration, "Query3");
    this.joinDoFn = new JoinDoFn(name, configuration.maxAuctionsWaitingTime);
  }

  /**
   * Builds the typed pipeline: window the events, split them into category-10 auctions keyed by
   * seller and OR/ID/CA persons keyed by id, co-group both sides, join them with {@link JoinDoFn}
   * and project each (auction, person) pair to a {@link NameCityStateId}.
   */
  private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
    // Number of elements after which the global-window trigger fires a pane.
    final int paneSize = 30;

    PCollection<Event> windowedEvents =
        events.apply(
            Window.<Event>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(paneSize)))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO));

    // Keep only auctions in category 10.
    SerializableFunction<Auction, Boolean> inCategoryTen =
        new SerializableFunction<Auction, Boolean>() {

          @Override
          public Boolean apply(Auction auction) {
            return auction.category == 10;
          }
        };

    // New auction events -> category filter -> keyed by seller id.
    PCollection<KV<Long, Auction>> auctionsBySellerId =
        windowedEvents
            .apply(JUST_NEW_AUCTIONS)
            .apply(name + ".InCategory", Filter.by(inCategoryTen))
            .apply("AuctionBySeller", AUCTION_BY_SELLER);

    // Keep only people located in OR, ID or CA.
    SerializableFunction<Person, Boolean> inWantedState =
        new SerializableFunction<Person, Boolean>() {

          @Override
          public Boolean apply(Person person) {
            String state = person.state;
            return state.equals("OR") || state.equals("ID") || state.equals("CA");
          }
        };

    // New person events -> state filter -> keyed by person id.
    PCollection<KV<Long, Person>> personsById =
        windowedEvents
            .apply(JUST_NEW_PERSONS)
            .apply(name + ".InState", Filter.by(inWantedState))
            .apply("PersonById", PERSON_BY_ID);

    // Reduce every joined pair to the fields the query reports.
    DoFn<KV<Auction, Person>, NameCityStateId> projection =
        new DoFn<KV<Auction, Person>, NameCityStateId>() {

          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<Auction, Person> pair = c.element();
            Auction auction = pair.getKey();
            Person seller = pair.getValue();
            c.output(new NameCityStateId(seller.name, seller.city, seller.state, auction.id));
          }
        };

    // Group auctions and persons that share the same person id.
    PCollection<KV<Long, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
            .and(PERSON_TAG, personsById)
            .apply(CoGroupByKey.<Long>create());

    PCollection<KV<Auction, Person>> joined = grouped.apply(name + ".Join", ParDo.of(joinDoFn));

    return joined.apply(name + ".Project", ParDo.of(projection));
  }

  @Override
  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
    PCollection<NameCityStateId> typed = applyTyped(events);
    return NexmarkUtils.castToKnownSize(name, typed);
  }

  /**
   * Joins {@code auctions} and {@code people} by person id and emits their cross-product one pair
   * at a time.
   *
   * <p>A person may submit any number of auctions, so a new person record is written to state in
   * order to match auctions of that person that arrive later.
   *
   * <p>Each auction is associated with at most one person, so auction records only need to be
   * kept in state until the corresponding person record has been seen (which may already be the
   * case when the auction arrives).
   */
  private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {

    private static final String AUCTIONS = "auctions";
    private static final String PERSON = "person";
    private static final String PERSON_STATE_EXPIRING = "personStateExpiring";

    /** Number of seconds added to a person's {@code dateTime} to obtain the expiry timer. */
    private final int maxAuctionsWaitingTime;

    /** The person seen for the current key, if any. */
    @StateId(PERSON)
    private static final StateSpec<ValueState<Person>> personSpec =
        StateSpecs.value(Person.CODER);

    /** Auctions received for the current key before its person was seen. */
    @StateId(AUCTIONS)
    private final StateSpec<ValueState<List<Auction>>> auctionsSpec =
        StateSpecs.value(ListCoder.of(Auction.CODER));

    /** Event-time timer whose callback clears the stored person. */
    @TimerId(PERSON_STATE_EXPIRING)
    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    // Namespace under which the counters below are registered.
    private final String name;

    private final Counter newAuctionCounter;
    private final Counter newPersonCounter;
    private final Counter newNewOutputCounter;
    private final Counter newOldOutputCounter;
    private final Counter oldNewOutputCounter;
    private final Counter fatalCounter;

    private JoinDoFn(String name, int maxAuctionsWaitingTime) {
      this.name = name;
      this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
      this.newAuctionCounter = Metrics.counter(name, "newAuction");
      this.newPersonCounter = Metrics.counter(name, "newPerson");
      this.newNewOutputCounter = Metrics.counter(name, "newNewOutput");
      this.newOldOutputCounter = Metrics.counter(name, "newOldOutput");
      this.oldNewOutputCounter = Metrics.counter(name, "oldNewOutput");
      this.fatalCounter = Metrics.counter(name, "fatal");
    }

    /**
     * Handles one co-grouped element (all auctions and persons of one pane for one person id).
     *
     * <p>Three cases, in order: the person is already in state (join incoming auctions
     * immediately); a person arrives in this element (flush buffered auctions, join incoming
     * auctions, store the person and arm the expiry timer); no person is known yet (buffer the
     * incoming auctions).
     */
    @ProcessElement
    public void processElement(
        ProcessContext c,
        @TimerId(PERSON_STATE_EXPIRING) Timer timer,
        @StateId(PERSON) ValueState<Person> personState,
        @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
      // This could *almost* be a combiner over the global window whose accumulator is the state
      // used below, but combiners cannot emit intermediate results, so that has to wait for the
      // pending ReduceFn API.
      CoGbkResult joined = c.element().getValue();

      Person knownPerson = personState.read();
      if (knownPerson != null) {
        // The person for this id was seen earlier: join incoming auctions on the fly,
        // no additional state is required.
        for (Auction auction : joined.getAll(AUCTION_TAG)) {
          newAuctionCounter.inc();
          newOldOutputCounter.inc();
          c.output(KV.of(auction, knownPerson));
        }
        return;
      }

      Person firstPerson = null;
      for (Person candidate : joined.getAll(PERSON_TAG)) {
        if (firstPerson != null) {
          // Only the first person of the element is used; any further one is reported.
          if (firstPerson.equals(candidate)) {
            LOG.error("Duplicate person {}", firstPerson);
          } else {
            LOG.error("Conflicting persons {} and {}", firstPerson, candidate);
          }
          fatalCounter.inc();
          continue;
        }
        firstPerson = candidate;
        newPersonCounter.inc();

        // The person is now known, so auctions buffered for this seller id can be flushed
        // (an auction belongs to exactly one seller).
        List<Auction> buffered = auctionsState.read();
        if (buffered != null) {
          for (Auction auction : buffered) {
            oldNewOutputCounter.inc();
            c.output(KV.of(auction, candidate));
          }
          auctionsState.clear();
        }

        // Auctions arriving together with the person are joined as well.
        for (Auction auction : joined.getAll(AUCTION_TAG)) {
          newAuctionCounter.inc();
          newNewOutputCounter.inc();
          c.output(KV.of(auction, candidate));
        }

        // Remember the person for future auctions and schedule the clearing of that state.
        personState.write(candidate);
        Duration waitingTime = Duration.standardSeconds(maxAuctionsWaitingTime);
        Instant expiry = new Instant(candidate.dateTime).plus(waitingTime);
        timer.set(expiry);
      }
      if (firstPerson != null) {
        return;
      }

      // No person known for this id yet: keep the auctions until the person event shows up.
      List<Auction> waiting = auctionsState.read();
      if (waiting == null) {
        waiting = new ArrayList<>();
      }
      for (Auction auction : joined.getAll(AUCTION_TAG)) {
        newAuctionCounter.inc();
        waiting.add(auction);
      }
      auctionsState.write(waiting);
    }

    /** Timer callback: drops the stored person for the key the timer was set on. */
    @OnTimer(PERSON_STATE_EXPIRING)
    public void onTimerCallback(
        OnTimerContext context,
        @StateId(PERSON) ValueState<Person> personState) {
      personState.clear();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java new file mode 100644 index 0000000..94f24cb --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.nexmark.queries; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.NameCityStateId; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +
/**
 * A direct implementation of {@link Query3}.
 */
public class Query3Model extends NexmarkQueryModel implements Serializable {
  /**
   * Simulator for query 3: an in-memory join of category-10 auctions with persons located in
   * OR, ID or CA, matched on seller id.
   */
  private static class Simulator extends AbstractSimulator<Event, NameCityStateId> {
    /** Auctions still waiting for their seller's person event, indexed by seller id. */
    private final Multimap<Long, Auction> newAuctions = ArrayListMultimap.create();

    /** Accepted persons seen so far, indexed by id. */
    private final Map<Long, Person> newPersons = new HashMap<>();

    public Simulator(NexmarkConfiguration configuration) {
      super(NexmarkUtils.standardEventIterator(configuration));
    }

    /**
     * Records the join of {@code auction} and {@code person} as a result stamped with
     * {@code timestamp}.
     */
    private void addResult(Auction auction, Person person, Instant timestamp) {
      NameCityStateId row =
          new NameCityStateId(person.name, person.city, person.state, auction.id);
      addResult(TimestampedValue.of(row, timestamp));
    }

    /** Consumes one input event; signals completion once the input is exhausted. */
    @Override
    protected void run() {
      TimestampedValue<Event> next = nextInput();
      if (next == null) {
        allDone();
        return;
      }

      Event event = next.getValue();
      if (event.bid != null) {
        // Bids play no part in this query.
        return;
      }

      Instant timestamp = next.getTimestamp();
      if (event.newAuction != null) {
        onNewAuction(event.newAuction, timestamp);
      } else {
        onNewPerson(event.newPerson, timestamp);
      }
    }

    /**
     * Joins a category-10 auction with its seller if that person is already known, otherwise
     * parks the auction until the person arrives. Auctions of other categories are dropped.
     */
    private void onNewAuction(Auction auction, Instant timestamp) {
      if (auction.category != 10) {
        return;
      }
      Person seller = newPersons.get(auction.seller);
      if (seller == null) {
        // Keep the auction for a future new person event.
        newAuctions.put(auction.seller, auction);
      } else {
        addResult(auction, seller, timestamp);
      }
    }

    /**
     * Joins a person from OR, ID or CA with every auction parked for that id, then remembers the
     * person for later auctions. Persons from other states are dropped.
     */
    private void onNewPerson(Person person, Instant timestamp) {
      String state = person.state;
      boolean wanted = state.equals("OR") || state.equals("ID") || state.equals("CA");
      if (!wanted) {
        return;
      }
      // removeAll hands back the parked auctions and forgets them; they are never needed again.
      for (Auction auction : newAuctions.removeAll(person.id)) {
        addResult(auction, person, timestamp);
      }
      newPersons.put(person.id, person);
    }
  }

  public Query3Model(NexmarkConfiguration configuration) {
    super(configuration);
  }

  /** Creates a fresh simulator over the standard event stream for this configuration. */
  @Override
  public AbstractSimulator<?, ?> simulator() {
    return new Simulator(configuration);
  }

  @Override
  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
    return toValue(itr);
  }
}
