http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java new file mode 100644 index 0000000..fa1ef16 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java @@ -0,0 +1,674 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.hash.Hashing; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.AuctionBid; +import org.apache.beam.sdk.nexmark.model.AuctionCount; +import org.apache.beam.sdk.nexmark.model.AuctionPrice; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.BidsPerSession; +import org.apache.beam.sdk.nexmark.model.CategoryPrice; +import org.apache.beam.sdk.nexmark.model.Done; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.IdNameReserve; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.nexmark.model.NameCityStateId; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.nexmark.model.SellerPrice; +import org.apache.beam.sdk.nexmark.sources.BoundedEventSource; +import org.apache.beam.sdk.nexmark.sources.Generator; +import org.apache.beam.sdk.nexmark.sources.GeneratorConfig; +import org.apache.beam.sdk.nexmark.sources.UnboundedEventSource; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Odd's 'n Ends used throughout queries and driver. + */ +public class NexmarkUtils { + private static final Logger LOG = LoggerFactory.getLogger(NexmarkUtils.class); + + /** + * Mapper for (de)serializing JSON. + */ + public static final ObjectMapper MAPPER = new ObjectMapper(); + + /** + * Possible sources for events. + */ + public enum SourceType { + /** + * Produce events directly. + */ + DIRECT, + /** + * Read events from an Avro file. + */ + AVRO, + /** + * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline. + */ + PUBSUB + } + + /** + * Possible sinks for query results. + */ + public enum SinkType { + /** + * Discard all results. + */ + COUNT_ONLY, + /** + * Discard all results after converting them to strings. + */ + DEVNULL, + /** + * Write to a PubSub topic. It will be drained by this pipeline. + */ + PUBSUB, + /** + * Write to a text file. Only works in batch mode. + */ + TEXT, + /** + * Write raw Events to Avro. Only works in batch mode. + */ + AVRO, + /** + * Write raw Events to BigQuery. + */ + BIGQUERY, + } + + /** + * Pub/sub mode to run in. + */ + public enum PubSubMode { + /** + * Publish events to pub/sub, but don't run the query. + */ + PUBLISH_ONLY, + /** + * Consume events from pub/sub and run the query, but don't publish. + */ + SUBSCRIBE_ONLY, + /** + * Both publish and consume, but as separate jobs. + */ + COMBINED + } + + /** + * Coder strategies. + */ + public enum CoderStrategy { + /** + * Hand-written. + */ + HAND, + /** + * Avro. + */ + AVRO, + /** + * Java serialization. + */ + JAVA + } + + /** + * How to determine resource names. + */ + public enum ResourceNameMode { + /** Names are used as provided. */ + VERBATIM, + /** Names are suffixed with the query being run. */ + QUERY, + /** Names are suffixed with the query being run and a random number. */ + QUERY_AND_SALT + } + + /** + * Units for rates. + */ + public enum RateUnit { + PER_SECOND(1_000_000L), + PER_MINUTE(60_000_000L); + + RateUnit(long usPerUnit) { + this.usPerUnit = usPerUnit; + } + + /** + * Number of microseconds per unit. + */ + private final long usPerUnit; + + /** + * Number of microseconds between events at given rate. + */ + public long rateToPeriodUs(long rate) { + return (usPerUnit + rate / 2) / rate; + } + } + + /** + * Shape of event rate. + */ + public enum RateShape { + SQUARE, + SINE; + + /** + * Number of steps used to approximate sine wave. + */ + private static final int N = 10; + + /** + * Return inter-event delay, in microseconds, for each generator + * to follow in order to achieve {@code rate} at {@code unit} using {@code numGenerators}. + */ + public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) { + return unit.rateToPeriodUs(rate) * numGenerators; + } + + /** + * Return array of successive inter-event delays, in microseconds, for each generator + * to follow in order to achieve this shape with {@code firstRate/nextRate} at + * {@code unit} using {@code numGenerators}. + */ + public long[] interEventDelayUs( + int firstRate, int nextRate, RateUnit unit, int numGenerators) { + if (firstRate == nextRate) { + long[] interEventDelayUs = new long[1]; + interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators; + return interEventDelayUs; + } + + switch (this) { + case SQUARE: { + long[] interEventDelayUs = new long[2]; + interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators; + interEventDelayUs[1] = unit.rateToPeriodUs(nextRate) * numGenerators; + return interEventDelayUs; + } + case SINE: { + double mid = (firstRate + nextRate) / 2.0; + double amp = (firstRate - nextRate) / 2.0; // may be -ve + long[] interEventDelayUs = new long[N]; + for (int i = 0; i < N; i++) { + double r = (2.0 * Math.PI * i) / N; + double rate = mid + amp * Math.cos(r); + interEventDelayUs[i] = unit.rateToPeriodUs(Math.round(rate)) * numGenerators; + } + return interEventDelayUs; + } + } + throw new RuntimeException(); // switch should be exhaustive + } + + /** + * Return delay between steps, in seconds, for result of {@link #interEventDelayUs}, so + * as to cycle through the entire sequence every {@code ratePeriodSec}. + */ + public int stepLengthSec(int ratePeriodSec) { + int n = 0; + switch (this) { + case SQUARE: + n = 2; + break; + case SINE: + n = N; + break; + } + return (ratePeriodSec + n - 1) / n; + } + } + + /** + * Set to true to capture all info messages. The logging level flags don't currently work. + */ + private static final boolean LOG_INFO = false; + + /** + * Set to true to capture all error messages. The logging level flags don't currently work. + */ + private static final boolean LOG_ERROR = true; + + /** + * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results + * in real-time with: tail -f /var/log/dataflow/streaming-harness/harness-stdout.log + */ + private static final boolean LOG_TO_CONSOLE = false; + + /** + * Log info message. + */ + public static void info(String format, Object... args) { + if (LOG_INFO) { + LOG.info(String.format(format, args)); + if (LOG_TO_CONSOLE) { + System.out.println(String.format(format, args)); + } + } + } + + /** + * Log message to console. For client side only. + */ + public static void console(String format, Object... args) { + System.out.printf("%s %s%n", Instant.now(), String.format(format, args)); + } + + /** + * Label to use for timestamps on pub/sub messages. + */ + public static final String PUBSUB_TIMESTAMP = "timestamp"; + + /** + * Label to use for windmill ids on pub/sub messages. + */ + public static final String PUBSUB_ID = "id"; + + /** + * All events will be given a timestamp relative to this time (ms since epoch). + */ + private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis(); + + /** + * Instants guaranteed to be strictly before and after all event timestamps, and which won't + * be subject to underflow/overflow. + */ + public static final Instant BEGINNING_OF_TIME = new Instant(0).plus(Duration.standardDays(365)); + public static final Instant END_OF_TIME = + BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(365)); + + /** + * Setup pipeline with codes and some other options. + */ + public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) { + CoderRegistry registry = p.getCoderRegistry(); + switch (coderStrategy) { + case HAND: + registry.registerCoderForClass(Auction.class, Auction.CODER); + registry.registerCoderForClass(AuctionBid.class, AuctionBid.CODER); + registry.registerCoderForClass(AuctionCount.class, AuctionCount.CODER); + registry.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER); + registry.registerCoderForClass(Bid.class, Bid.CODER); + registry.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER); + registry.registerCoderForClass(Event.class, Event.CODER); + registry.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER); + registry.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER); + registry.registerCoderForClass(Person.class, Person.CODER); + registry.registerCoderForClass(SellerPrice.class, SellerPrice.CODER); + registry.registerCoderForClass(Done.class, Done.CODER); + registry.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER); + break; + case AVRO: + registry.registerCoderProvider(AvroCoder.getCoderProvider()); + break; + case JAVA: + registry.registerCoderProvider(SerializableCoder.getCoderProvider()); + break; + } + } + + /** + * Return a generator config to match the given {@code options}. + */ + private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) { + return new GeneratorConfig(configuration, + configuration.useWallclockEventTime ? System.currentTimeMillis() + : BASE_TIME, 0, + configuration.numEvents, 0); + } + + /** + * Return an iterator of events using the 'standard' generator config. + */ + public static Iterator<TimestampedValue<Event>> standardEventIterator( + NexmarkConfiguration configuration) { + return new Generator(standardGeneratorConfig(configuration)); + } + + /** + * Return a transform which yields a finite number of synthesized events generated + * as a batch. + */ + public static PTransform<PBegin, PCollection<Event>> batchEventsSource( + NexmarkConfiguration configuration) { + return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration), + configuration.numEventGenerators)); + } + + /** + * Return a transform which yields a finite number of synthesized events generated + * on-the-fly in real time. + */ + public static PTransform<PBegin, PCollection<Event>> streamEventsSource( + NexmarkConfiguration configuration) { + return Read.from(new UnboundedEventSource(NexmarkUtils.standardGeneratorConfig(configuration), + configuration.numEventGenerators, + configuration.watermarkHoldbackSec, + configuration.isRateLimited)); + } + + /** + * Return a transform to pass-through events, but count them as they go by. + */ + public static ParDo.SingleOutput<Event, Event> snoop(final String name) { + return ParDo.of(new DoFn<Event, Event>() { + final Counter eventCounter = Metrics.counter(name, "events"); + final Counter newPersonCounter = Metrics.counter(name, "newPersons"); + final Counter newAuctionCounter = Metrics.counter(name, "newAuctions"); + final Counter bidCounter = Metrics.counter(name, "bids"); + final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream"); + + @ProcessElement + public void processElement(ProcessContext c) { + eventCounter.inc(); + if (c.element().newPerson != null) { + newPersonCounter.inc(); + } else if (c.element().newAuction != null) { + newAuctionCounter.inc(); + } else if (c.element().bid != null) { + bidCounter.inc(); + } else { + endOfStreamCounter.inc(); + } + info("%s snooping element %s", name, c.element()); + c.output(c.element()); + } + }); + } + + /** + * Return a transform to count and discard each element. + */ + public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) { + return ParDo.of(new DoFn<T, Void>() { + final Counter discardedCounterMetric = Metrics.counter(name, "discarded"); + + @ProcessElement + public void processElement(ProcessContext c) { + discardedCounterMetric.inc(); + } + }); + } + + /** + * Return a transform to log each element, passing it through unchanged. + */ + public static <T> ParDo.SingleOutput<T, T> log(final String name) { + return ParDo.of(new DoFn<T, T>() { + @ProcessElement + public void processElement(ProcessContext c) { + LOG.info("%s: %s", name, c.element()); + c.output(c.element()); + } + }); + } + + /** + * Return a transform to format each element as a string. + */ + public static <T> ParDo.SingleOutput<T, String> format(final String name) { + return ParDo.of(new DoFn<T, String>() { + final Counter recordCounterMetric = Metrics.counter(name, "records"); + + @ProcessElement + public void processElement(ProcessContext c) { + recordCounterMetric.inc(); + c.output(c.element().toString()); + } + }); + } + + /** + * Return a transform to make explicit the timestamp of each element. + */ + public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) { + return ParDo.of(new DoFn<T, TimestampedValue<T>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(TimestampedValue.of(c.element(), c.timestamp())); + } + }); + } + + /** + * Return a transform to reduce a stream to a single, order-invariant long hash. + */ + public static <T> PTransform<PCollection<T>, PCollection<Long>> hash( + final long numEvents, String name) { + return new PTransform<PCollection<T>, PCollection<Long>>(name) { + @Override + public PCollection<Long> expand(PCollection<T> input) { + return input.apply(Window.<T>into(new GlobalWindows()) + .triggering(AfterPane.elementCountAtLeast((int) numEvents)) + .withAllowedLateness(Duration.standardDays(1)) + .discardingFiredPanes()) + + .apply(name + ".Hash", ParDo.of(new DoFn<T, Long>() { + @ProcessElement + public void processElement(ProcessContext c) { + long hash = + Hashing.murmur3_128() + .newHasher() + .putLong(c.timestamp().getMillis()) + .putString(c.element().toString(), StandardCharsets.UTF_8) + .hash() + .asLong(); + c.output(hash); + } + })) + + .apply(Combine.globally(new Combine.BinaryCombineFn<Long>() { + @Override + public Long apply(Long left, Long right) { + return left ^ right; + } + })); + } + }; + } + + private static final long MASK = (1L << 16) - 1L; + private static final long HASH = 0x243F6A8885A308D3L; + private static final long INIT_PLAINTEXT = 50000L; + + /** + * Return a transform to keep the CPU busy for given milliseconds on every record. + */ + public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) { + return ParDo.of(new DoFn<T, T>() { + @ProcessElement + public void processElement(ProcessContext c) { + long now = System.currentTimeMillis(); + long end = now + delayMs; + while (now < end) { + // Find plaintext which hashes to HASH in lowest MASK bits. + // Values chosen to roughly take 1ms on typical workstation. + long p = INIT_PLAINTEXT; + while (true) { + long t = Hashing.murmur3_128().hashLong(p).asLong(); + if ((t & MASK) == (HASH & MASK)) { + break; + } + p++; + } + now = System.currentTimeMillis(); + } + c.output(c.element()); + } + }); + } + + private static final int MAX_BUFFER_SIZE = 1 << 24; + + private static class DiskBusyTransform<T> extends PTransform<PCollection<T>, PCollection<T>>{ + + private long bytes; + + private DiskBusyTransform(long bytes) { + this.bytes = bytes; + } + + @Override public PCollection<T> expand(PCollection<T> input) { + // Add dummy key to be able to use State API + PCollection<KV<Integer, T>> kvCollection = input + .apply("diskBusy.keyElements", ParDo.of(new DoFn<T, KV<Integer, T>>() { + + @ProcessElement public void processElement(ProcessContext context) { + context.output(KV.of(0, context.element())); + } + })); + // Apply actual transform that generates disk IO using state API + PCollection<T> output = kvCollection + .apply("diskBusy.generateIO", ParDo.of(new DoFn<KV<Integer, T>, T>() { + + private static final String DISK_BUSY = "diskBusy"; + + @StateId(DISK_BUSY) private final StateSpec<ValueState<byte[]>> spec = StateSpecs + .value(ByteArrayCoder.of()); + + @ProcessElement public void processElement(ProcessContext c, + @StateId(DISK_BUSY) ValueState<byte[]> state) { + long remain = bytes; + long now = System.currentTimeMillis(); + while (remain > 0) { + long thisBytes = Math.min(remain, MAX_BUFFER_SIZE); + remain -= thisBytes; + byte[] arr = new byte[(int) thisBytes]; + for (int i = 0; i < thisBytes; i++) { + arr[i] = (byte) now; + } + state.write(arr); + now = System.currentTimeMillis(); + } + c.output(c.element().getValue()); + } + })); + return output; + } + } + + + /** + * Return a transform to write given number of bytes to durable store on every record. + */ + public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(final long bytes) { + return new DiskBusyTransform<>(bytes); + } + + /** + * Return a transform to cast each element to {@link KnownSize}. + */ + private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() { + return ParDo.of(new DoFn<T, KnownSize>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element()); + } + }); + } + + /** + * A coder for instances of {@code T} cast up to {@link KnownSize}. + * + * @param <T> True type of object. + */ + private static class CastingCoder<T extends KnownSize> extends CustomCoder<KnownSize> { + private final Coder<T> trueCoder; + + public CastingCoder(Coder<T> trueCoder) { + this.trueCoder = trueCoder; + } + + @Override + public void encode(KnownSize value, OutputStream outStream) + throws CoderException, IOException { + @SuppressWarnings("unchecked") + T typedValue = (T) value; + trueCoder.encode(typedValue, outStream); + } + + @Override + public KnownSize decode(InputStream inStream) + throws CoderException, IOException { + return trueCoder.decode(inStream); + } + } + + /** + * Return a coder for {@code KnownSize} that are known to be exactly of type {@code T}. + */ + private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) { + return new CastingCoder<>(trueCoder); + } + + /** + * Return {@code elements} as {@code KnownSize}s. + */ + public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize( + final String name, PCollection<T> elements) { + return elements.apply(name + ".Forget", castToKnownSize()) + .setCoder(makeCastingCoder(elements.getCoder())); + } + + // Do not instantiate. + private NexmarkUtils() { + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java new file mode 100644 index 0000000..6a37ade --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * An auction submitted by a person. + */ +public class Auction implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<Auction> CODER = new CustomCoder<Auction>() { + @Override + public void encode(Auction value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.id, outStream); + STRING_CODER.encode(value.itemName, outStream); + STRING_CODER.encode(value.description, outStream); + LONG_CODER.encode(value.initialBid, outStream); + LONG_CODER.encode(value.reserve, outStream); + LONG_CODER.encode(value.dateTime, outStream); + LONG_CODER.encode(value.expires, outStream); + LONG_CODER.encode(value.seller, outStream); + LONG_CODER.encode(value.category, outStream); + STRING_CODER.encode(value.extra, outStream); + } + + @Override + public Auction decode( + InputStream inStream) + throws CoderException, IOException { + long id = LONG_CODER.decode(inStream); + String itemName = STRING_CODER.decode(inStream); + String description = STRING_CODER.decode(inStream); + long initialBid = LONG_CODER.decode(inStream); + long reserve = LONG_CODER.decode(inStream); + long dateTime = LONG_CODER.decode(inStream); + long expires = LONG_CODER.decode(inStream); + long seller = LONG_CODER.decode(inStream); + long category = LONG_CODER.decode(inStream); + String extra = STRING_CODER.decode(inStream); + return new Auction( + id, itemName, description, initialBid, reserve, dateTime, expires, seller, category, + extra); + } + }; + + + /** Id of auction. */ + @JsonProperty + public final long id; // primary key + + /** Extra auction properties. */ + @JsonProperty + private final String itemName; + + @JsonProperty + private final String description; + + /** Initial bid price, in cents. */ + @JsonProperty + private final long initialBid; + + /** Reserve price, in cents. */ + @JsonProperty + public final long reserve; + + @JsonProperty + public final long dateTime; + + /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */ + @JsonProperty + public final long expires; + + /** Id of person who instigated auction. */ + @JsonProperty + public final long seller; // foreign key: Person.id + + /** Id of category auction is listed under. */ + @JsonProperty + public final long category; // foreign key: Category.id + + /** Additional arbitrary payload for performance testing. */ + @JsonProperty + private final String extra; + + + // For Avro only. + @SuppressWarnings("unused") + private Auction() { + id = 0; + itemName = null; + description = null; + initialBid = 0; + reserve = 0; + dateTime = 0; + expires = 0; + seller = 0; + category = 0; + extra = null; + } + + public Auction(long id, String itemName, String description, long initialBid, long reserve, + long dateTime, long expires, long seller, long category, String extra) { + this.id = id; + this.itemName = itemName; + this.description = description; + this.initialBid = initialBid; + this.reserve = reserve; + this.dateTime = dateTime; + this.expires = expires; + this.seller = seller; + this.category = category; + this.extra = extra; + } + + /** + * Return a copy of auction which capture the given annotation. + * (Used for debugging). + */ + public Auction withAnnotation(String annotation) { + return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, + category, annotation + ": " + extra); + } + + /** + * Does auction have {@code annotation}? (Used for debugging.) + */ + public boolean hasAnnotation(String annotation) { + return extra.startsWith(annotation + ": "); + } + + /** + * Remove {@code annotation} from auction. (Used for debugging.) + */ + public Auction withoutAnnotation(String annotation) { + if (hasAnnotation(annotation)) { + return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, + category, extra.substring(annotation.length() + 2)); + } else { + return this; + } + } + + @Override + public long sizeInBytes() { + return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8 + + extra.length() + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java new file mode 100644 index 0000000..cb1aac5 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.queries.WinningBids; + +/** + * Result of {@link WinningBids} transform. + */ +public class AuctionBid implements KnownSize, Serializable { + public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() { + @Override + public void encode(AuctionBid value, OutputStream outStream) + throws CoderException, IOException { + Auction.CODER.encode(value.auction, outStream); + Bid.CODER.encode(value.bid, outStream); + } + + @Override + public AuctionBid decode( + InputStream inStream) + throws CoderException, IOException { + Auction auction = Auction.CODER.decode(inStream); + Bid bid = Bid.CODER.decode(inStream); + return new AuctionBid(auction, bid); + } + }; + + @JsonProperty + public final Auction auction; + + @JsonProperty + public final Bid bid; + + // For Avro only. + @SuppressWarnings("unused") + private AuctionBid() { + auction = null; + bid = null; + } + + public AuctionBid(Auction auction, Bid bid) { + this.auction = auction; + this.bid = bid; + } + + @Override + public long sizeInBytes() { + return auction.sizeInBytes() + bid.sizeInBytes(); + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java new file mode 100644 index 0000000..4d15d25 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * Result of Query5. + */ +public class AuctionCount implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() { + @Override + public void encode(AuctionCount value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream); + LONG_CODER.encode(value.count, outStream); + } + + @Override + public AuctionCount decode(InputStream inStream) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream); + long count = LONG_CODER.decode(inStream); + return new AuctionCount(auction, count); + } + }; + + @JsonProperty private final long auction; + + @JsonProperty private final long count; + + // For Avro only. + @SuppressWarnings("unused") + private AuctionCount() { + auction = 0; + count = 0; + } + + public AuctionCount(long auction, long count) { + this.auction = auction; + this.count = count; + } + + @Override + public long sizeInBytes() { + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java new file mode 100644 index 0000000..f4fe881 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * Result of Query2. + */ +public class AuctionPrice implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() { + @Override + public void encode(AuctionPrice value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream); + LONG_CODER.encode(value.price, outStream); + } + + @Override + public AuctionPrice decode( + InputStream inStream) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream); + long price = LONG_CODER.decode(inStream); + return new AuctionPrice(auction, price); + } + }; + + @JsonProperty + private final long auction; + + /** Price in cents. */ + @JsonProperty + private final long price; + + // For Avro only. + @SuppressWarnings("unused") + private AuctionPrice() { + auction = 0; + price = 0; + } + + public AuctionPrice(long auction, long price) { + this.auction = auction; + this.price = price; + } + + @Override + public long sizeInBytes() { + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java new file mode 100644 index 0000000..b465e62 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Comparator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * A bid for an item on auction. + */ +public class Bid implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<Bid> CODER = new CustomCoder<Bid>() { + @Override + public void encode(Bid value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream); + LONG_CODER.encode(value.bidder, outStream); + LONG_CODER.encode(value.price, outStream); + LONG_CODER.encode(value.dateTime, outStream); + STRING_CODER.encode(value.extra, outStream); + } + + @Override + public Bid decode( + InputStream inStream) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream); + long bidder = LONG_CODER.decode(inStream); + long price = LONG_CODER.decode(inStream); + long dateTime = LONG_CODER.decode(inStream); + String extra = STRING_CODER.decode(inStream); + return new Bid(auction, bidder, price, dateTime, extra); + } + + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + /** + * Comparator to order bids by ascending price then descending time + * (for finding winning bids). + */ + public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() { + @Override + public int compare(Bid left, Bid right) { + int i = Double.compare(left.price, right.price); + if (i != 0) { + return i; + } + return Long.compare(right.dateTime, left.dateTime); + } + }; + + /** + * Comparator to order bids by ascending time then ascending price. + * (for finding most recent bids). + */ + public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() { + @Override + public int compare(Bid left, Bid right) { + int i = Long.compare(left.dateTime, right.dateTime); + if (i != 0) { + return i; + } + return Double.compare(left.price, right.price); + } + }; + + /** Id of auction this bid is for. */ + @JsonProperty + public final long auction; // foreign key: Auction.id + + /** Id of person bidding in auction. */ + @JsonProperty + public final long bidder; // foreign key: Person.id + + /** Price of bid, in cents. */ + @JsonProperty + public final long price; + + /** + * Instant at which bid was made (ms since epoch). + * NOTE: This may be earlier than the system's event time. + */ + @JsonProperty + public final long dateTime; + + /** Additional arbitrary payload for performance testing. */ + @JsonProperty + public final String extra; + + // For Avro only. + @SuppressWarnings("unused") + private Bid() { + auction = 0; + bidder = 0; + price = 0; + dateTime = 0; + extra = null; + } + + public Bid(long auction, long bidder, long price, long dateTime, String extra) { + this.auction = auction; + this.bidder = bidder; + this.price = price; + this.dateTime = dateTime; + this.extra = extra; + } + + /** + * Return a copy of bid which capture the given annotation. + * (Used for debugging). + */ + public Bid withAnnotation(String annotation) { + return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra); + } + + /** + * Does bid have {@code annotation}? (Used for debugging.) + */ + public boolean hasAnnotation(String annotation) { + return extra.startsWith(annotation + ": "); + } + + /** + * Remove {@code annotation} from bid. (Used for debugging.) + */ + public Bid withoutAnnotation(String annotation) { + if (hasAnnotation(annotation)) { + return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2)); + } else { + return this; + } + } + + @Override + public long sizeInBytes() { + return 8 + 8 + 8 + 8 + extra.length() + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java new file mode 100644 index 0000000..84e23e7 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * Result of query 11. + */ +public class BidsPerSession implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() { + @Override + public void encode(BidsPerSession value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.personId, outStream); + LONG_CODER.encode(value.bidsPerSession, outStream); + } + + @Override + public BidsPerSession decode( + InputStream inStream) + throws CoderException, IOException { + long personId = LONG_CODER.decode(inStream); + long bidsPerSession = LONG_CODER.decode(inStream); + return new BidsPerSession(personId, bidsPerSession); + } + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + @JsonProperty + private final long personId; + + @JsonProperty + private final long bidsPerSession; + + public BidsPerSession() { + personId = 0; + bidsPerSession = 0; + } + + public BidsPerSession(long personId, long bidsPerSession) { + this.personId = personId; + this.bidsPerSession = bidsPerSession; + } + + @Override + public long sizeInBytes() { + // Two longs. + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java new file mode 100644 index 0000000..3b33635 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * Result of Query4. + */ +public class CategoryPrice implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + + public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() { + @Override + public void encode(CategoryPrice value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.category, outStream); + LONG_CODER.encode(value.price, outStream); + INT_CODER.encode(value.isLast ? 1 : 0, outStream); + } + + @Override + public CategoryPrice decode(InputStream inStream) + throws CoderException, IOException { + long category = LONG_CODER.decode(inStream); + long price = LONG_CODER.decode(inStream); + boolean isLast = INT_CODER.decode(inStream) != 0; + return new CategoryPrice(category, price, isLast); + } + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + @JsonProperty + public final long category; + + /** Price in cents. */ + @JsonProperty + public final long price; + + @JsonProperty + public final boolean isLast; + + // For Avro only. + @SuppressWarnings("unused") + private CategoryPrice() { + category = 0; + price = 0; + isLast = false; + } + + public CategoryPrice(long category, long price, boolean isLast) { + this.category = category; + this.price = price; + this.isLast = isLast; + } + + @Override + public long sizeInBytes() { + return 8 + 8 + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java new file mode 100644 index 0000000..e285041 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * Result of query 10. + */ +public class Done implements KnownSize, Serializable { + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<Done> CODER = new CustomCoder<Done>() { + @Override + public void encode(Done value, OutputStream outStream) + throws CoderException, IOException { + STRING_CODER.encode(value.message, outStream); + } + + @Override + public Done decode(InputStream inStream) + throws CoderException, IOException { + String message = STRING_CODER.decode(inStream); + return new Done(message); + } + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + @JsonProperty + private final String message; + + // For Avro only. + @SuppressWarnings("unused") + public Done() { + message = null; + } + + public Done(String message) { + this.message = message; + } + + @Override + public long sizeInBytes() { + return message.length(); + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java new file mode 100644 index 0000000..880cfe4 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarIntCoder; + +/** + * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a + * {@link Bid}. + */ +public class Event implements KnownSize, Serializable { + private enum Tag { + PERSON(0), + AUCTION(1), + BID(2); + + private int value = -1; + + Tag(int value){ + this.value = value; + } + } + private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + + public static final Coder<Event> CODER = + new CustomCoder<Event>() { + @Override + public void encode(Event value, OutputStream outStream) throws IOException { + if (value.newPerson != null) { + INT_CODER.encode(Tag.PERSON.value, outStream); + Person.CODER.encode(value.newPerson, outStream); + } else if (value.newAuction != null) { + INT_CODER.encode(Tag.AUCTION.value, outStream); + Auction.CODER.encode(value.newAuction, outStream); + } else if (value.bid != null) { + INT_CODER.encode(Tag.BID.value, outStream); + Bid.CODER.encode(value.bid, outStream); + } else { + throw new RuntimeException("invalid event"); + } + } + + @Override + public Event decode(InputStream inStream) throws IOException { + int tag = INT_CODER.decode(inStream); + if (tag == Tag.PERSON.value) { + Person person = Person.CODER.decode(inStream); + return new Event(person); + } else if (tag == Tag.AUCTION.value) { + Auction auction = Auction.CODER.decode(inStream); + return new Event(auction); + } else if (tag == Tag.BID.value) { + Bid bid = Bid.CODER.decode(inStream); + return new Event(bid); + } else { + throw new RuntimeException("invalid event encoding"); + } + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} + }; + + @Nullable + @org.apache.avro.reflect.Nullable + public final Person newPerson; + + @Nullable + @org.apache.avro.reflect.Nullable + public final Auction newAuction; + + @Nullable + @org.apache.avro.reflect.Nullable + public final Bid bid; + + // For Avro only. + @SuppressWarnings("unused") + private Event() { + newPerson = null; + newAuction = null; + bid = null; + } + + public Event(Person newPerson) { + this.newPerson = newPerson; + newAuction = null; + bid = null; + } + + public Event(Auction newAuction) { + newPerson = null; + this.newAuction = newAuction; + bid = null; + } + + public Event(Bid bid) { + newPerson = null; + newAuction = null; + this.bid = bid; + } + + /** Return a copy of event which captures {@code annotation}. (Used for debugging). */ + public Event withAnnotation(String annotation) { + if (newPerson != null) { + return new Event(newPerson.withAnnotation(annotation)); + } else if (newAuction != null) { + return new Event(newAuction.withAnnotation(annotation)); + } else { + return new Event(bid.withAnnotation(annotation)); + } + } + + /** Does event have {@code annotation}? (Used for debugging.) */ + public boolean hasAnnotation(String annotation) { + if (newPerson != null) { + return newPerson.hasAnnotation(annotation); + } else if (newAuction != null) { + return newAuction.hasAnnotation(annotation); + } else { + return bid.hasAnnotation(annotation); + } + } + + @Override + public long sizeInBytes() { + if (newPerson != null) { + return 1 + newPerson.sizeInBytes(); + } else if (newAuction != null) { + return 1 + newAuction.sizeInBytes(); + } else if (bid != null) { + return 1 + bid.sizeInBytes(); + } else { + throw new RuntimeException("invalid event"); + } + } + + @Override + public String toString() { + if (newPerson != null) { + return newPerson.toString(); + } else if (newAuction != null) { + return newAuction.toString(); + } else if (bid != null) { + return bid.toString(); + } else { + throw new RuntimeException("invalid event"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java new file mode 100644 index 0000000..0519f5d --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * Result type of Query8. + */ +public class IdNameReserve implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() { + @Override + public void encode(IdNameReserve value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.id, outStream); + STRING_CODER.encode(value.name, outStream); + LONG_CODER.encode(value.reserve, outStream); + } + + @Override + public IdNameReserve decode( + InputStream inStream) + throws CoderException, IOException { + long id = LONG_CODER.decode(inStream); + String name = STRING_CODER.decode(inStream); + long reserve = LONG_CODER.decode(inStream); + return new IdNameReserve(id, name, reserve); + } + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + @JsonProperty + private final long id; + + @JsonProperty + private final String name; + + /** Reserve price in cents. */ + @JsonProperty + private final long reserve; + + // For Avro only. + @SuppressWarnings("unused") + private IdNameReserve() { + id = 0; + name = null; + reserve = 0; + } + + public IdNameReserve(long id, String name, long reserve) { + this.id = id; + this.name = name; + this.reserve = reserve; + } + + @Override + public long sizeInBytes() { + return 8 + name.length() + 1 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java new file mode 100644 index 0000000..45af3fc --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +/** + * Interface for elements which can quickly estimate their encoded byte size. + */ +public interface KnownSize { + long sizeInBytes(); +} + http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java new file mode 100644 index 0000000..55fca62 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * Result of Query3. + */ +public class NameCityStateId implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() { + @Override + public void encode(NameCityStateId value, OutputStream outStream) + throws CoderException, IOException { + STRING_CODER.encode(value.name, outStream); + STRING_CODER.encode(value.city, outStream); + STRING_CODER.encode(value.state, outStream); + LONG_CODER.encode(value.id, outStream); + } + + @Override + public NameCityStateId decode(InputStream inStream) + throws CoderException, IOException { + String name = STRING_CODER.decode(inStream); + String city = STRING_CODER.decode(inStream); + String state = STRING_CODER.decode(inStream); + long id = LONG_CODER.decode(inStream); + return new NameCityStateId(name, city, state, id); + } + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + @JsonProperty + private final String name; + + @JsonProperty + private final String city; + + @JsonProperty + private final String state; + + @JsonProperty + private final long id; + + // For Avro only. + @SuppressWarnings("unused") + private NameCityStateId() { + name = null; + city = null; + state = null; + id = 0; + } + + public NameCityStateId(String name, String city, String state, long id) { + this.name = name; + this.city = city; + this.state = state; + this.id = id; + } + + @Override + public long sizeInBytes() { + return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java new file mode 100644 index 0000000..800f937 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * A person either creating an auction or making a bid. + */ +public class Person implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + public static final Coder<Person> CODER = new CustomCoder<Person>() { + @Override + public void encode(Person value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.id, outStream); + STRING_CODER.encode(value.name, outStream); + STRING_CODER.encode(value.emailAddress, outStream); + STRING_CODER.encode(value.creditCard, outStream); + STRING_CODER.encode(value.city, outStream); + STRING_CODER.encode(value.state, outStream); + LONG_CODER.encode(value.dateTime, outStream); + STRING_CODER.encode(value.extra, outStream); + } + + @Override + public Person decode(InputStream inStream) + throws CoderException, IOException { + long id = LONG_CODER.decode(inStream); + String name = STRING_CODER.decode(inStream); + String emailAddress = STRING_CODER.decode(inStream); + String creditCard = STRING_CODER.decode(inStream); + String city = STRING_CODER.decode(inStream); + String state = STRING_CODER.decode(inStream); + long dateTime = LONG_CODER.decode(inStream); + String extra = STRING_CODER.decode(inStream); + return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra); + } + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + /** Id of person. */ + @JsonProperty + public final long id; // primary key + + /** Extra person properties. */ + @JsonProperty + public final String name; + + @JsonProperty + private final String emailAddress; + + @JsonProperty + private final String creditCard; + + @JsonProperty + public final String city; + + @JsonProperty + public final String state; + + @JsonProperty + public final long dateTime; + + /** Additional arbitrary payload for performance testing. */ + @JsonProperty + private final String extra; + + // For Avro only. + @SuppressWarnings("unused") + private Person() { + id = 0; + name = null; + emailAddress = null; + creditCard = null; + city = null; + state = null; + dateTime = 0; + extra = null; + } + + public Person(long id, String name, String emailAddress, String creditCard, String city, + String state, long dateTime, String extra) { + this.id = id; + this.name = name; + this.emailAddress = emailAddress; + this.creditCard = creditCard; + this.city = city; + this.state = state; + this.dateTime = dateTime; + this.extra = extra; + } + + /** + * Return a copy of person which capture the given annotation. + * (Used for debugging). + */ + public Person withAnnotation(String annotation) { + return new Person(id, name, emailAddress, creditCard, city, state, dateTime, + annotation + ": " + extra); + } + + /** + * Does person have {@code annotation}? (Used for debugging.) + */ + public boolean hasAnnotation(String annotation) { + return extra.startsWith(annotation + ": "); + } + + /** + * Remove {@code annotation} from person. (Used for debugging.) + */ + public Person withoutAnnotation(String annotation) { + if (hasAnnotation(annotation)) { + return new Person(id, name, emailAddress, creditCard, city, state, dateTime, + extra.substring(annotation.length() + 2)); + } else { + return this; + } + } + + @Override + public long sizeInBytes() { + return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1 + + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java new file mode 100644 index 0000000..82b551c --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.nexmark.NexmarkUtils; + +/** + * Result of Query6. + */ +public class SellerPrice implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() { + @Override + public void encode(SellerPrice value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.seller, outStream); + LONG_CODER.encode(value.price, outStream); + } + + @Override + public SellerPrice decode( + InputStream inStream) + throws CoderException, IOException { + long seller = LONG_CODER.decode(inStream); + long price = LONG_CODER.decode(inStream); + return new SellerPrice(seller, price); + } + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + @JsonProperty + public final long seller; + + /** Price in cents. */ + @JsonProperty + private final long price; + + // For Avro only. + @SuppressWarnings("unused") + private SellerPrice() { + seller = 0; + price = 0; + } + + public SellerPrice(long seller, long price) { + this.seller = seller; + this.price = price; + } + + @Override + public long sizeInBytes() { + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java new file mode 100644 index 0000000..3b4bb63 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Nexmark Benchmark Model. + */ +package org.apache.beam.sdk.nexmark.model; http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java new file mode 100644 index 0000000..7500a24 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Nexmark. + */ +package org.apache.beam.sdk.nexmark;
