http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java new file mode 100644 index 0000000..13ed580 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -0,0 +1,681 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum.SumLongFn; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.TimestampedValue; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.Hashing; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; + +/** + * Odd's 'n Ends used throughout queries and driver. 
+ */ +public class NexmarkUtils { + private static final Logger LOG = LoggerFactory.getLogger(NexmarkGoogleDriver.class.getName()); + + /** + * Mapper for (de)serializing JSON. + */ + static final ObjectMapper MAPPER = new ObjectMapper(); + + /** + * Possible sources for events. + */ + public enum SourceType { + /** + * Produce events directly. + */ + DIRECT, + /** + * Read events from an Avro file. + */ + AVRO, + /** + * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline. + */ + PUBSUB + } + + /** + * Possible sinks for query results. + */ + public enum SinkType { + /** + * Discard all results. + */ + COUNT_ONLY, + /** + * Discard all results after converting them to strings. + */ + DEVNULL, + /** + * Write to a PubSub topic. It will be drained by this pipeline. + */ + PUBSUB, + /** + * Write to a text file. Only works in batch mode. + */ + TEXT, + /** + * Write raw Events to Avro. Only works in batch mode. + */ + AVRO, + /** + * Write raw Events to BigQuery. + */ + BIGQUERY, + } + + /** + * Pub/sub mode to run in. + */ + public enum PubSubMode { + /** + * Publish events to pub/sub, but don't run the query. + */ + PUBLISH_ONLY, + /** + * Consume events from pub/sub and run the query, but don't publish. + */ + SUBSCRIBE_ONLY, + /** + * Both publish and consume, but as separate jobs. + */ + COMBINED + } + + /** + * Coder strategies. + */ + public enum CoderStrategy { + /** + * Hand-written. + */ + HAND, + /** + * Avro. + */ + AVRO, + /** + * Java serialization. + */ + JAVA + } + + /** + * How to determine resource names. + */ + public enum ResourceNameMode { + /** Names are used as provided. */ + VERBATIM, + /** Names are suffixed with the query being run. */ + QUERY, + /** Names are suffixed with the query being run and a random number. */ + QUERY_AND_SALT; + } + + /** + * Units for rates. 
+ */ + public enum RateUnit { + PER_SECOND(1_000_000L), + PER_MINUTE(60_000_000L); + + RateUnit(long usPerUnit) { + this.usPerUnit = usPerUnit; + } + + /** + * Number of microseconds per unit. + */ + private final long usPerUnit; + + /** + * Number of microseconds between events at given rate. + */ + public long rateToPeriodUs(long rate) { + return (usPerUnit + rate / 2) / rate; + } + } + + /** + * Shape of event rate. + */ + public static enum RateShape { + SQUARE, + SINE; + + /** + * Number of steps used to approximate sine wave. + */ + private static final int N = 10; + + /** + * Return inter-event delay, in microseconds, for each generator + * to follow in order to achieve {@code rate} at {@code unit} using {@code numGenerators}. + */ + public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) { + return unit.rateToPeriodUs(rate) * numGenerators; + } + + /** + * Return array of successive inter-event delays, in microseconds, for each generator + * to follow in order to achieve this shape with {@code firstRate/nextRate} at + * {@code unit} using {@code numGenerators}. 
+ */ + public long[] interEventDelayUs( + int firstRate, int nextRate, RateUnit unit, int numGenerators) { + if (firstRate == nextRate) { + long[] interEventDelayUs = new long[1]; + interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators; + return interEventDelayUs; + } + + switch (this) { + case SQUARE: { + long[] interEventDelayUs = new long[2]; + interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators; + interEventDelayUs[1] = unit.rateToPeriodUs(nextRate) * numGenerators; + return interEventDelayUs; + } + case SINE: { + double mid = (firstRate + nextRate) / 2.0; + double amp = (firstRate - nextRate) / 2.0; // may be -ve + long[] interEventDelayUs = new long[N]; + for (int i = 0; i < N; i++) { + double r = (2.0 * Math.PI * i) / N; + double rate = mid + amp * Math.cos(r); + interEventDelayUs[i] = unit.rateToPeriodUs(Math.round(rate)) * numGenerators; + } + return interEventDelayUs; + } + } + throw new RuntimeException(); // switch should be exhaustive + } + + /** + * Return delay between steps, in seconds, for result of {@link #interEventDelayUs}, so + * as to cycle through the entire sequence every {@code ratePeriodSec}. + */ + public int stepLengthSec(int ratePeriodSec) { + int n = 0; + switch (this) { + case SQUARE: + n = 2; + break; + case SINE: + n = N; + break; + } + return (ratePeriodSec + n - 1) / n; + } + } + + /** + * Set to true to capture all info messages. The logging level flags don't currently work. + */ + private static final boolean LOG_INFO = false; + + /** + * Set to true to capture all error messages. The logging level flags don't currently work. + */ + private static final boolean LOG_ERROR = true; + + /** + * Set to true to log directly to stdout on VM. You can watch the results in real-time with: + * tail -f /var/log/dataflow/streaming-harness/harness-stdout.log + */ + private static final boolean LOG_TO_CONSOLE = false; + + /** + * Log info message. + */ + public static void info(String format, Object... 
args) { + if (LOG_INFO) { + LOG.info(String.format(format, args)); + if (LOG_TO_CONSOLE) { + System.out.println(String.format(format, args)); + } + } + } + + /** + * Log error message. + */ + public static void error(String format, Object... args) { + if (LOG_ERROR) { + LOG.error(String.format(format, args)); + if (LOG_TO_CONSOLE) { + System.out.println(String.format(format, args)); + } + } + } + + /** + * Log message to console. For client side only. + */ + public static void console(String format, Object... args) { + System.out.printf("%s %s\n", Instant.now(), String.format(format, args)); + } + + /** + * Label to use for timestamps on pub/sub messages. + */ + public static final String PUBSUB_TIMESTAMP = "timestamp"; + + /** + * Label to use for windmill ids on pub/sub messages. + */ + public static final String PUBSUB_ID = "id"; + + /** + * All events will be given a timestamp relative to this time (ms since epoch). + */ + public static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis(); + + /** + * Instants guaranteed to be strictly before and after all event timestamps, and which won't + * be subject to underflow/overflow. + */ + public static final Instant BEGINNING_OF_TIME = new Instant(0).plus(Duration.standardDays(365)); + public static final Instant END_OF_TIME = + BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(365)); + + /** + * Setup pipeline with codes and some other options. + */ + public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) { + PipelineRunner<?> runner = p.getRunner(); + if (runner instanceof DirectPipelineRunner) { + // Disable randomization of output since we want to check batch and streaming match the + // model both locally and on the cloud. 
+ ((DirectPipelineRunner) runner).withUnorderednessTesting(false); + } + + CoderRegistry registry = p.getCoderRegistry(); + switch (coderStrategy) { + case HAND: + registry.registerCoder(Auction.class, Auction.CODER); + registry.registerCoder(AuctionBid.class, AuctionBid.CODER); + registry.registerCoder(AuctionCount.class, AuctionCount.CODER); + registry.registerCoder(AuctionPrice.class, AuctionPrice.CODER); + registry.registerCoder(Bid.class, Bid.CODER); + registry.registerCoder(CategoryPrice.class, CategoryPrice.CODER); + registry.registerCoder(Event.class, Event.CODER); + registry.registerCoder(IdNameReserve.class, IdNameReserve.CODER); + registry.registerCoder(NameCityStateId.class, NameCityStateId.CODER); + registry.registerCoder(Person.class, Person.CODER); + registry.registerCoder(SellerPrice.class, SellerPrice.CODER); + registry.registerCoder(Done.class, Done.CODER); + registry.registerCoder(BidsPerSession.class, BidsPerSession.CODER); + break; + case AVRO: + registry.setFallbackCoderProvider(AvroCoder.PROVIDER); + break; + case JAVA: + registry.setFallbackCoderProvider(SerializableCoder.PROVIDER); + break; + } + } + + /** + * Return a generator config to match the given {@code options}. + */ + public static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) { + return new GeneratorConfig(configuration, + configuration.useWallclockEventTime ? System.currentTimeMillis() + : BASE_TIME, 0, + configuration.numEvents, 0); + } + + /** + * Return an iterator of events using the 'standard' generator config. + */ + public static Iterator<TimestampedValue<Event>> standardEventIterator( + NexmarkConfiguration configuration) { + return new Generator(standardGeneratorConfig(configuration)); + } + + /** + * Return a transform which yields a finite number of synthesized events generated + * as a batch. 
+ */ + public static PTransform<PInput, PCollection<Event>> batchEventsSource( + String name, NexmarkConfiguration configuration) { + return Read + .from(new BoundedEventSource( + NexmarkUtils.standardGeneratorConfig(configuration), configuration.numEventGenerators)) + .named(name + ".ReadBounded"); + } + + /** + * Return a transform which yields a finite number of synthesized events generated + * on-the-fly in real time. + */ + public static PTransform<PInput, PCollection<Event>> streamEventsSource( + String name, NexmarkConfiguration configuration) { + return Read.from(new UnboundedEventSource(NexmarkUtils.standardGeneratorConfig(configuration), + configuration.numEventGenerators, + configuration.watermarkHoldbackSec, + configuration.isRateLimited)) + .named(name + ".ReadUnbounded"); + } + + /** + * Return a transform to pass-through events, but count them as they go by. + */ + public static ParDo.Bound<Event, Event> snoop(final String name) { + return ParDo.named(name + ".Snoop") + .of(new DoFn<Event, Event>() { + final Aggregator<Long, Long> eventCounter = + createAggregator("events", new SumLongFn()); + final Aggregator<Long, Long> newPersonCounter = + createAggregator("newPersons", new SumLongFn()); + final Aggregator<Long, Long> newAuctionCounter = + createAggregator("newAuctions", new SumLongFn()); + final Aggregator<Long, Long> bidCounter = + createAggregator("bids", new SumLongFn()); + final Aggregator<Long, Long> endOfStreamCounter = + createAggregator("endOfStream", new SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + eventCounter.addValue(1L); + if (c.element().newPerson != null) { + newPersonCounter.addValue(1L); + } else if (c.element().newAuction != null) { + newAuctionCounter.addValue(1L); + } else if (c.element().bid != null) { + bidCounter.addValue(1L); + } else { + endOfStreamCounter.addValue(1L); + } + info("%s snooping element %s", name, c.element()); + c.output(c.element()); + } + }); + } + + /** + * Return a 
transform to count and discard each element. + */ + public static <T> ParDo.Bound<T, Void> devNull(String name) { + return ParDo.named(name + ".DevNull") + .of(new DoFn<T, Void>() { + final Aggregator<Long, Long> discardCounter = + createAggregator("discarded", new SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + discardCounter.addValue(1L); + } + }); + } + + /** + * Return a transform to log each element, passing it through unchanged. + */ + public static <T> ParDo.Bound<T, T> log(final String name) { + return ParDo.named(name + ".Log") + .of(new DoFn<T, T>() { + @Override + public void processElement(ProcessContext c) { + error("%s: %s", name, c.element()); + c.output(c.element()); + } + }); + } + + /** + * Return a transform to format each element as a string. + */ + public static <T> ParDo.Bound<T, String> format(String name) { + return ParDo.named(name + ".Format") + .of(new DoFn<T, String>() { + final Aggregator<Long, Long> recordCounter = + createAggregator("records", new SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + recordCounter.addValue(1L); + c.output(c.element().toString()); + } + }); + } + + /** + * Return a transform to make explicit the timestamp of each element. + */ + public static <T> ParDo.Bound<T, TimestampedValue<T>> stamp(String name) { + return ParDo.named(name + ".Stamp") + .of(new DoFn<T, TimestampedValue<T>>() { + @Override + public void processElement(ProcessContext c) { + c.output(TimestampedValue.of(c.element(), c.timestamp())); + } + }); + } + + /** + * Return a transform to reduce a stream to a single, order-invariant long hash. 
+ */ + public static <T> PTransform<PCollection<T>, PCollection<Long>> hash( + final long numEvents, String name) { + return new PTransform<PCollection<T>, PCollection<Long>>(name) { + @Override + public PCollection<Long> apply(PCollection<T> input) { + return input.apply(Window.<T>into(new GlobalWindows()) + .triggering(AfterPane.elementCountAtLeast((int) numEvents)) + .withAllowedLateness(Duration.standardDays(1)) + .discardingFiredPanes()) + + .apply(ParDo.named(name + ".Hash").of(new DoFn<T, Long>() { + @Override + public void processElement(ProcessContext c) { + long hash = + Hashing.murmur3_128() + .newHasher() + .putLong(c.timestamp().getMillis()) + .putString(c.element().toString(), StandardCharsets.UTF_8) + .hash() + .asLong(); + c.output(hash); + } + })) + + .apply(Combine.globally(new Combine.BinaryCombineFn<Long>() { + @Override + public Long apply(Long left, Long right) { + return left ^ right; + } + })); + } + }; + } + + private static final long MASK = (1L << 16) - 1L; + private static final long HASH = 0x243F6A8885A308D3L; + private static final long INIT_PLAINTEXT = 50000L; + + /** + * Return a transform to keep the CPU busy for given milliseconds on every record. + */ + public static <T> ParDo.Bound<T, T> cpuDelay(String name, final long delayMs) { + return ParDo.named(name + ".CpuDelay") + .of(new DoFn<T, T>() { + @Override + public void processElement(ProcessContext c) { + long now = System.currentTimeMillis(); + long end = now + delayMs; + while (now < end) { + // Find plaintext which hashes to HASH in lowest MASK bits. + // Values chosen to roughly take 1ms on typical workstation. 
+ long p = INIT_PLAINTEXT; + while (true) { + long t = Hashing.murmur3_128().hashLong(p).asLong(); + if ((t & MASK) == (HASH & MASK)) { + break; + } + p++; + } + long next = System.currentTimeMillis(); + now = next; + } + c.output(c.element()); + } + }); + } + + private static final StateTag<Object, ValueState<byte[]>> DUMMY_TAG = + StateTags.value("dummy", ByteArrayCoder.of()); + private static final int MAX_BUFFER_SIZE = 1 << 24; + + /** + * Return a transform to write given number of bytes to durable store on every record. + */ + public static <T> ParDo.Bound<T, T> diskBusy(String name, final long bytes) { + return ParDo.named(name + ".DiskBusy") + .of(new DoFn<T, T>() { + @Override + public void processElement(ProcessContext c) { + long remain = bytes; + long start = System.currentTimeMillis(); + long now = start; + while (remain > 0) { + long thisBytes = Math.min(remain, MAX_BUFFER_SIZE); + remain -= thisBytes; + byte[] arr = new byte[(int) thisBytes]; + for (int i = 0; i < thisBytes; i++) { + arr[i] = (byte) now; + } + ValueState<byte[]> state = c.windowingInternals().stateInternals().state( + StateNamespaces.global(), DUMMY_TAG); + state.write(arr); + now = System.currentTimeMillis(); + } + c.output(c.element()); + } + }); + } + + /** + * Return a transform to cast each element to {@link KnownSize}. + */ + private static <T extends KnownSize> ParDo.Bound<T, KnownSize> castToKnownSize( + final String name) { + return ParDo.named(name + ".Forget") + .of(new DoFn<T, KnownSize>() { + @Override + public void processElement(ProcessContext c) { + c.output(c.element()); + } + }); + } + + /** + * A coder for instances of {@code T} cast up to {@link KnownSize}. + * + * @param <T> True type of object. 
+ */ + private static class CastingCoder<T extends KnownSize> extends CustomCoder<KnownSize> { + private final Coder<T> trueCoder; + + public CastingCoder(Coder<T> trueCoder) { + this.trueCoder = trueCoder; + } + + @Override + public void encode(KnownSize value, OutputStream outStream, Context context) + throws CoderException, IOException { + @SuppressWarnings("unchecked") + T typedValue = (T) value; + trueCoder.encode(typedValue, outStream, context); + } + + @Override + public KnownSize decode(InputStream inStream, Context context) + throws CoderException, IOException { + return trueCoder.decode(inStream, context); + } + + @Override + public List<? extends Coder<?>> getComponents() { + return ImmutableList.of(trueCoder); + } + } + + /** + * Return a coder for {@code KnownSize} that are known to be exactly of type {@code T}. + */ + private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) { + return new CastingCoder<>(trueCoder); + } + + /** + * Return {@code elements} as {@code KnownSize}s. + */ + public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize( + final String name, PCollection<T> elements) { + return elements.apply(castToKnownSize(name)).setCoder(makeCastingCoder(elements.getCoder())); + } + + // Do not instantiate. + private NexmarkUtils() { + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java new file mode 100644 index 0000000..4f5304d --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PubsubOptions; +import javax.annotation.Nullable; + +/** + * Command line flags. + */ +public interface Options extends PubsubOptions { + @Description("Which suite to run. 
Default is to use command line arguments for one job.") + @Default.Enum("DEFAULT") + NexmarkSuite getSuite(); + + void setSuite(NexmarkSuite suite); + + @Description("If true, and using the DataflowPipelineRunner, monitor the jobs as they run.") + @Default.Boolean(false) + boolean getMonitorJobs(); + + void setMonitorJobs(boolean monitorJobs); + + @Description("Where the events come from.") + @Nullable + NexmarkUtils.SourceType getSourceType(); + + void setSourceType(NexmarkUtils.SourceType sourceType); + + @Description("Prefix for input files if using avro input") + @Nullable + String getInputPath(); + + void setInputPath(String inputPath); + + @Description("Where results go.") + @Nullable + NexmarkUtils.SinkType getSinkType(); + + void setSinkType(NexmarkUtils.SinkType sinkType); + + @Description("Which mode to run in when source is PUBSUB.") + @Nullable + NexmarkUtils.PubSubMode getPubSubMode(); + + void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode); + + @Description("Which query to run.") + @Nullable + Integer getQuery(); + + void setQuery(Integer query); + + @Description("Prefix for output files if using text output for results or running Query 10.") + @Nullable + String getOutputPath(); + + void setOutputPath(String outputPath); + + @Description("Base name of pubsub topic to publish to in streaming mode.") + @Nullable + @Default.String("nexmark") + String getPubsubTopic(); + + void setPubsubTopic(String pubsubTopic); + + @Description("Base name of pubsub subscription to read from in streaming mode.") + @Nullable + @Default.String("nexmark") + String getPubsubSubscription(); + + void setPubsubSubscription(String pubsubSubscription); + + @Description("Base name of BigQuery table name if using BigQuery output.") + @Nullable + @Default.String("nexmark") + String getBigQueryTable(); + + void setBigQueryTable(String bigQueryTable); + + @Description("Approximate number of events to generate. 
" + + "Zero for effectively unlimited in streaming mode.") + @Nullable + Long getNumEvents(); + + void setNumEvents(Long numEvents); + + @Description("Time in seconds to preload the subscription with data, at the initial input rate " + + "of the pipeline.") + @Nullable + Integer getPreloadSeconds(); + + void setPreloadSeconds(Integer preloadSeconds); + + @Description("Number of unbounded sources to create events.") + @Nullable + Integer getNumEventGenerators(); + + void setNumEventGenerators(Integer numEventGenerators); + + @Description("Shape of event rate curve.") + @Nullable + NexmarkUtils.RateShape getRateShape(); + + void setRateShape(NexmarkUtils.RateShape rateShape); + + @Description("Initial overall event rate (in --rateUnit).") + @Nullable + Integer getFirstEventRate(); + + void setFirstEventRate(Integer firstEventRate); + + @Description("Next overall event rate (in --rateUnit).") + @Nullable + Integer getNextEventRate(); + + void setNextEventRate(Integer nextEventRate); + + @Description("Unit for rates.") + @Nullable + NexmarkUtils.RateUnit getRateUnit(); + + void setRateUnit(NexmarkUtils.RateUnit rateUnit); + + @Description("Overall period of rate shape, in seconds.") + @Nullable + Integer getRatePeriodSec(); + + void setRatePeriodSec(Integer ratePeriodSec); + + @Description("If true, relay events in real time in streaming mode.") + @Nullable + Boolean getIsRateLimited(); + + void setIsRateLimited(Boolean isRateLimited); + + @Description("If true, use wallclock time as event time. 
Otherwise, use a deterministic" + + " time in the past so that multiple runs will see exactly the same event streams" + + " and should thus have exactly the same results.") + @Nullable + Boolean getUseWallclockEventTime(); + + void setUseWallclockEventTime(Boolean useWallclockEventTime); + + @Description("Assert pipeline results match model results.") + @Nullable + boolean getAssertCorrectness(); + + void setAssertCorrectness(boolean assertCorrectness); + + @Description("Log all input events.") + @Nullable + boolean getLogEvents(); + + void setLogEvents(boolean logEvents); + + @Description("Log all query results.") + @Nullable + boolean getLogResults(); + + void setLogResults(boolean logResults); + + @Description("Average size in bytes for a person record.") + @Nullable + Integer getAvgPersonByteSize(); + + void setAvgPersonByteSize(Integer avgPersonByteSize); + + @Description("Average size in bytes for an auction record.") + @Nullable + Integer getAvgAuctionByteSize(); + + void setAvgAuctionByteSize(Integer avgAuctionByteSize); + + @Description("Average size in bytes for a bid record.") + @Nullable + Integer getAvgBidByteSize(); + + void setAvgBidByteSize(Integer avgBidByteSize); + + @Description("Ratio of bids for 'hot' auctions above the background.") + @Nullable + Integer getHotAuctionRatio(); + + void setHotAuctionRatio(Integer hotAuctionRatio); + + @Description("Ratio of auctions for 'hot' sellers above the background.") + @Nullable + Integer getHotSellersRatio(); + + void setHotSellersRatio(Integer hotSellersRatio); + + @Description("Ratio of auctions for 'hot' bidders above the background.") + @Nullable + Integer getHotBiddersRatio(); + + void setHotBiddersRatio(Integer hotBiddersRatio); + + @Description("Window size in seconds.") + @Nullable + Long getWindowSizeSec(); + + void setWindowSizeSec(Long windowSizeSec); + + @Description("Window period in seconds.") + @Nullable + Long getWindowPeriodSec(); + + void setWindowPeriodSec(Long windowPeriodSec); + + 
@Description("If in streaming mode, the holdback for watermark in seconds.") + @Nullable + Long getWatermarkHoldbackSec(); + + void setWatermarkHoldbackSec(Long watermarkHoldbackSec); + + @Description("Roughly how many auctions should be in flight for each generator.") + @Nullable + Integer getNumInFlightAuctions(); + + void setNumInFlightAuctions(Integer numInFlightAuctions); + + + @Description("Maximum number of people to consider as active for placing auctions or bids.") + @Nullable + Integer getNumActivePeople(); + + void setNumActivePeople(Integer numActivePeople); + + @Description("Filename of perf data to append to.") + @Nullable + String getPerfFilename(); + + void setPerfFilename(String perfFilename); + + @Description("Filename of baseline perf data to read from.") + @Nullable + String getBaselineFilename(); + + void setBaselineFilename(String baselineFilename); + + @Description("Filename of summary perf data to append to.") + @Nullable + String getSummaryFilename(); + + void setSummaryFilename(String summaryFilename); + + @Description("Filename for javascript capturing all perf data and any baselines.") + @Nullable + String getJavascriptFilename(); + + void setJavascriptFilename(String javascriptFilename); + + @Description("If true, don't run the actual query. Instead, calculate the distribution " + + "of number of query results per (event time) minute according to the query model.") + @Nullable + boolean getJustModelResultRate(); + + void setJustModelResultRate(boolean justModelResultRate); + + @Description("Coder strategy to use.") + @Nullable + NexmarkUtils.CoderStrategy getCoderStrategy(); + + void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy); + + @Description("Delay, in milliseconds, for each event. 
We will peg one core for this " + + "number of milliseconds to simulate CPU-bound computation.") + @Nullable + Long getCpuDelayMs(); + + void setCpuDelayMs(Long cpuDelayMs); + + @Description("Extra data, in bytes, to save to persistent state for each event. " + + "This will force I/O all the way to durable storage to simulate an " + + "I/O-bound computation.") + @Nullable + Long getDiskBusyBytes(); + + void setDiskBusyBytes(Long diskBusyBytes); + + @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction") + @Nullable + Integer getAuctionSkip(); + + void setAuctionSkip(Integer auctionSkip); + + @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).") + @Nullable + Integer getFanout(); + + void setFanout(Integer fanout); + + @Description("Length of occasional delay to impose on events (in seconds).") + @Nullable + Long getOccasionalDelaySec(); + + void setOccasionalDelaySec(Long occasionalDelaySec); + + @Description("Probability that an event will be delayed by delayS.") + @Nullable + Double getProbDelayedEvent(); + + void setProbDelayedEvent(Double probDelayedEvent); + + @Description("Maximum size of each log file (in events). For Query10 only.") + @Nullable + Integer getMaxLogEvents(); + + void setMaxLogEvents(Integer maxLogEvents); + + @Description("How to derive names of resources.") + @Default.Enum("QUERY_AND_SALT") + NexmarkUtils.ResourceNameMode getResourceNameMode(); + + void setResourceNameMode(NexmarkUtils.ResourceNameMode mode); + + @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.") + @Default.Boolean(true) + boolean getManageResources(); + + void setManageResources(boolean manageResources); + + @Description("If true, use pub/sub publish time instead of event time.") + @Nullable + Boolean getUsePubsubPublishTime(); + + void setUsePubsubPublishTime(Boolean usePubsubPublishTime); + + @Description("Number of events in out-of-order groups. 
1 implies no out-of-order events. " + + "1000 implies every 1000 events per generator are emitted in pseudo-random order.") + @Nullable + Long getOutOfOrderGroupSize(); + + void setOutOfOrderGroupSize(Long outOfOrderGroupSize); + + @Description("If false, do not add the Monitor and Snoop transforms.") + @Nullable + Boolean getDebug(); + + void setDebug(Boolean value); +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java new file mode 100644 index 0000000..6fcf388 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * A person either creating an auction or making a bid. + */ +public class Person implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + public static final Coder<Person> CODER = new AtomicCoder<Person>() { + @Override + public void encode(Person value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.id, outStream, Context.NESTED); + STRING_CODER.encode(value.name, outStream, Context.NESTED); + STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED); + STRING_CODER.encode(value.creditCard, outStream, Context.NESTED); + STRING_CODER.encode(value.city, outStream, Context.NESTED); + STRING_CODER.encode(value.state, outStream, Context.NESTED); + LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); + STRING_CODER.encode(value.extra, outStream, Context.NESTED); + } + + @Override + public Person decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long id = LONG_CODER.decode(inStream, Context.NESTED); + String name = STRING_CODER.decode(inStream, Context.NESTED); + String emailAddress = STRING_CODER.decode(inStream, Context.NESTED); + String creditCard = STRING_CODER.decode(inStream, Context.NESTED); + String city = STRING_CODER.decode(inStream, Context.NESTED); + String state = 
STRING_CODER.decode(inStream, Context.NESTED); + long dateTime = LONG_CODER.decode(inStream, Context.NESTED); + String extra = STRING_CODER.decode(inStream, Context.NESTED); + return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra); + } + }; + + /** Id of person. */ + @JsonProperty + public final long id; // primary key + + /** Extra person properties. */ + @JsonProperty + public final String name; + + @JsonProperty + public final String emailAddress; + + @JsonProperty + public final String creditCard; + + @JsonProperty + public final String city; + + @JsonProperty + public final String state; + + @JsonProperty + public final long dateTime; + + /** Additional arbitrary payload for performance testing. */ + @JsonProperty + public final String extra; + + // For Avro only. + @SuppressWarnings("unused") + private Person() { + id = 0; + name = null; + emailAddress = null; + creditCard = null; + city = null; + state = null; + dateTime = 0; + extra = null; + } + + public Person(long id, String name, String emailAddress, String creditCard, String city, + String state, long dateTime, String extra) { + this.id = id; + this.name = name; + this.emailAddress = emailAddress; + this.creditCard = creditCard; + this.city = city; + this.state = state; + this.dateTime = dateTime; + this.extra = extra; + } + + /** + * Return a copy of person which capture the given annotation. + * (Used for debugging). + */ + public Person withAnnotation(String annotation) { + return new Person(id, name, emailAddress, creditCard, city, state, dateTime, + annotation + ": " + extra); + } + + /** + * Does person have {@code annotation}? (Used for debugging.) + */ + public boolean hasAnnotation(String annotation) { + return extra.startsWith(annotation + ": "); + } + + /** + * Remove {@code annotation} from person. (Used for debugging.) 
+ */ + public Person withoutAnnotation(String annotation) { + if (hasAnnotation(annotation)) { + return new Person(id, name, emailAddress, creditCard, city, state, dateTime, + extra.substring(annotation.length() + 2)); + } else { + return this; + } + } + + @Override + public long sizeInBytes() { + return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1 + + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java new file mode 100644 index 0000000..1255154 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubJsonClient; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Helper for working with pubsub. + */ +public class PubsubHelper implements AutoCloseable { + /** + * Underlying pub/sub client. + */ + private final PubsubClient pubsubClient; + + /** + * Project id. + */ + private final String projectId; + + /** + * Topics we should delete on close. + */ + private final List<PubsubClient.TopicPath> createdTopics; + + /** + * Subscriptions we should delete on close. + */ + private final List<PubsubClient.SubscriptionPath> createdSubscriptions; + + private PubsubHelper(PubsubClient pubsubClient, String projectId) { + this.pubsubClient = pubsubClient; + this.projectId = projectId; + createdTopics = new ArrayList<>(); + createdSubscriptions = new ArrayList<>(); + } + + /** + * Create a helper. + */ + public static PubsubHelper create(PubsubOptions options) { + try { + return new PubsubHelper( + PubsubJsonClient.FACTORY.newClient(null, null, options), + options.getProject()); + } catch (IOException e) { + throw new RuntimeException("Unable to create Pubsub client: ", e); + } + } + + /** + * Create a topic from short name. Delete it if it already exists. Ensure the topic will be + * deleted on cleanup. Return full topic name. 
+ */ + public PubsubClient.TopicPath createTopic(String shortTopic) { + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + try { + if (topicExists(shortTopic)) { + NexmarkUtils.console("attempting to cleanup topic %s", topic); + pubsubClient.deleteTopic(topic); + } + NexmarkUtils.console("create topic %s", topic); + pubsubClient.createTopic(topic); + createdTopics.add(topic); + return topic; + } catch (IOException e) { + throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e); + } + } + + /** + * Create a topic from short name if it does not already exist. The topic will not be + * deleted on cleanup. Return full topic name. + */ + public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) { + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + try { + if (topicExists(shortTopic)) { + NexmarkUtils.console("topic %s already exists", topic); + return topic; + } + NexmarkUtils.console("create topic %s", topic); + pubsubClient.createTopic(topic); + return topic; + } catch (IOException e) { + throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e); + } + } + + /** + * Check a topic corresponding to short name exists, and throw exception if not. The + * topic will not be deleted on cleanup. Return full topic name. + */ + public PubsubClient.TopicPath reuseTopic(String shortTopic) { + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + if (topicExists(shortTopic)) { + NexmarkUtils.console("reusing existing topic %s", topic); + return topic; + } + throw new RuntimeException("topic '" + topic + "' does not already exist"); + } + + /** + * Does topic corresponding to short name exist? 
+ */ + public boolean topicExists(String shortTopic) { + PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId); + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + try { + Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project); + return existingTopics.contains(topic); + } catch (IOException e) { + throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e); + } + } + + /** + * Create subscription from short name. Delete subscription if it already exists. Ensure the + * subscription will be deleted on cleanup. Return full subscription name. + */ + public PubsubClient.SubscriptionPath createSubscription( + String shortTopic, String shortSubscription) { + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + PubsubClient.SubscriptionPath subscription = + PubsubClient.subscriptionPathFromName(projectId, shortSubscription); + try { + if (subscriptionExists(shortTopic, shortSubscription)) { + NexmarkUtils.console("attempting to cleanup subscription %s", subscription); + pubsubClient.deleteSubscription(subscription); + } + NexmarkUtils.console("create subscription %s", subscription); + pubsubClient.createSubscription(topic, subscription, 60); + createdSubscriptions.add(subscription); + } catch (IOException e) { + throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e); + } + return subscription; + } + + /** + * Check a subscription corresponding to short name exists, and throw exception if not. The + * subscription will not be deleted on cleanup. Return full topic name. 
+ */ + public PubsubClient.SubscriptionPath reuseSubscription( + String shortTopic, String shortSubscription) { + PubsubClient.SubscriptionPath subscription = + PubsubClient.subscriptionPathFromName(projectId, shortSubscription); + if (subscriptionExists(shortTopic, shortSubscription)) { + NexmarkUtils.console("reusing existing subscription %s", subscription); + return subscription; + } + throw new RuntimeException("subscription'" + subscription + "' does not already exist"); + } + + /** + * Does subscription corresponding to short name exist? + */ + public boolean subscriptionExists(String shortTopic, String shortSubscription) { + PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId); + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + PubsubClient.SubscriptionPath subscription = + PubsubClient.subscriptionPathFromName(projectId, shortSubscription); + try { + Collection<PubsubClient.SubscriptionPath> existingSubscriptions = + pubsubClient.listSubscriptions(project, topic); + return existingSubscriptions.contains(subscription); + } catch (IOException e) { + throw new RuntimeException("Unable to check Pubsub subscription" + subscription + ": ", e); + } + } + + /** + * Delete all the subscriptions and topics we created. 
+ */ + @Override + public void close() { + for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) { + try { + NexmarkUtils.console("delete subscription %s", subscription); + pubsubClient.deleteSubscription(subscription); + } catch (IOException ex) { + NexmarkUtils.console("could not delete subscription %s", subscription); + } + } + for (PubsubClient.TopicPath topic : createdTopics) { + try { + NexmarkUtils.console("delete topic %s", topic); + pubsubClient.deleteTopic(topic); + } catch (IOException ex) { + NexmarkUtils.console("could not delete topic %s", topic); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java new file mode 100644 index 0000000..ea0d7ca --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum.SumLongFn; +import org.apache.beam.sdk.values.PCollection; + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** + * Query 0: Pass events through unchanged. However, force them to do a round trip through + * serialization so that we measure the impact of the choice of coders. + */ +public class Query0 extends NexmarkQuery { + public Query0(NexmarkConfiguration configuration) { + super(configuration, "Query0"); + } + + private PCollection<Event> applyTyped(PCollection<Event> events) { + final Coder<Event> coder = events.getCoder(); + + return events + + // Force round trip through coder. + .apply( + ParDo.named(name + ".Serialize") + .of(new DoFn<Event, Event>() { + private final Aggregator<Long, Long> bytes = + createAggregator("bytes", new SumLongFn()); + + @Override + public void processElement(ProcessContext c) throws CoderException, IOException { + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + coder.encode(c.element(), outStream, Coder.Context.OUTER); + byte[] byteArray = outStream.toByteArray(); + bytes.addValue((long) byteArray.length); + ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray); + Event event = coder.decode(inStream, Coder.Context.OUTER); + c.output(event); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java 
---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java new file mode 100644 index 0000000..f3ceca2 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.values.TimestampedValue; + +import java.util.Collection; +import java.util.Iterator; + +/** + * A direct implementation of {@link Query0}. + */ +public class Query0Model extends NexmarkQueryModel { + /** + * Simulator for query 0. 
+ */ + private class Simulator extends AbstractSimulator<Event, Event> { + public Simulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + } + + @Override + protected void run() { + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + allDone(); + return; + } + addResult(timestampedEvent); + } + } + + public Query0Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + protected AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValueTimestampOrder(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java new file mode 100644 index 0000000..7e60b9c --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros. + * In CQL syntax: + * + * <pre> + * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime) + * FROM bid [ROWS UNBOUNDED]; + * </pre> + * + * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily + * slowed down. + */ +class Query1 extends NexmarkQuery { + public Query1(NexmarkConfiguration configuration) { + super(configuration, "Query1"); + } + + private PCollection<Bid> applyTyped(PCollection<Event> events) { + return events + // Only want the bid events. + .apply(JUST_BIDS) + + // Map the conversion function over all bids. + .apply( + ParDo.named(name + ".ToEuros") + .of(new DoFn<Bid, Bid>() { + @Override + public void processElement(ProcessContext c) { + Bid bid = c.element(); + c.output(new Bid( + bid.auction, bid.bidder, (bid.price * 89) / 100, bid.dateTime, bid.extra)); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java new file mode 100644 index 0000000..74fb28c --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnWithContext; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum.SumLongFn; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.GcsIOChannelFactory; +import org.apache.beam.sdk.values.KV; +import 
org.apache.beam.sdk.values.PCollection; + +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel; +import com.google.common.base.Preconditions; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; + +
/**
 * Query "10", 'Log to sharded files' (Not in original suite.)
 *
 * <p>Every {@code windowSizeSec}, save all events from the last period into log files spread
 * over {@code maxNumWorkers * NUM_SHARDS_PER_WORKER} shards, then write one index entry per
 * log file for each window.
 *
 * <p>{@link #setMaxNumWorkers} must be called with a positive value before the query is applied:
 * the shard count is derived from it and used as a modulus when assigning events to shards, so
 * the default of zero fails with an {@link ArithmeticException} on the first element.
 */
class Query10 extends NexmarkQuery {
  private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
  private static final int NUM_SHARDS_PER_WORKER = 5;
  private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);

  /**
   * Capture everything we need to know about the records in a single output file.
   */
  private static class OutputFile implements Serializable {
    /** Maximum possible timestamp of records in file. */
    private final Instant maxTimestamp;
    /** Shard within window. */
    private final String shard;
    /** Index of file in all files in shard. */
    private final long index;
    /** Timing of records in this file. */
    private final PaneInfo.Timing timing;
    /** Path to file containing records, or {@literal null} if no output required. */
    @Nullable
    private final String filename;

    public OutputFile(
        Instant maxTimestamp,
        String shard,
        long index,
        PaneInfo.Timing timing,
        @Nullable String filename) {
      this.maxTimestamp = maxTimestamp;
      this.shard = shard;
      this.index = index;
      this.timing = timing;
      this.filename = filename;
    }

    /** Render this file as a single newline-terminated line of the per-window index. */
    @Override
    public String toString() {
      return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename);
    }
  }

  /**
   * GCS uri prefix for all log and 'finished' files. If null they won't be written.
   */
  @Nullable
  private String outputPath;

  /**
   * Maximum number of workers, used to determine log sharding factor.
   */
  private int maxNumWorkers;

  public Query10(NexmarkConfiguration configuration) {
    super(configuration, "Query10");
  }

  public void setOutputPath(@Nullable String outputPath) {
    this.outputPath = outputPath;
  }

  public void setMaxNumWorkers(int maxNumWorkers) {
    this.maxNumWorkers = maxNumWorkers;
  }

  /**
   * Return channel for writing bytes to GCS, with its upload buffer set to
   * {@link #CHANNEL_BUFFER} bytes.
   *
   * @throws IOException if the channel for {@code filename} cannot be created
   */
  private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
      throws IOException {
    WritableByteChannel channel = new GcsIOChannelFactory(options).create(filename, "text/plain");
    Preconditions.checkState(channel instanceof GoogleCloudStorageWriteChannel);
    ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
    return channel;
  }

  /** Return a short string to describe {@code timing}. */
  private String timingToString(PaneInfo.Timing timing) {
    switch (timing) {
      case EARLY:
        return "E";
      case ON_TIME:
        return "O";
      case LATE:
        return "L";
    }
    // Only reached for a timing other than the three handled above; say which one.
    throw new RuntimeException("Unexpected pane timing: " + timing);
  }

  /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */
  private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
    // The random suffix keeps file names distinct even if the same pane is written twice.
    @Nullable String filename =
        outputPath == null
            ? null
            : String.format("%s/LOG-%s-%s-%03d-%s-%x",
                outputPath, window.maxTimestamp(), shard, pane.getIndex(),
                timingToString(pane.getTiming()),
                ThreadLocalRandom.current().nextLong());
    return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(),
        pane.getTiming(), filename);
  }

  /**
   * Return path to which we should write the index for {@code window}, or {@literal null}
   * if no output required.
   */
  @Nullable
  private String indexPathFor(BoundedWindow window) {
    if (outputPath == null) {
      return null;
    }
    return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
  }

  /**
   * Build the query: shard the events, window and group them per shard, check pane timing
   * against the events' "LATE" annotation, upload each pane as a log file, then regroup the
   * resulting {@link OutputFile}s per window and write one index file listing them.
   */
  private PCollection<Done> applyTyped(PCollection<Event> events) {
    final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;

    return events
        .apply(ParDo.named(name + ".ShardEvents")
            .of(new DoFn<Event, KV<String, Event>>() {
              final Aggregator<Long, Long> lateCounter =
                  createAggregator("actuallyLateEvent", new SumLongFn());
              final Aggregator<Long, Long> onTimeCounter =
                  createAggregator("actuallyOnTimeEvent", new SumLongFn());

              @Override
              public void processElement(ProcessContext c) {
                if (c.element().hasAnnotation("LATE")) {
                  lateCounter.addValue(1L);
                  NexmarkUtils.error("Observed late: %s", c.element());
                } else {
                  onTimeCounter.addValue(1L);
                }
                // Widen to long before the modulus so Math.abs cannot see Integer.MIN_VALUE.
                int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
                String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
                c.output(KV.of(shard, c.element()));
              }
            }))
        .apply(Window.<KV<String, Event>>into(
            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
            .named(name + ".WindowEvents")
            .triggering(AfterEach.inOrder(
                Repeatedly
                    .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
                    .orFinally(AfterWatermark.pastEndOfWindow()),
                Repeatedly.forever(
                    AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(LATE_BATCHING_PERIOD)))))
            .discardingFiredPanes()
            // Use a 1 day allowed lateness so that any forgotten hold will stall the
            // pipeline for that period and be very noticeable.
            .withAllowedLateness(Duration.standardDays(1)))
        .apply(GroupByKey.<String, Event>create())
        .apply(
            ParDo.named(name + ".CheckForLateEvents")
                .of(new DoFnWithContext<KV<String, Iterable<Event>>,
                    KV<String, Iterable<Event>>>() {
                  final Aggregator<Long, Long> earlyCounter =
                      createAggregator("earlyShard", new SumLongFn());
                  final Aggregator<Long, Long> onTimeCounter =
                      createAggregator("onTimeShard", new SumLongFn());
                  final Aggregator<Long, Long> lateCounter =
                      createAggregator("lateShard", new SumLongFn());
                  final Aggregator<Long, Long> unexpectedLatePaneCounter =
                      createAggregator("ERROR_unexpectedLatePane", new SumLongFn());
                  final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
                      createAggregator("ERROR_unexpectedOnTimeElement", new SumLongFn());

                  @ProcessElement
                  public void processElement(ProcessContext c, BoundedWindow window) {
                    int numLate = 0;
                    int numOnTime = 0;
                    for (Event event : c.element().getValue()) {
                      if (event.hasAnnotation("LATE")) {
                        numLate++;
                      } else {
                        numOnTime++;
                      }
                    }
                    String shard = c.element().getKey();
                    NexmarkUtils.error(
                        "%s with timestamp %s has %d actually late and %d on-time "
                            + "elements in pane %s for window %s",
                        shard, c.timestamp(), numLate, numOnTime, c.pane(),
                        window.maxTimestamp());
                    if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
                      if (numLate == 0) {
                        NexmarkUtils.error(
                            "ERROR! No late events in late pane for %s", shard);
                        unexpectedLatePaneCounter.addValue(1L);
                      }
                      if (numOnTime > 0) {
                        NexmarkUtils.error(
                            "ERROR! Have %d on-time events in late pane for %s",
                            numOnTime, shard);
                        unexpectedOnTimeElementCounter.addValue(1L);
                      }
                      lateCounter.addValue(1L);
                    } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
                      if (numOnTime + numLate < configuration.maxLogEvents) {
                        NexmarkUtils.error(
                            "ERROR! Only have %d events in early pane for %s",
                            numOnTime + numLate, shard);
                      }
                      earlyCounter.addValue(1L);
                    } else {
                      onTimeCounter.addValue(1L);
                    }
                    c.output(c.element());
                  }
                }))
        .apply(
            ParDo.named(name + ".UploadEvents")
                .of(new DoFnWithContext<KV<String, Iterable<Event>>,
                    KV<Void, OutputFile>>() {
                  final Aggregator<Long, Long> savedFileCounter =
                      createAggregator("savedFile", new SumLongFn());
                  final Aggregator<Long, Long> writtenRecordsCounter =
                      createAggregator("writtenRecords", new SumLongFn());

                  @ProcessElement
                  public void process(ProcessContext c, BoundedWindow window) throws IOException {
                    String shard = c.element().getKey();
                    GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
                    OutputFile outputFile = outputFileFor(window, shard, c.pane());
                    NexmarkUtils.error(
                        "Writing %s with record timestamp %s, window timestamp %s, pane %s",
                        shard, c.timestamp(), window.maxTimestamp(), c.pane());
                    if (outputFile.filename != null) {
                      NexmarkUtils.error("Beginning write to '%s'", outputFile.filename);
                      int n = 0;
                      try (OutputStream output =
                               Channels.newOutputStream(
                                   openWritableGcsFile(options, outputFile.filename))) {
                        for (Event event : c.element().getValue()) {
                          Event.CODER.encode(event, output, Coder.Context.OUTER);
                          writtenRecordsCounter.addValue(1L);
                          if (++n % 10000 == 0) {
                            NexmarkUtils.error("So far written %d records to '%s'", n,
                                outputFile.filename);
                          }
                        }
                      }
                      NexmarkUtils.error("Written all %d records to '%s'", n, outputFile.filename);
                    }
                    savedFileCounter.addValue(1L);
                    // A single null key funnels every file of a window into one index group.
                    c.output(KV.<Void, OutputFile>of(null, outputFile));
                  }
                }))
        // Clear fancy triggering from above.
        .apply(Window.<KV<Void, OutputFile>>into(
            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
            .named(name + ".WindowLogFiles")
            .triggering(AfterWatermark.pastEndOfWindow())
            // We expect no late data here, but we'll assume the worst so we can detect any.
            .withAllowedLateness(Duration.standardDays(1))
            .discardingFiredPanes())
        .apply(GroupByKey.<Void, OutputFile>create())
        .apply(
            ParDo.named(name + ".Index")
                .of(new DoFnWithContext<KV<Void, Iterable<OutputFile>>, Done>() {
                  final Aggregator<Long, Long> unexpectedLateCounter =
                      createAggregator("ERROR_unexpectedLate", new SumLongFn());
                  final Aggregator<Long, Long> unexpectedEarlyCounter =
                      createAggregator("ERROR_unexpectedEarly", new SumLongFn());
                  final Aggregator<Long, Long> unexpectedIndexCounter =
                      createAggregator("ERROR_unexpectedIndex", new SumLongFn());
                  final Aggregator<Long, Long> finalizedCounter =
                      createAggregator("indexed", new SumLongFn());

                  @ProcessElement
                  public void process(ProcessContext c, BoundedWindow window) throws IOException {
                    if (c.pane().getTiming() == Timing.LATE) {
                      unexpectedLateCounter.addValue(1L);
                      NexmarkUtils.error("ERROR! Unexpected LATE pane: %s", c.pane());
                    } else if (c.pane().getTiming() == Timing.EARLY) {
                      unexpectedEarlyCounter.addValue(1L);
                      NexmarkUtils.error("ERROR! Unexpected EARLY pane: %s", c.pane());
                    } else if (c.pane().getTiming() == Timing.ON_TIME
                        && c.pane().getIndex() != 0) {
                      unexpectedIndexCounter.addValue(1L);
                      NexmarkUtils.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
                    } else {
                      GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
                      NexmarkUtils.error(
                          "Index with record timestamp %s, window timestamp %s, pane %s",
                          c.timestamp(), window.maxTimestamp(), c.pane());

                      @Nullable String filename = indexPathFor(window);
                      if (filename != null) {
                        NexmarkUtils.error("Beginning write to '%s'", filename);
                        int n = 0;
                        try (OutputStream output =
                                 Channels.newOutputStream(
                                     openWritableGcsFile(options, filename))) {
                          for (OutputFile outputFile : c.element().getValue()) {
                            // Encode explicitly as UTF-8 so the index bytes do not depend on
                            // the worker JVM's default charset. Fully qualified to avoid
                            // needing a new import.
                            output.write(
                                outputFile.toString()
                                    .getBytes(java.nio.charset.StandardCharsets.UTF_8));
                            n++;
                          }
                        }
                        NexmarkUtils.error("Written all %d lines to '%s'", n, filename);
                      }
                      c.output(
                          new Done("written for timestamp " + window.maxTimestamp()));
                      finalizedCounter.addValue(1L);
                    }
                  }
                }));
  }

  @Override
  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java new file mode 100644 index 0000000..9841421 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.joda.time.Duration; + +/** + * Query "11", 'User sessions' (Not in original suite.) + * + * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap. + * However limit the session to at most {@code maxLogEvents}. Emit the number of + * bids per session. 
+ */
class Query11 extends NexmarkQuery {
  public Query11(NexmarkConfiguration configuration) {
    super(configuration, "Query11");
  }

  /**
   * Key each bid by its bidder, place the keys in session windows that fire every
   * {@code maxLogEvents} elements, count the elements per key and pane, and wrap each count
   * in a {@link BidsPerSession}.
   */
  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
    // Re-key every bid by bidder; the value carries no information.
    DoFn<Bid, KV<Long, Void>> rekeyByBidder =
        new DoFn<Bid, KV<Long, Void>>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(KV.of(c.element().bidder, (Void) null));
          }
        };

    // Turn each (bidder, count) pair into the query's result type.
    DoFn<KV<Long, Long>, BidsPerSession> toResult =
        new DoFn<KV<Long, Long>, BidsPerSession>() {
          @Override
          public void processElement(ProcessContext c) {
            KV<Long, Long> bidderAndCount = c.element();
            c.output(new BidsPerSession(bidderAndCount.getKey(), bidderAndCount.getValue()));
          }
        };

    Duration sessionGap = Duration.standardSeconds(configuration.windowSizeSec);
    Duration allowedLateness = Duration.standardSeconds(configuration.occasionalDelaySec / 2);

    PCollection<KV<Long, Void>> bidders =
        events
            .apply(JUST_BIDS)
            .apply(ParDo.named(name + ".Rekey").of(rekeyByBidder));

    PCollection<KV<Long, Long>> bidsPerBidder =
        bidders
            .apply(
                Window.<KV<Long, Void>>into(Sessions.withGapDuration(sessionGap))
                    .triggering(
                        Repeatedly.forever(
                            AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
                    .discardingFiredPanes()
                    .withAllowedLateness(allowedLateness))
            .apply(Count.<Long, Void>perKey());

    return bidsPerBidder.apply(ParDo.named(name + ".ToResult").of(toResult));
  }

  @Override
  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
    PCollection<BidsPerSession> sessions = applyTyped(events);
    return NexmarkUtils.castToKnownSize(name, sessions);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java new file mode 100644 index 0000000..dd39971 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Query "12", 'Processing time windows' (Not in original suite.) + * <p> + * <p>Group bids by the same user into processing time windows of windowSize. Emit the count + * of bids per window. 
+ */
class Query12 extends NexmarkQuery {
  public Query12(NexmarkConfiguration configuration) {
    super(configuration, "Query12");
  }

  /**
   * Key each bid by its bidder, keep everything in the global window with a repeating
   * processing-time trigger of {@code windowSizeSec} after the first element in a pane, count
   * the elements per key and pane, and wrap each count in a {@link BidsPerSession}.
   */
  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
    // Re-key every bid by bidder; the value carries no information.
    DoFn<Bid, KV<Long, Void>> rekeyByBidder =
        new DoFn<Bid, KV<Long, Void>>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(KV.of(c.element().bidder, (Void) null));
          }
        };

    // Turn each (bidder, count) pair into the query's result type.
    DoFn<KV<Long, Long>, BidsPerSession> toResult =
        new DoFn<KV<Long, Long>, BidsPerSession>() {
          @Override
          public void processElement(ProcessContext c) {
            KV<Long, Long> bidderAndCount = c.element();
            c.output(new BidsPerSession(bidderAndCount.getKey(), bidderAndCount.getValue()));
          }
        };

    Duration firingDelay = Duration.standardSeconds(configuration.windowSizeSec);

    PCollection<KV<Long, Void>> bidders =
        events
            .apply(JUST_BIDS)
            .apply(ParDo.named(name + ".Rekey").of(rekeyByBidder));

    PCollection<KV<Long, Long>> bidsPerBidder =
        bidders
            .apply(
                Window.<KV<Long, Void>>into(new GlobalWindows())
                    .triggering(
                        Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(firingDelay)))
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.ZERO))
            .apply(Count.<Long, Void>perKey());

    return bidsPerBidder.apply(ParDo.named(name + ".ToResult").of(toResult));
  }

  @Override
  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
    PCollection<BidsPerSession> counts = applyTyped(events);
    return NexmarkUtils.castToKnownSize(name, counts);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java new file mode 100644 index 0000000..462d426 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.values.TimestampedValue; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; + +
/**
 * In-memory reference model for {@link Query1}: every bid is re-emitted with its price scaled
 * by 89/100 and all other fields and its timestamp unchanged; non-bid events produce nothing.
 */
public class Query1Model extends NexmarkQueryModel implements Serializable {
  /**
   * Simulator for query 1. Each call to {@link #run} consumes at most one input event.
   */
  private class Simulator extends AbstractSimulator<Event, Bid> {
    public Simulator(NexmarkConfiguration configuration) {
      super(NexmarkUtils.standardEventIterator(configuration));
    }

    @Override
    protected void run() {
      TimestampedValue<Event> next = nextInput();
      if (next == null) {
        // Input exhausted.
        allDone();
        return;
      }

      Bid original = next.getValue().bid;
      if (original == null) {
        // Ignore non-bid events.
        return;
      }

      Bid converted =
          new Bid(
              original.auction,
              original.bidder,
              original.price * 89 / 100,
              original.dateTime,
              original.extra);
      addResult(TimestampedValue.of(converted, next.getTimestamp()));
    }
  }

  public Query1Model(NexmarkConfiguration configuration) {
    super(configuration);
  }

  /** Create a fresh simulator over the standard event stream for this model's configuration. */
  @Override
  public AbstractSimulator<?, ?> simulator() {
    return new Simulator(configuration);
  }

  /** Render the results via {@code toValueTimestampOrder}. */
  @Override
  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
    return toValueTimestampOrder(itr);
  }
}
