http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java deleted file mode 100644 index 7926690..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ /dev/null @@ -1,672 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.hash.Hashing; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.AuctionCount; -import org.apache.beam.integration.nexmark.model.AuctionPrice; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.BidsPerSession; -import org.apache.beam.integration.nexmark.model.CategoryPrice; -import org.apache.beam.integration.nexmark.model.Done; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.IdNameReserve; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.integration.nexmark.model.NameCityStateId; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.integration.nexmark.model.SellerPrice; -import org.apache.beam.integration.nexmark.sources.BoundedEventSource; -import org.apache.beam.integration.nexmark.sources.Generator; -import org.apache.beam.integration.nexmark.sources.GeneratorConfig; -import org.apache.beam.integration.nexmark.sources.UnboundedEventSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.state.StateSpec; -import org.apache.beam.sdk.state.StateSpecs; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Odd's 'n Ends used throughout queries and driver. - */ -public class NexmarkUtils { - private static final Logger LOG = LoggerFactory.getLogger(NexmarkUtils.class); - - /** - * Mapper for (de)serializing JSON. - */ - public static final ObjectMapper MAPPER = new ObjectMapper(); - - /** - * Possible sources for events. - */ - public enum SourceType { - /** - * Produce events directly. - */ - DIRECT, - /** - * Read events from an Avro file. - */ - AVRO, - /** - * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline. - */ - PUBSUB - } - - /** - * Possible sinks for query results. - */ - public enum SinkType { - /** - * Discard all results. - */ - COUNT_ONLY, - /** - * Discard all results after converting them to strings. - */ - DEVNULL, - /** - * Write to a PubSub topic. It will be drained by this pipeline. - */ - PUBSUB, - /** - * Write to a text file. Only works in batch mode. - */ - TEXT, - /** - * Write raw Events to Avro. Only works in batch mode. - */ - AVRO, - /** - * Write raw Events to BigQuery. - */ - BIGQUERY, - } - - /** - * Pub/sub mode to run in. - */ - public enum PubSubMode { - /** - * Publish events to pub/sub, but don't run the query. - */ - PUBLISH_ONLY, - /** - * Consume events from pub/sub and run the query, but don't publish. - */ - SUBSCRIBE_ONLY, - /** - * Both publish and consume, but as separate jobs. - */ - COMBINED - } - - /** - * Coder strategies. - */ - public enum CoderStrategy { - /** - * Hand-written. - */ - HAND, - /** - * Avro. - */ - AVRO, - /** - * Java serialization. - */ - JAVA - } - - /** - * How to determine resource names. - */ - public enum ResourceNameMode { - /** Names are used as provided. */ - VERBATIM, - /** Names are suffixed with the query being run. */ - QUERY, - /** Names are suffixed with the query being run and a random number. */ - QUERY_AND_SALT - } - - /** - * Units for rates. - */ - public enum RateUnit { - PER_SECOND(1_000_000L), - PER_MINUTE(60_000_000L); - - RateUnit(long usPerUnit) { - this.usPerUnit = usPerUnit; - } - - /** - * Number of microseconds per unit. - */ - private final long usPerUnit; - - /** - * Number of microseconds between events at given rate. - */ - public long rateToPeriodUs(long rate) { - return (usPerUnit + rate / 2) / rate; - } - } - - /** - * Shape of event rate. - */ - public enum RateShape { - SQUARE, - SINE; - - /** - * Number of steps used to approximate sine wave. - */ - private static final int N = 10; - - /** - * Return inter-event delay, in microseconds, for each generator - * to follow in order to achieve {@code rate} at {@code unit} using {@code numGenerators}. - */ - public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) { - return unit.rateToPeriodUs(rate) * numGenerators; - } - - /** - * Return array of successive inter-event delays, in microseconds, for each generator - * to follow in order to achieve this shape with {@code firstRate/nextRate} at - * {@code unit} using {@code numGenerators}. - */ - public long[] interEventDelayUs( - int firstRate, int nextRate, RateUnit unit, int numGenerators) { - if (firstRate == nextRate) { - long[] interEventDelayUs = new long[1]; - interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators; - return interEventDelayUs; - } - - switch (this) { - case SQUARE: { - long[] interEventDelayUs = new long[2]; - interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators; - interEventDelayUs[1] = unit.rateToPeriodUs(nextRate) * numGenerators; - return interEventDelayUs; - } - case SINE: { - double mid = (firstRate + nextRate) / 2.0; - double amp = (firstRate - nextRate) / 2.0; // may be -ve - long[] interEventDelayUs = new long[N]; - for (int i = 0; i < N; i++) { - double r = (2.0 * Math.PI * i) / N; - double rate = mid + amp * Math.cos(r); - interEventDelayUs[i] = unit.rateToPeriodUs(Math.round(rate)) * numGenerators; - } - return interEventDelayUs; - } - } - throw new RuntimeException(); // switch should be exhaustive - } - - /** - * Return delay between steps, in seconds, for result of {@link #interEventDelayUs}, so - * as to cycle through the entire sequence every {@code ratePeriodSec}. - */ - public int stepLengthSec(int ratePeriodSec) { - int n = 0; - switch (this) { - case SQUARE: - n = 2; - break; - case SINE: - n = N; - break; - } - return (ratePeriodSec + n - 1) / n; - } - } - - /** - * Set to true to capture all info messages. The logging level flags don't currently work. - */ - private static final boolean LOG_INFO = false; - - /** - * Set to true to capture all error messages. The logging level flags don't currently work. - */ - private static final boolean LOG_ERROR = true; - - /** - * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results - * in real-time with: tail -f /var/log/dataflow/streaming-harness/harness-stdout.log - */ - private static final boolean LOG_TO_CONSOLE = false; - - /** - * Log info message. - */ - public static void info(String format, Object... args) { - if (LOG_INFO) { - LOG.info(String.format(format, args)); - if (LOG_TO_CONSOLE) { - System.out.println(String.format(format, args)); - } - } - } - - /** - * Log message to console. For client side only. - */ - public static void console(String format, Object... args) { - System.out.printf("%s %s%n", Instant.now(), String.format(format, args)); - } - - /** - * Label to use for timestamps on pub/sub messages. - */ - public static final String PUBSUB_TIMESTAMP = "timestamp"; - - /** - * Label to use for windmill ids on pub/sub messages. - */ - public static final String PUBSUB_ID = "id"; - - /** - * All events will be given a timestamp relative to this time (ms since epoch). - */ - private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis(); - - /** - * Instants guaranteed to be strictly before and after all event timestamps, and which won't - * be subject to underflow/overflow. - */ - public static final Instant BEGINNING_OF_TIME = new Instant(0).plus(Duration.standardDays(365)); - public static final Instant END_OF_TIME = - BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(365)); - - /** - * Setup pipeline with codes and some other options. - */ - public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) { - CoderRegistry registry = p.getCoderRegistry(); - switch (coderStrategy) { - case HAND: - registry.registerCoderForClass(Auction.class, Auction.CODER); - registry.registerCoderForClass(AuctionBid.class, AuctionBid.CODER); - registry.registerCoderForClass(AuctionCount.class, AuctionCount.CODER); - registry.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER); - registry.registerCoderForClass(Bid.class, Bid.CODER); - registry.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER); - registry.registerCoderForClass(Event.class, Event.CODER); - registry.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER); - registry.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER); - registry.registerCoderForClass(Person.class, Person.CODER); - registry.registerCoderForClass(SellerPrice.class, SellerPrice.CODER); - registry.registerCoderForClass(Done.class, Done.CODER); - registry.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER); - break; - case AVRO: - registry.registerCoderProvider(AvroCoder.getCoderProvider()); - break; - case JAVA: - registry.registerCoderProvider(SerializableCoder.getCoderProvider()); - break; - } - } - - /** - * Return a generator config to match the given {@code options}. - */ - private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) { - return new GeneratorConfig(configuration, - configuration.useWallclockEventTime ? System.currentTimeMillis() - : BASE_TIME, 0, - configuration.numEvents, 0); - } - - /** - * Return an iterator of events using the 'standard' generator config. - */ - public static Iterator<TimestampedValue<Event>> standardEventIterator( - NexmarkConfiguration configuration) { - return new Generator(standardGeneratorConfig(configuration)); - } - - /** - * Return a transform which yields a finite number of synthesized events generated - * as a batch. - */ - public static PTransform<PBegin, PCollection<Event>> batchEventsSource( - NexmarkConfiguration configuration) { - return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration), - configuration.numEventGenerators)); - } - - /** - * Return a transform which yields a finite number of synthesized events generated - * on-the-fly in real time. - */ - public static PTransform<PBegin, PCollection<Event>> streamEventsSource( - NexmarkConfiguration configuration) { - return Read.from(new UnboundedEventSource(NexmarkUtils.standardGeneratorConfig(configuration), - configuration.numEventGenerators, - configuration.watermarkHoldbackSec, - configuration.isRateLimited)); - } - - /** - * Return a transform to pass-through events, but count them as they go by. - */ - public static ParDo.SingleOutput<Event, Event> snoop(final String name) { - return ParDo.of(new DoFn<Event, Event>() { - final Counter eventCounter = Metrics.counter(name, "events"); - final Counter newPersonCounter = Metrics.counter(name, "newPersons"); - final Counter newAuctionCounter = Metrics.counter(name, "newAuctions"); - final Counter bidCounter = Metrics.counter(name, "bids"); - final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream"); - - @ProcessElement - public void processElement(ProcessContext c) { - eventCounter.inc(); - if (c.element().newPerson != null) { - newPersonCounter.inc(); - } else if (c.element().newAuction != null) { - newAuctionCounter.inc(); - } else if (c.element().bid != null) { - bidCounter.inc(); - } else { - endOfStreamCounter.inc(); - } - info("%s snooping element %s", name, c.element()); - c.output(c.element()); - } - }); - } - - /** - * Return a transform to count and discard each element. - */ - public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) { - return ParDo.of(new DoFn<T, Void>() { - final Counter discardedCounterMetric = Metrics.counter(name, "discarded"); - - @ProcessElement - public void processElement(ProcessContext c) { - discardedCounterMetric.inc(); - } - }); - } - - /** - * Return a transform to log each element, passing it through unchanged. - */ - public static <T> ParDo.SingleOutput<T, T> log(final String name) { - return ParDo.of(new DoFn<T, T>() { - @ProcessElement - public void processElement(ProcessContext c) { - LOG.info("%s: %s", name, c.element()); - c.output(c.element()); - } - }); - } - - /** - * Return a transform to format each element as a string. - */ - public static <T> ParDo.SingleOutput<T, String> format(final String name) { - return ParDo.of(new DoFn<T, String>() { - final Counter recordCounterMetric = Metrics.counter(name, "records"); - - @ProcessElement - public void processElement(ProcessContext c) { - recordCounterMetric.inc(); - c.output(c.element().toString()); - } - }); - } - - /** - * Return a transform to make explicit the timestamp of each element. - */ - public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) { - return ParDo.of(new DoFn<T, TimestampedValue<T>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(TimestampedValue.of(c.element(), c.timestamp())); - } - }); - } - - /** - * Return a transform to reduce a stream to a single, order-invariant long hash. - */ - public static <T> PTransform<PCollection<T>, PCollection<Long>> hash( - final long numEvents, String name) { - return new PTransform<PCollection<T>, PCollection<Long>>(name) { - @Override - public PCollection<Long> expand(PCollection<T> input) { - return input.apply(Window.<T>into(new GlobalWindows()) - .triggering(AfterPane.elementCountAtLeast((int) numEvents)) - .withAllowedLateness(Duration.standardDays(1)) - .discardingFiredPanes()) - - .apply(name + ".Hash", ParDo.of(new DoFn<T, Long>() { - @ProcessElement - public void processElement(ProcessContext c) { - long hash = - Hashing.murmur3_128() - .newHasher() - .putLong(c.timestamp().getMillis()) - .putString(c.element().toString(), StandardCharsets.UTF_8) - .hash() - .asLong(); - c.output(hash); - } - })) - - .apply(Combine.globally(new Combine.BinaryCombineFn<Long>() { - @Override - public Long apply(Long left, Long right) { - return left ^ right; - } - })); - } - }; - } - - private static final long MASK = (1L << 16) - 1L; - private static final long HASH = 0x243F6A8885A308D3L; - private static final long INIT_PLAINTEXT = 50000L; - - /** - * Return a transform to keep the CPU busy for given milliseconds on every record. - */ - public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) { - return ParDo.of(new DoFn<T, T>() { - @ProcessElement - public void processElement(ProcessContext c) { - long now = System.currentTimeMillis(); - long end = now + delayMs; - while (now < end) { - // Find plaintext which hashes to HASH in lowest MASK bits. - // Values chosen to roughly take 1ms on typical workstation. - long p = INIT_PLAINTEXT; - while (true) { - long t = Hashing.murmur3_128().hashLong(p).asLong(); - if ((t & MASK) == (HASH & MASK)) { - break; - } - p++; - } - now = System.currentTimeMillis(); - } - c.output(c.element()); - } - }); - } - - private static final int MAX_BUFFER_SIZE = 1 << 24; - - private static class DiskBusyTransform<T> extends PTransform<PCollection<T>, PCollection<T>>{ - - private long bytes; - - private DiskBusyTransform(long bytes) { - this.bytes = bytes; - } - - @Override public PCollection<T> expand(PCollection<T> input) { - // Add dummy key to be able to use State API - PCollection<KV<Integer, T>> kvCollection = input.apply("diskBusy.keyElements", ParDo.of(new DoFn<T, KV<Integer, T>>() { - - @ProcessElement public void processElement(ProcessContext context) { - context.output(KV.of(0, context.element())); - } - })); - // Apply actual transform that generates disk IO using state API - PCollection<T> output = kvCollection.apply("diskBusy.generateIO", ParDo.of(new DoFn<KV<Integer, T>, T>() { - - private static final String DISK_BUSY = "diskBusy"; - - @StateId(DISK_BUSY) private final StateSpec<ValueState<byte[]>> spec = StateSpecs - .value(ByteArrayCoder.of()); - - @ProcessElement public void processElement(ProcessContext c, - @StateId(DISK_BUSY) ValueState<byte[]> state) { - long remain = bytes; - long now = System.currentTimeMillis(); - while (remain > 0) { - long thisBytes = Math.min(remain, MAX_BUFFER_SIZE); - remain -= thisBytes; - byte[] arr = new byte[(int) thisBytes]; - for (int i = 0; i < thisBytes; i++) { - arr[i] = (byte) now; - } - state.write(arr); - now = System.currentTimeMillis(); - } - c.output(c.element().getValue()); - } - })); - return output; - } - } - - - /** - * Return a transform to write given number of bytes to durable store on every record. - */ - public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(final long bytes) { - return new DiskBusyTransform<>(bytes); - } - - /** - * Return a transform to cast each element to {@link KnownSize}. - */ - private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() { - return ParDo.of(new DoFn<T, KnownSize>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element()); - } - }); - } - - /** - * A coder for instances of {@code T} cast up to {@link KnownSize}. - * - * @param <T> True type of object. - */ - private static class CastingCoder<T extends KnownSize> extends CustomCoder<KnownSize> { - private final Coder<T> trueCoder; - - public CastingCoder(Coder<T> trueCoder) { - this.trueCoder = trueCoder; - } - - @Override - public void encode(KnownSize value, OutputStream outStream) - throws CoderException, IOException { - @SuppressWarnings("unchecked") - T typedValue = (T) value; - trueCoder.encode(typedValue, outStream); - } - - @Override - public KnownSize decode(InputStream inStream) - throws CoderException, IOException { - return trueCoder.decode(inStream); - } - } - - /** - * Return a coder for {@code KnownSize} that are known to be exactly of type {@code T}. - */ - private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) { - return new CastingCoder<>(trueCoder); - } - - /** - * Return {@code elements} as {@code KnownSize}s. - */ - public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize( - final String name, PCollection<T> elements) { - return elements.apply(name + ".Forget", castToKnownSize()) - .setCoder(makeCastingCoder(elements.getCoder())); - } - - // Do not instantiate. - private NexmarkUtils() { - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java deleted file mode 100644 index 9f5d7c0..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * An auction submitted by a person. - */ -public class Auction implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<Auction> CODER = new CustomCoder<Auction>() { - @Override - public void encode(Auction value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream); - STRING_CODER.encode(value.itemName, outStream); - STRING_CODER.encode(value.description, outStream); - LONG_CODER.encode(value.initialBid, outStream); - LONG_CODER.encode(value.reserve, outStream); - LONG_CODER.encode(value.dateTime, outStream); - LONG_CODER.encode(value.expires, outStream); - LONG_CODER.encode(value.seller, outStream); - LONG_CODER.encode(value.category, outStream); - STRING_CODER.encode(value.extra, outStream); - } - - @Override - public Auction decode( - InputStream inStream) - throws CoderException, IOException { - long id = LONG_CODER.decode(inStream); - String itemName = STRING_CODER.decode(inStream); - String description = STRING_CODER.decode(inStream); - long initialBid = LONG_CODER.decode(inStream); - long reserve = LONG_CODER.decode(inStream); - long dateTime = LONG_CODER.decode(inStream); - long expires = LONG_CODER.decode(inStream); - long seller = LONG_CODER.decode(inStream); - long category = LONG_CODER.decode(inStream); - String extra = STRING_CODER.decode(inStream); - return new Auction( - id, itemName, description, initialBid, reserve, dateTime, expires, seller, category, - extra); - } - }; - - - /** Id of auction. */ - @JsonProperty - public final long id; // primary key - - /** Extra auction properties. */ - @JsonProperty - private final String itemName; - - @JsonProperty - private final String description; - - /** Initial bid price, in cents. */ - @JsonProperty - private final long initialBid; - - /** Reserve price, in cents. */ - @JsonProperty - public final long reserve; - - @JsonProperty - public final long dateTime; - - /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */ - @JsonProperty - public final long expires; - - /** Id of person who instigated auction. */ - @JsonProperty - public final long seller; // foreign key: Person.id - - /** Id of category auction is listed under. */ - @JsonProperty - public final long category; // foreign key: Category.id - - /** Additional arbitrary payload for performance testing. */ - @JsonProperty - private final String extra; - - - // For Avro only. - @SuppressWarnings("unused") - private Auction() { - id = 0; - itemName = null; - description = null; - initialBid = 0; - reserve = 0; - dateTime = 0; - expires = 0; - seller = 0; - category = 0; - extra = null; - } - - public Auction(long id, String itemName, String description, long initialBid, long reserve, - long dateTime, long expires, long seller, long category, String extra) { - this.id = id; - this.itemName = itemName; - this.description = description; - this.initialBid = initialBid; - this.reserve = reserve; - this.dateTime = dateTime; - this.expires = expires; - this.seller = seller; - this.category = category; - this.extra = extra; - } - - /** - * Return a copy of auction which capture the given annotation. - * (Used for debugging). - */ - public Auction withAnnotation(String annotation) { - return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, - category, annotation + ": " + extra); - } - - /** - * Does auction have {@code annotation}? (Used for debugging.) - */ - public boolean hasAnnotation(String annotation) { - return extra.startsWith(annotation + ": "); - } - - /** - * Remove {@code annotation} from auction. (Used for debugging.) - */ - public Auction withoutAnnotation(String annotation) { - if (hasAnnotation(annotation)) { - return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, - category, extra.substring(annotation.length() + 2)); - } else { - return this; - } - } - - @Override - public long sizeInBytes() { - return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8 - + extra.length() + 1; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java deleted file mode 100644 index b9d79db..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; - -/** - * Result of {@link org.apache.beam.integration.nexmark.queries.WinningBids} transform. - */ -public class AuctionBid implements KnownSize, Serializable { - public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() { - @Override - public void encode(AuctionBid value, OutputStream outStream) - throws CoderException, IOException { - Auction.CODER.encode(value.auction, outStream); - Bid.CODER.encode(value.bid, outStream); - } - - @Override - public AuctionBid decode( - InputStream inStream) - throws CoderException, IOException { - Auction auction = Auction.CODER.decode(inStream); - Bid bid = Bid.CODER.decode(inStream); - return new AuctionBid(auction, bid); - } - }; - - @JsonProperty - public final Auction auction; - - @JsonProperty - public final Bid bid; - - // For Avro only. - @SuppressWarnings("unused") - private AuctionBid() { - auction = null; - bid = null; - } - - public AuctionBid(Auction auction, Bid bid) { - this.auction = auction; - this.bid = bid; - } - - @Override - public long sizeInBytes() { - return auction.sizeInBytes() + bid.sizeInBytes(); - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java deleted file mode 100644 index 0e643ff..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of Query5. - */ -public class AuctionCount implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() { - @Override - public void encode(AuctionCount value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream); - LONG_CODER.encode(value.count, outStream); - } - - @Override - public AuctionCount decode(InputStream inStream) - throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream); - long count = LONG_CODER.decode(inStream); - return new AuctionCount(auction, count); - } - }; - - @JsonProperty private final long auction; - - @JsonProperty private final long count; - - // For Avro only. - @SuppressWarnings("unused") - private AuctionCount() { - auction = 0; - count = 0; - } - - public AuctionCount(long auction, long count) { - this.auction = auction; - this.count = count; - } - - @Override - public long sizeInBytes() { - return 8 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java deleted file mode 100644 index 7d51a21..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of Query2. - */ -public class AuctionPrice implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() { - @Override - public void encode(AuctionPrice value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream); - LONG_CODER.encode(value.price, outStream); - } - - @Override - public AuctionPrice decode( - InputStream inStream) - throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream); - long price = LONG_CODER.decode(inStream); - return new AuctionPrice(auction, price); - } - }; - - @JsonProperty - private final long auction; - - /** Price in cents. */ - @JsonProperty - private final long price; - - // For Avro only. - @SuppressWarnings("unused") - private AuctionPrice() { - auction = 0; - price = 0; - } - - public AuctionPrice(long auction, long price) { - this.auction = auction; - this.price = price; - } - - @Override - public long sizeInBytes() { - return 8 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java deleted file mode 100644 index 4fa9ea0..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.Comparator; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * A bid for an item on auction. - */ -public class Bid implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<Bid> CODER = new CustomCoder<Bid>() { - @Override - public void encode(Bid value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream); - LONG_CODER.encode(value.bidder, outStream); - LONG_CODER.encode(value.price, outStream); - LONG_CODER.encode(value.dateTime, outStream); - STRING_CODER.encode(value.extra, outStream); - } - - @Override - public Bid decode( - InputStream inStream) - throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream); - long bidder = LONG_CODER.decode(inStream); - long price = LONG_CODER.decode(inStream); - long dateTime = LONG_CODER.decode(inStream); - String extra = STRING_CODER.decode(inStream); - return new Bid(auction, bidder, price, dateTime, extra); - } - - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - /** - * Comparator to order bids by ascending price then descending time - * (for finding winning bids). - */ - public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() { - @Override - public int compare(Bid left, Bid right) { - int i = Double.compare(left.price, right.price); - if (i != 0) { - return i; - } - return Long.compare(right.dateTime, left.dateTime); - } - }; - - /** - * Comparator to order bids by ascending time then ascending price. - * (for finding most recent bids). - */ - public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() { - @Override - public int compare(Bid left, Bid right) { - int i = Long.compare(left.dateTime, right.dateTime); - if (i != 0) { - return i; - } - return Double.compare(left.price, right.price); - } - }; - - /** Id of auction this bid is for. */ - @JsonProperty - public final long auction; // foreign key: Auction.id - - /** Id of person bidding in auction. */ - @JsonProperty - public final long bidder; // foreign key: Person.id - - /** Price of bid, in cents. */ - @JsonProperty - public final long price; - - /** - * Instant at which bid was made (ms since epoch). - * NOTE: This may be earlier than the system's event time. - */ - @JsonProperty - public final long dateTime; - - /** Additional arbitrary payload for performance testing. */ - @JsonProperty - public final String extra; - - // For Avro only. - @SuppressWarnings("unused") - private Bid() { - auction = 0; - bidder = 0; - price = 0; - dateTime = 0; - extra = null; - } - - public Bid(long auction, long bidder, long price, long dateTime, String extra) { - this.auction = auction; - this.bidder = bidder; - this.price = price; - this.dateTime = dateTime; - this.extra = extra; - } - - /** - * Return a copy of bid which capture the given annotation. - * (Used for debugging). - */ - public Bid withAnnotation(String annotation) { - return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra); - } - - /** - * Does bid have {@code annotation}? (Used for debugging.) - */ - public boolean hasAnnotation(String annotation) { - return extra.startsWith(annotation + ": "); - } - - /** - * Remove {@code annotation} from bid. (Used for debugging.) - */ - public Bid withoutAnnotation(String annotation) { - if (hasAnnotation(annotation)) { - return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2)); - } else { - return this; - } - } - - @Override - public long sizeInBytes() { - return 8 + 8 + 8 + 8 + extra.length() + 1; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java deleted file mode 100644 index 3211456..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of query 11. - */ -public class BidsPerSession implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() { - @Override - public void encode(BidsPerSession value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.personId, outStream); - LONG_CODER.encode(value.bidsPerSession, outStream); - } - - @Override - public BidsPerSession decode( - InputStream inStream) - throws CoderException, IOException { - long personId = LONG_CODER.decode(inStream); - long bidsPerSession = LONG_CODER.decode(inStream); - return new BidsPerSession(personId, bidsPerSession); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - @JsonProperty - private final long personId; - - @JsonProperty - private final long bidsPerSession; - - public BidsPerSession() { - personId = 0; - bidsPerSession = 0; - } - - public BidsPerSession(long personId, long bidsPerSession) { - this.personId = personId; - this.bidsPerSession = bidsPerSession; - } - - @Override - public long sizeInBytes() { - // Two longs. - return 8 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java deleted file mode 100644 index 2678198..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of Query4. - */ -public class CategoryPrice implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<Integer> INT_CODER = VarIntCoder.of(); - - public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() { - @Override - public void encode(CategoryPrice value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.category, outStream); - LONG_CODER.encode(value.price, outStream); - INT_CODER.encode(value.isLast ? 1 : 0, outStream); - } - - @Override - public CategoryPrice decode(InputStream inStream) - throws CoderException, IOException { - long category = LONG_CODER.decode(inStream); - long price = LONG_CODER.decode(inStream); - boolean isLast = INT_CODER.decode(inStream) != 0; - return new CategoryPrice(category, price, isLast); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - @JsonProperty - public final long category; - - /** Price in cents. */ - @JsonProperty - public final long price; - - @JsonProperty - public final boolean isLast; - - // For Avro only. - @SuppressWarnings("unused") - private CategoryPrice() { - category = 0; - price = 0; - isLast = false; - } - - public CategoryPrice(long category, long price, boolean isLast) { - this.category = category; - this.price = price; - this.isLast = isLast; - } - - @Override - public long sizeInBytes() { - return 8 + 8 + 1; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java deleted file mode 100644 index b0a88d4..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; - -/** - * Result of query 10. - */ -public class Done implements KnownSize, Serializable { - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<Done> CODER = new CustomCoder<Done>() { - @Override - public void encode(Done value, OutputStream outStream) - throws CoderException, IOException { - STRING_CODER.encode(value.message, outStream); - } - - @Override - public Done decode(InputStream inStream) - throws CoderException, IOException { - String message = STRING_CODER.decode(inStream); - return new Done(message); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - @JsonProperty - private final String message; - - // For Avro only. - @SuppressWarnings("unused") - public Done() { - message = null; - } - - public Done(String message) { - this.message = message; - } - - @Override - public long sizeInBytes() { - return message.length(); - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java deleted file mode 100644 index 0e1672e..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarIntCoder; - -/** - * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a - * {@link Bid}. - */ -public class Event implements KnownSize, Serializable { - private enum Tag { - PERSON(0), - AUCTION(1), - BID(2); - - private int value = -1; - - Tag(int value){ - this.value = value; - } - } - private static final Coder<Integer> INT_CODER = VarIntCoder.of(); - - public static final Coder<Event> CODER = - new CustomCoder<Event>() { - @Override - public void encode(Event value, OutputStream outStream) throws IOException { - if (value.newPerson != null) { - INT_CODER.encode(Tag.PERSON.value, outStream); - Person.CODER.encode(value.newPerson, outStream); - } else if (value.newAuction != null) { - INT_CODER.encode(Tag.AUCTION.value, outStream); - Auction.CODER.encode(value.newAuction, outStream); - } else if (value.bid != null) { - INT_CODER.encode(Tag.BID.value, outStream); - Bid.CODER.encode(value.bid, outStream); - } else { - throw new RuntimeException("invalid event"); - } - } - - @Override - public Event decode(InputStream inStream) throws IOException { - int tag = INT_CODER.decode(inStream); - if (tag == Tag.PERSON.value) { - Person person = Person.CODER.decode(inStream); - return new Event(person); - } else if (tag == Tag.AUCTION.value) { - Auction auction = Auction.CODER.decode(inStream); - return new Event(auction); - } else if (tag == Tag.BID.value) { - Bid bid = Bid.CODER.decode(inStream); - return new Event(bid); - } else { - throw new RuntimeException("invalid event encoding"); - } - } - - @Override - public void verifyDeterministic() throws NonDeterministicException {} - }; - - @Nullable - @org.apache.avro.reflect.Nullable - public final Person newPerson; - - @Nullable - @org.apache.avro.reflect.Nullable - public final Auction newAuction; - - @Nullable - @org.apache.avro.reflect.Nullable - public final Bid bid; - - // For Avro only. - @SuppressWarnings("unused") - private Event() { - newPerson = null; - newAuction = null; - bid = null; - } - - public Event(Person newPerson) { - this.newPerson = newPerson; - newAuction = null; - bid = null; - } - - public Event(Auction newAuction) { - newPerson = null; - this.newAuction = newAuction; - bid = null; - } - - public Event(Bid bid) { - newPerson = null; - newAuction = null; - this.bid = bid; - } - - /** Return a copy of event which captures {@code annotation}. (Used for debugging). */ - public Event withAnnotation(String annotation) { - if (newPerson != null) { - return new Event(newPerson.withAnnotation(annotation)); - } else if (newAuction != null) { - return new Event(newAuction.withAnnotation(annotation)); - } else { - return new Event(bid.withAnnotation(annotation)); - } - } - - /** Does event have {@code annotation}? (Used for debugging.) */ - public boolean hasAnnotation(String annotation) { - if (newPerson != null) { - return newPerson.hasAnnotation(annotation); - } else if (newAuction != null) { - return newAuction.hasAnnotation(annotation); - } else { - return bid.hasAnnotation(annotation); - } - } - - @Override - public long sizeInBytes() { - if (newPerson != null) { - return 1 + newPerson.sizeInBytes(); - } else if (newAuction != null) { - return 1 + newAuction.sizeInBytes(); - } else if (bid != null) { - return 1 + bid.sizeInBytes(); - } else { - throw new RuntimeException("invalid event"); - } - } - - @Override - public String toString() { - if (newPerson != null) { - return newPerson.toString(); - } else if (newAuction != null) { - return newAuction.toString(); - } else if (bid != null) { - return bid.toString(); - } else { - throw new RuntimeException("invalid event"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java deleted file mode 100644 index 8cade4e..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result type of Query8. - */ -public class IdNameReserve implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() { - @Override - public void encode(IdNameReserve value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream); - STRING_CODER.encode(value.name, outStream); - LONG_CODER.encode(value.reserve, outStream); - } - - @Override - public IdNameReserve decode( - InputStream inStream) - throws CoderException, IOException { - long id = LONG_CODER.decode(inStream); - String name = STRING_CODER.decode(inStream); - long reserve = LONG_CODER.decode(inStream); - return new IdNameReserve(id, name, reserve); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - @JsonProperty - private final long id; - - @JsonProperty - private final String name; - - /** Reserve price in cents. */ - @JsonProperty - private final long reserve; - - // For Avro only. - @SuppressWarnings("unused") - private IdNameReserve() { - id = 0; - name = null; - reserve = 0; - } - - public IdNameReserve(long id, String name, long reserve) { - this.id = id; - this.name = name; - this.reserve = reserve; - } - - @Override - public long sizeInBytes() { - return 8 + name.length() + 1 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java deleted file mode 100644 index c742eac..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -/** - * Interface for elements which can quickly estimate their encoded byte size. - */ -public interface KnownSize { - long sizeInBytes(); -} - http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java deleted file mode 100644 index 37bd3c6..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of Query3. - */ -public class NameCityStateId implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() { - @Override - public void encode(NameCityStateId value, OutputStream outStream) - throws CoderException, IOException { - STRING_CODER.encode(value.name, outStream); - STRING_CODER.encode(value.city, outStream); - STRING_CODER.encode(value.state, outStream); - LONG_CODER.encode(value.id, outStream); - } - - @Override - public NameCityStateId decode(InputStream inStream) - throws CoderException, IOException { - String name = STRING_CODER.decode(inStream); - String city = STRING_CODER.decode(inStream); - String state = STRING_CODER.decode(inStream); - long id = LONG_CODER.decode(inStream); - return new NameCityStateId(name, city, state, id); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - @JsonProperty - private final String name; - - @JsonProperty - private final String city; - - @JsonProperty - private final String state; - - @JsonProperty - private final long id; - - // For Avro only. - @SuppressWarnings("unused") - private NameCityStateId() { - name = null; - city = null; - state = null; - id = 0; - } - - public NameCityStateId(String name, String city, String state, long id) { - this.name = name; - this.city = city; - this.state = state; - this.id = id; - } - - @Override - public long sizeInBytes() { - return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java deleted file mode 100644 index bde587d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * A person either creating an auction or making a bid. - */ -public class Person implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - public static final Coder<Person> CODER = new CustomCoder<Person>() { - @Override - public void encode(Person value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream); - STRING_CODER.encode(value.name, outStream); - STRING_CODER.encode(value.emailAddress, outStream); - STRING_CODER.encode(value.creditCard, outStream); - STRING_CODER.encode(value.city, outStream); - STRING_CODER.encode(value.state, outStream); - LONG_CODER.encode(value.dateTime, outStream); - STRING_CODER.encode(value.extra, outStream); - } - - @Override - public Person decode(InputStream inStream) - throws CoderException, IOException { - long id = LONG_CODER.decode(inStream); - String name = STRING_CODER.decode(inStream); - String emailAddress = STRING_CODER.decode(inStream); - String creditCard = STRING_CODER.decode(inStream); - String city = STRING_CODER.decode(inStream); - String state = STRING_CODER.decode(inStream); - long dateTime = LONG_CODER.decode(inStream); - String extra = STRING_CODER.decode(inStream); - return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - /** Id of person. */ - @JsonProperty - public final long id; // primary key - - /** Extra person properties. */ - @JsonProperty - public final String name; - - @JsonProperty - private final String emailAddress; - - @JsonProperty - private final String creditCard; - - @JsonProperty - public final String city; - - @JsonProperty - public final String state; - - @JsonProperty - public final long dateTime; - - /** Additional arbitrary payload for performance testing. */ - @JsonProperty - private final String extra; - - // For Avro only. - @SuppressWarnings("unused") - private Person() { - id = 0; - name = null; - emailAddress = null; - creditCard = null; - city = null; - state = null; - dateTime = 0; - extra = null; - } - - public Person(long id, String name, String emailAddress, String creditCard, String city, - String state, long dateTime, String extra) { - this.id = id; - this.name = name; - this.emailAddress = emailAddress; - this.creditCard = creditCard; - this.city = city; - this.state = state; - this.dateTime = dateTime; - this.extra = extra; - } - - /** - * Return a copy of person which capture the given annotation. - * (Used for debugging). - */ - public Person withAnnotation(String annotation) { - return new Person(id, name, emailAddress, creditCard, city, state, dateTime, - annotation + ": " + extra); - } - - /** - * Does person have {@code annotation}? (Used for debugging.) - */ - public boolean hasAnnotation(String annotation) { - return extra.startsWith(annotation + ": "); - } - - /** - * Remove {@code annotation} from person. (Used for debugging.) - */ - public Person withoutAnnotation(String annotation) { - if (hasAnnotation(annotation)) { - return new Person(id, name, emailAddress, creditCard, city, state, dateTime, - extra.substring(annotation.length() + 2)); - } else { - return this; - } - } - - @Override - public long sizeInBytes() { - return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1 - + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java deleted file mode 100644 index 61537f6..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of Query6. - */ -public class SellerPrice implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() { - @Override - public void encode(SellerPrice value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.seller, outStream); - LONG_CODER.encode(value.price, outStream); - } - - @Override - public SellerPrice decode( - InputStream inStream) - throws CoderException, IOException { - long seller = LONG_CODER.decode(inStream); - long price = LONG_CODER.decode(inStream); - return new SellerPrice(seller, price); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - @JsonProperty - public final long seller; - - /** Price in cents. */ - @JsonProperty - private final long price; - - // For Avro only. - @SuppressWarnings("unused") - private SellerPrice() { - seller = 0; - price = 0; - } - - public SellerPrice(long seller, long price) { - this.seller = seller; - this.price = price; - } - - @Override - public long sizeInBytes() { - return 8 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java deleted file mode 100644 index e1d6113..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Nexmark Benchmark Model. - */ -package org.apache.beam.integration.nexmark.model; http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java deleted file mode 100644 index df6f09f..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Nexmark. - */ -package org.apache.beam.integration.nexmark;
