http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java deleted file mode 100644 index 9573ef7..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A custom, unbounded source of event records. - * - * <p>If {@code isRateLimited} is true, events become available for return from the reader such - * that the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise, - * events are returned every time the system asks for one. - */ -class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> { - private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30); - private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class); - - /** Configuration for generator to use when reading synthetic events. May be split. */ - private final GeneratorConfig config; - - /** How many unbounded sources to create. */ - private final int numEventGenerators; - - /** How many seconds to hold back the watermark. */ - private final long watermarkHoldbackSec; - - /** Are we rate limiting the events? */ - private final boolean isRateLimited; - - public UnboundedEventSource(GeneratorConfig config, int numEventGenerators, - long watermarkHoldbackSec, boolean isRateLimited) { - this.config = config; - this.numEventGenerators = numEventGenerators; - this.watermarkHoldbackSec = watermarkHoldbackSec; - this.isRateLimited = isRateLimited; - } - - /** A reader to pull events from the generator. */ - private class EventReader extends UnboundedReader<Event> { - /** Generator we are reading from. */ - private final Generator generator; - - /** - * Current watermark (ms since epoch). Initially set to beginning of time. - * Then updated to be the time of the next generated event. - * Then, once all events have been generated, set to the end of time. - */ - private long watermark; - - /** - * Current backlog (ms), as delay between timestamp of last returned event and the timestamp - * we should be up to according to wall-clock time. Used only for logging. - */ - private long backlogDurationMs; - - /** - * Current backlog, as estimated number of event bytes we are behind, or null if - * unknown. Reported to callers. - */ - @Nullable - private Long backlogBytes; - - /** - * Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported. - */ - private long lastReportedBacklogWallclock; - - /** - * Event time (ms since epoch) of pending event at last reported backlog, or -1 if never - * calculated. - */ - private long timestampAtLastReportedBacklogMs; - - /** Next event to make 'current' when wallclock time has advanced sufficiently. */ - @Nullable - private TimestampedValue<Event> pendingEvent; - - /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending event. */ - private long pendingEventWallclockTime; - - /** Current event to return from getCurrent. */ - @Nullable - private TimestampedValue<Event> currentEvent; - - /** Events which have been held back so as to force them to be late. */ - private Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>(); - - public EventReader(Generator generator) { - this.generator = generator; - watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis(); - lastReportedBacklogWallclock = -1; - pendingEventWallclockTime = -1; - timestampAtLastReportedBacklogMs = -1; - } - - public EventReader(GeneratorConfig config) { - this(new Generator(config)); - } - - @Override - public boolean start() { - LOG.trace("starting unbounded generator {}", generator); - return advance(); - } - - - @Override - public boolean advance() { - long now = System.currentTimeMillis(); - - while (pendingEvent == null) { - if (!generator.hasNext() && heldBackEvents.isEmpty()) { - // No more events, EVER. - if (isRateLimited) { - updateBacklog(System.currentTimeMillis(), 0); - } - if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { - watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); - LOG.trace("stopped unbounded generator {}", generator); - } - return false; - } - - Generator.NextEvent next = heldBackEvents.peek(); - if (next != null && next.wallclockTimestamp <= now) { - // Time to use the held-back event. - heldBackEvents.poll(); - LOG.debug("replaying held-back event {}ms behind watermark", - watermark - next.eventTimestamp); - } else if (generator.hasNext()) { - next = generator.nextEvent(); - if (isRateLimited && config.configuration.probDelayedEvent > 0.0 - && config.configuration.occasionalDelaySec > 0 - && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) { - // We'll hold back this event and go around again. - long delayMs = - ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000) - + 1L; - LOG.debug("delaying event by {}ms", delayMs); - heldBackEvents.add(next.withDelay(delayMs)); - continue; - } - } else { - // Waiting for held-back event to fire. - if (isRateLimited) { - updateBacklog(now, 0); - } - return false; - } - - pendingEventWallclockTime = next.wallclockTimestamp; - pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp)); - long newWatermark = - next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis(); - if (newWatermark > watermark) { - watermark = newWatermark; - } - } - - if (isRateLimited) { - if (pendingEventWallclockTime > now) { - // We want this event to fire in the future. Try again later. - updateBacklog(now, 0); - return false; - } - updateBacklog(now, now - pendingEventWallclockTime); - } - - // This event is ready to fire. - currentEvent = pendingEvent; - pendingEvent = null; - return true; - } - - private void updateBacklog(long now, long newBacklogDurationMs) { - backlogDurationMs = newBacklogDurationMs; - long interEventDelayUs = generator.currentInterEventDelayUs(); - if (interEventDelayUs != 0) { - long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs; - backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents); - } - if (lastReportedBacklogWallclock < 0 - || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) { - double timeDialation = Double.NaN; - if (pendingEvent != null - && lastReportedBacklogWallclock >= 0 - && timestampAtLastReportedBacklogMs >= 0) { - long wallclockProgressionMs = now - lastReportedBacklogWallclock; - long eventTimeProgressionMs = - pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs; - timeDialation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs; - } - LOG.debug( - "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay " - + "with {} time dilation", - backlogDurationMs, backlogBytes, interEventDelayUs, timeDialation); - lastReportedBacklogWallclock = now; - if (pendingEvent != null) { - timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis(); - } - } - } - - @Override - public Event getCurrent() { - if (currentEvent == null) { - throw new NoSuchElementException(); - } - return currentEvent.getValue(); - } - - @Override - public Instant getCurrentTimestamp() { - if (currentEvent == null) { - throw new NoSuchElementException(); - } - return currentEvent.getTimestamp(); - } - - @Override - public void close() { - // Nothing to close. - } - - @Override - public UnboundedEventSource getCurrentSource() { - return UnboundedEventSource.this; - } - - @Override - public Instant getWatermark() { - return new Instant(watermark); - } - - @Override - public Generator.Checkpoint getCheckpointMark() { - return generator.toCheckpoint(); - } - - @Override - public long getSplitBacklogBytes() { - return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes; - } - - @Override - public String toString() { - return String.format("EventReader(%d, %d, %d)", - generator.getCurrentConfig().getStartEventId(), generator.getNextEventId(), - generator.getCurrentConfig().getStopEventId()); - } - } - - @Override - public Coder<Generator.Checkpoint> getCheckpointMarkCoder() { - return Generator.Checkpoint.CODER_INSTANCE; - } - - @Override - public List<UnboundedEventSource> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) { - LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators); - List<UnboundedEventSource> results = new ArrayList<>(); - // Ignore desiredNumSplits and use numEventGenerators instead. - for (GeneratorConfig subConfig : config.split(numEventGenerators)) { - results.add(new UnboundedEventSource(subConfig, 1, watermarkHoldbackSec, isRateLimited)); - } - return results; - } - - @Override - public EventReader createReader( - PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) { - if (checkpoint == null) { - LOG.trace("creating initial unbounded reader for {}", config); - return new EventReader(config); - } else { - LOG.trace("resuming unbounded reader from {}", checkpoint); - return new EventReader(checkpoint.toGenerator(config)); - } - } - - @Override - public void validate() { - // Nothing to validate. - } - - @Override - public Coder<Event> getDefaultOutputCoder() { - return Event.CODER; - } - - @Override - public String toString() { - return String.format( - "UnboundedEventSource(%d, %d)", config.getStartEventId(), config.getStopEventId()); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java index 594195a..9f1ddf8 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java @@ -20,7 +20,6 @@ package org.apache.beam.integration.nexmark; import static com.google.common.base.Preconditions.checkState; import com.fasterxml.jackson.annotation.JsonCreator; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -31,7 +30,11 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.TreeMap; - +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.AuctionBid; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.integration.nexmark.sources.GeneratorConfig; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -45,10 +48,10 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Instant; @@ -249,7 +252,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct } @Override - public AuctionOrBidWindow getSideInputWindow(BoundedWindow window) { + public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() { throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs"); } http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java index dc8094b..e7f51b7 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java @@ -26,6 +26,10 @@ import java.util.TreeMap; import java.util.TreeSet; import javax.annotation.Nullable; +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.AuctionBid; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java new file mode 100644 index 0000000..265ccf7 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import org.apache.beam.integration.nexmark.NexmarkDriver; +import org.apache.beam.integration.nexmark.NexmarkOptions; +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * Run NexMark queries using the Apex runner. + */ +public class NexmarkApexDriver extends NexmarkDriver<NexmarkApexDriver.NexmarkApexOptions> { + /** + * Command line flags. + */ + public interface NexmarkApexOptions extends NexmarkOptions, ApexPipelineOptions { + } + + /** + * Entry point. + */ + public static void main(String[] args) { + // Gather command line args, baseline, configurations, etc. + NexmarkApexOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(NexmarkApexOptions.class); + options.setRunner(ApexRunner.class); + NexmarkApexRunner runner = new NexmarkApexRunner(options); + new NexmarkApexDriver().runAll(options, runner); + } +} + + http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java new file mode 100644 index 0000000..2bcf82d --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import javax.annotation.Nullable; + +import org.apache.beam.integration.nexmark.NexmarkPerf; +import org.apache.beam.integration.nexmark.NexmarkQuery; +import org.apache.beam.integration.nexmark.NexmarkRunner; + +/** + * Run a query using the Apex runner. + */ +public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkApexOptions> { + @Override + protected boolean isStreaming() { + return options.isStreaming(); + } + + @Override + protected int coresPerWorker() { + return 4; + } + + @Override + protected int maxNumWorkers() { + return 5; + } + + @Override + protected void invokeBuilderForPublishOnlyPipeline( + PipelineBuilder builder) { + builder.build(options); + } + + @Override + protected void waitForPublisherPreload() { + throw new UnsupportedOperationException(); + } + + @Override + @Nullable + protected NexmarkPerf monitor(NexmarkQuery query) { + return null; + } + + public NexmarkApexRunner(NexmarkApexDriver.NexmarkApexOptions options) { + super(options); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java new file mode 100644 index 0000000..2b825f3 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import org.apache.beam.integration.nexmark.NexmarkDriver; +import org.apache.beam.integration.nexmark.NexmarkOptions; +import org.apache.beam.runners.direct.DirectOptions; +import org.apache.beam.runners.direct.DirectRunner; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An implementation of the 'NEXMark queries' using the Direct Runner. + */ +class NexmarkDirectDriver extends NexmarkDriver<NexmarkDirectDriver.NexmarkDirectOptions> { + /** + * Command line flags. + */ + public interface NexmarkDirectOptions extends NexmarkOptions, DirectOptions { + } + + /** + * Entry point. + */ + public static void main(String[] args) { + NexmarkDirectOptions options = + PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(NexmarkDirectOptions.class); + options.setRunner(DirectRunner.class); + NexmarkDirectRunner runner = new NexmarkDirectRunner(options); + new NexmarkDirectDriver().runAll(options, runner); + } +} + http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java new file mode 100644 index 0000000..1391040 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import org.apache.beam.integration.nexmark.NexmarkRunner; + +/** + * Run a single query using the Direct Runner. + */ +class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirectOptions> { + public NexmarkDirectRunner(NexmarkDirectDriver.NexmarkDirectOptions options) { + super(options); + } + + @Override + protected boolean isStreaming() { + return options.isStreaming(); + } + + @Override + protected int coresPerWorker() { + return 4; + } + + @Override + protected int maxNumWorkers() { + return 1; + } + + @Override + protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) { + throw new UnsupportedOperationException( + "Cannot use --pubSubMode=COMBINED with DirectRunner"); + } + + /** + * Monitor the progress of the publisher job. Return when it has been generating events for + * at least {@code configuration.preloadSeconds}. + */ + @Override + protected void waitForPublisherPreload() { + throw new UnsupportedOperationException( + "Cannot use --pubSubMode=COMBINED with DirectRunner"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java new file mode 100644 index 0000000..bf0b115 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import org.apache.beam.integration.nexmark.NexmarkDriver; +import org.apache.beam.integration.nexmark.NexmarkOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.FlinkRunner; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * Run NexMark queries using the Flink runner. + */ +public class NexmarkFlinkDriver extends NexmarkDriver<NexmarkFlinkDriver.NexmarkFlinkOptions> { + /** + * Command line flags. + */ + public interface NexmarkFlinkOptions extends NexmarkOptions, FlinkPipelineOptions { + } + + /** + * Entry point. + */ + public static void main(String[] args) { + // Gather command line args, baseline, configurations, etc. + NexmarkFlinkOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(NexmarkFlinkOptions.class); + options.setRunner(FlinkRunner.class); + NexmarkFlinkRunner runner = new NexmarkFlinkRunner(options); + new NexmarkFlinkDriver().runAll(options, runner); + } +} + + http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java new file mode 100644 index 0000000..9d547ef --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import org.apache.beam.integration.nexmark.NexmarkRunner; + +/** + * Run a query using the Flink runner. + */ +public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.NexmarkFlinkOptions> { + @Override + protected boolean isStreaming() { + return options.isStreaming(); + } + + @Override + protected int coresPerWorker() { + return 4; + } + + @Override + protected int maxNumWorkers() { + return 5; + } + + @Override + protected void invokeBuilderForPublishOnlyPipeline( + PipelineBuilder builder) { + builder.build(options); + } + + @Override + protected void waitForPublisherPreload() { + throw new UnsupportedOperationException(); + } + + public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) { + super(options); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java new file mode 100644 index 0000000..f5a9751 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import org.apache.beam.integration.nexmark.NexmarkDriver; +import org.apache.beam.integration.nexmark.NexmarkOptions; +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Person; +import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An implementation of the 'NEXMark queries' for Google Dataflow. + * These are multiple queries over a three table schema representing an online auction system: + * <ul> + * <li>{@link Person} represents a person submitting an item for auction and/or making a bid + * on an auction. + * <li>{@link Auction} represents an item under auction. + * <li>{@link Bid} represents a bid for an item under auction. + * </ul> + * The queries exercise many aspects of streaming dataflow. + * + * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not + * particularly sensible. + * + * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/"> + * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a> + */ +class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> { + /** + * Command line flags. + */ + public interface NexmarkGoogleOptions extends NexmarkOptions, DataflowPipelineOptions { + + } + + /** + * Entry point. + */ + public static void main(String[] args) { + // Gather command line args, baseline, configurations, etc. + NexmarkGoogleOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(NexmarkGoogleOptions.class); + options.setRunner(DataflowRunner.class); + NexmarkGoogleRunner runner = new NexmarkGoogleRunner(options); + new NexmarkGoogleDriver().runAll(options, runner); + } +} + http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java new file mode 100644 index 0000000..7ffd47a --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.integration.nexmark.Monitor; +import org.apache.beam.integration.nexmark.NexmarkRunner; +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.runners.dataflow.DataflowPipelineJob; +import org.apache.beam.sdk.PipelineResult; +import org.joda.time.Duration; + +/** + * Run a singe Nexmark query using a given configuration on Google Dataflow. + */ +class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> { + + public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) { + super(options); + } + + @Override + protected boolean isStreaming() { + return options.isStreaming(); + } + + @Override + protected int coresPerWorker() { + String machineType = options.getWorkerMachineType(); + if (machineType == null || machineType.isEmpty()) { + return 1; + } + String[] split = machineType.split("-"); + if (split.length != 3) { + return 1; + } + try { + return Integer.parseInt(split[2]); + } catch (NumberFormatException ex) { + return 1; + } + } + + @Override + protected int maxNumWorkers() { + return Math.max(options.getNumWorkers(), options.getMaxNumWorkers()); + } + + @Override + protected String getJobId(PipelineResult job) { + return ((DataflowPipelineJob) job).getJobId(); + } + + @Override + protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) { + String jobName = options.getJobName(); + String appName = options.getAppName(); + options.setJobName("p-" + jobName); + options.setAppName("p-" + appName); + int coresPerWorker = coresPerWorker(); + int eventGeneratorWorkers = (configuration.numEventGenerators + coresPerWorker - 1) + / coresPerWorker; + options.setMaxNumWorkers(Math.min(options.getMaxNumWorkers(), eventGeneratorWorkers)); + options.setNumWorkers(Math.min(options.getNumWorkers(), eventGeneratorWorkers)); + publisherMonitor = new Monitor<Event>(queryName, "publisher"); + try { + builder.build(options); + } finally { + options.setJobName(jobName); + options.setAppName(appName); + options.setMaxNumWorkers(options.getMaxNumWorkers()); + options.setNumWorkers(options.getNumWorkers()); + } + } + + /** + * Monitor the progress of the publisher job. Return when it has been generating events for + * at least {@code configuration.preloadSeconds}. + */ + @Override + protected void waitForPublisherPreload() { + checkNotNull(publisherMonitor); + checkNotNull(publisherResult); + if (!options.getMonitorJobs()) { + return; + } + if (!(publisherResult instanceof DataflowPipelineJob)) { + return; + } + if (configuration.preloadSeconds <= 0) { + return; + } + + NexmarkUtils.console("waiting for publisher to pre-load"); + + DataflowPipelineJob job = (DataflowPipelineJob) publisherResult; + + long numEvents = 0; + long startMsSinceEpoch = -1; + long endMsSinceEpoch = -1; + while (true) { + PipelineResult.State state = job.getState(); + switch (state) { + case UNKNOWN: + // Keep waiting. + NexmarkUtils.console("%s publisher (%d events)", state, numEvents); + break; + case STOPPED: + case DONE: + case CANCELLED: + case FAILED: + case UPDATED: + NexmarkUtils.console("%s publisher (%d events)", state, numEvents); + return; + case RUNNING: + numEvents = getLong(job, publisherMonitor.getElementCounter()); + if (startMsSinceEpoch < 0 && numEvents > 0) { + startMsSinceEpoch = System.currentTimeMillis(); + endMsSinceEpoch = startMsSinceEpoch + + Duration.standardSeconds(configuration.preloadSeconds).getMillis(); + } + if (endMsSinceEpoch < 0) { + NexmarkUtils.console("%s publisher (%d events)", state, numEvents); + } else { + long remainMs = endMsSinceEpoch - System.currentTimeMillis(); + if (remainMs > 0) { + NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents, + remainMs / 1000); + } else { + NexmarkUtils.console("publisher preloaded %d events", numEvents); + return; + } + } + break; + } + + try { + Thread.sleep(PERF_DELAY.getMillis()); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException("Interrupted: publisher still running."); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java new file mode 100644 index 0000000..c7c32c2 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import org.apache.beam.integration.nexmark.NexmarkDriver; +import org.apache.beam.integration.nexmark.NexmarkOptions; +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * Run NexMark queries using the Spark runner. + */ +class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOptions> { + /** + * Command line flags. + */ + public interface NexmarkSparkOptions extends NexmarkOptions, SparkPipelineOptions { + } + + /** + * Entry point. + */ + public static void main(String[] args) { + NexmarkSparkOptions options = + PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(NexmarkSparkOptions.class); + options.setRunner(SparkRunner.class); + NexmarkSparkRunner runner = new NexmarkSparkRunner(options); + new NexmarkSparkDriver().runAll(options, runner); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java new file mode 100644 index 0000000..1d49a3a --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.drivers; + +import org.apache.beam.integration.nexmark.NexmarkRunner; + +/** + * Run a query using the Spark runner. + */ +public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.NexmarkSparkOptions> { + @Override + protected boolean isStreaming() { + return options.isStreaming(); + } + + @Override + protected int coresPerWorker() { + return 4; + } + + @Override + protected int maxNumWorkers() { + return 5; + } + + @Override + protected void invokeBuilderForPublishOnlyPipeline( + PipelineBuilder builder) { + builder.build(options); + } + + @Override + protected void waitForPublisherPreload() { + throw new UnsupportedOperationException(); + } + + + public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) { + super(options); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java new file mode 100644 index 0000000..c8aa144 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Nexmark Benchmark Execution Drivers. + */ +package org.apache.beam.integration.nexmark.drivers; http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java new file mode 100644 index 0000000..f5cfc2b --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubJsonClient; + +/** + * Helper for working with pubsub. + */ +public class PubsubHelper implements AutoCloseable { + /** + * Underlying pub/sub client. + */ + private final PubsubClient pubsubClient; + + /** + * Project id. + */ + private final String projectId; + + /** + * Topics we should delete on close. + */ + private final List<PubsubClient.TopicPath> createdTopics; + + /** + * Subscriptions we should delete on close. + */ + private final List<PubsubClient.SubscriptionPath> createdSubscriptions; + + private PubsubHelper(PubsubClient pubsubClient, String projectId) { + this.pubsubClient = pubsubClient; + this.projectId = projectId; + createdTopics = new ArrayList<>(); + createdSubscriptions = new ArrayList<>(); + } + + /** + * Create a helper. + */ + public static PubsubHelper create(PubsubOptions options) { + try { + return new PubsubHelper( + PubsubJsonClient.FACTORY.newClient(null, null, options), + options.getProject()); + } catch (IOException e) { + throw new RuntimeException("Unable to create Pubsub client: ", e); + } + } + + /** + * Create a topic from short name. Delete it if it already exists. Ensure the topic will be + * deleted on cleanup. Return full topic name. + */ + public PubsubClient.TopicPath createTopic(String shortTopic) { + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + try { + if (topicExists(shortTopic)) { + NexmarkUtils.console("attempting to cleanup topic %s", topic); + pubsubClient.deleteTopic(topic); + } + NexmarkUtils.console("create topic %s", topic); + pubsubClient.createTopic(topic); + createdTopics.add(topic); + return topic; + } catch (IOException e) { + throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e); + } + } + + /** + * Create a topic from short name if it does not already exist. The topic will not be + * deleted on cleanup. Return full topic name. + */ + public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) { + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + try { + if (topicExists(shortTopic)) { + NexmarkUtils.console("topic %s already exists", topic); + return topic; + } + NexmarkUtils.console("create topic %s", topic); + pubsubClient.createTopic(topic); + return topic; + } catch (IOException e) { + throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e); + } + } + + /** + * Check a topic corresponding to short name exists, and throw exception if not. The + * topic will not be deleted on cleanup. Return full topic name. + */ + public PubsubClient.TopicPath reuseTopic(String shortTopic) { + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + if (topicExists(shortTopic)) { + NexmarkUtils.console("reusing existing topic %s", topic); + return topic; + } + throw new RuntimeException("topic '" + topic + "' does not already exist"); + } + + /** + * Does topic corresponding to short name exist? + */ + public boolean topicExists(String shortTopic) { + PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId); + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + try { + Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project); + return existingTopics.contains(topic); + } catch (IOException e) { + throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e); + } + } + + /** + * Create subscription from short name. Delete subscription if it already exists. Ensure the + * subscription will be deleted on cleanup. Return full subscription name. + */ + public PubsubClient.SubscriptionPath createSubscription( + String shortTopic, String shortSubscription) { + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + PubsubClient.SubscriptionPath subscription = + PubsubClient.subscriptionPathFromName(projectId, shortSubscription); + try { + if (subscriptionExists(shortTopic, shortSubscription)) { + NexmarkUtils.console("attempting to cleanup subscription %s", subscription); + pubsubClient.deleteSubscription(subscription); + } + NexmarkUtils.console("create subscription %s", subscription); + pubsubClient.createSubscription(topic, subscription, 60); + createdSubscriptions.add(subscription); + } catch (IOException e) { + throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e); + } + return subscription; + } + + /** + * Check a subscription corresponding to short name exists, and throw exception if not. The + * subscription will not be deleted on cleanup. Return full topic name. + */ + public PubsubClient.SubscriptionPath reuseSubscription( + String shortTopic, String shortSubscription) { + PubsubClient.SubscriptionPath subscription = + PubsubClient.subscriptionPathFromName(projectId, shortSubscription); + if (subscriptionExists(shortTopic, shortSubscription)) { + NexmarkUtils.console("reusing existing subscription %s", subscription); + return subscription; + } + throw new RuntimeException("subscription'" + subscription + "' does not already exist"); + } + + /** + * Does subscription corresponding to short name exist? + */ + public boolean subscriptionExists(String shortTopic, String shortSubscription) { + PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId); + PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); + PubsubClient.SubscriptionPath subscription = + PubsubClient.subscriptionPathFromName(projectId, shortSubscription); + try { + Collection<PubsubClient.SubscriptionPath> existingSubscriptions = + pubsubClient.listSubscriptions(project, topic); + return existingSubscriptions.contains(subscription); + } catch (IOException e) { + throw new RuntimeException("Unable to check Pubsub subscription" + subscription + ": ", e); + } + } + + /** + * Delete all the subscriptions and topics we created. + */ + @Override + public void close() { + for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) { + try { + NexmarkUtils.console("delete subscription %s", subscription); + pubsubClient.deleteSubscription(subscription); + } catch (IOException ex) { + NexmarkUtils.console("could not delete subscription %s", subscription); + } + } + for (PubsubClient.TopicPath topic : createdTopics) { + try { + NexmarkUtils.console("delete topic %s", topic); + pubsubClient.deleteTopic(topic); + } catch (IOException ex) { + NexmarkUtils.console("could not delete topic %s", topic); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java new file mode 100644 index 0000000..1161f3e --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Nexmark Beam IO related utilities. + */ +package org.apache.beam.integration.nexmark.io; http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java new file mode 100644 index 0000000..ac30568 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; + +/** + * An auction submitted by a person. + */ +public class Auction implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<Auction> CODER = new AtomicCoder<Auction>() { + @Override + public void encode(Auction value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.id, outStream, Context.NESTED); + STRING_CODER.encode(value.itemName, outStream, Context.NESTED); + STRING_CODER.encode(value.description, outStream, Context.NESTED); + LONG_CODER.encode(value.initialBid, outStream, Context.NESTED); + LONG_CODER.encode(value.reserve, outStream, Context.NESTED); + LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); + LONG_CODER.encode(value.expires, outStream, Context.NESTED); + LONG_CODER.encode(value.seller, outStream, Context.NESTED); + LONG_CODER.encode(value.category, outStream, Context.NESTED); + STRING_CODER.encode(value.extra, outStream, Context.NESTED); + } + + @Override + public Auction decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long id = LONG_CODER.decode(inStream, Context.NESTED); + String itemName = STRING_CODER.decode(inStream, Context.NESTED); + String description = STRING_CODER.decode(inStream, Context.NESTED); + long initialBid = LONG_CODER.decode(inStream, Context.NESTED); + long reserve = LONG_CODER.decode(inStream, Context.NESTED); + long dateTime = LONG_CODER.decode(inStream, Context.NESTED); + long expires = LONG_CODER.decode(inStream, Context.NESTED); + long seller = LONG_CODER.decode(inStream, Context.NESTED); + long category = LONG_CODER.decode(inStream, Context.NESTED); + String extra = STRING_CODER.decode(inStream, Context.NESTED); + return new Auction( + id, itemName, description, initialBid, reserve, dateTime, expires, seller, category, + extra); + } + }; + + + /** Id of auction. */ + @JsonProperty + public final long id; // primary key + + /** Extra auction properties. */ + @JsonProperty + public final String itemName; + + @JsonProperty + public final String description; + + /** Initial bid price, in cents. */ + @JsonProperty + public final long initialBid; + + /** Reserve price, in cents. */ + @JsonProperty + public final long reserve; + + @JsonProperty + public final long dateTime; + + /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */ + @JsonProperty + public final long expires; + + /** Id of person who instigated auction. */ + @JsonProperty + public final long seller; // foreign key: Person.id + + /** Id of category auction is listed under. */ + @JsonProperty + public final long category; // foreign key: Category.id + + /** Additional arbitrary payload for performance testing. */ + @JsonProperty + public final String extra; + + + // For Avro only. + @SuppressWarnings("unused") + private Auction() { + id = 0; + itemName = null; + description = null; + initialBid = 0; + reserve = 0; + dateTime = 0; + expires = 0; + seller = 0; + category = 0; + extra = null; + } + + public Auction(long id, String itemName, String description, long initialBid, long reserve, + long dateTime, long expires, long seller, long category, String extra) { + this.id = id; + this.itemName = itemName; + this.description = description; + this.initialBid = initialBid; + this.reserve = reserve; + this.dateTime = dateTime; + this.expires = expires; + this.seller = seller; + this.category = category; + this.extra = extra; + } + + /** + * Return a copy of auction which capture the given annotation. + * (Used for debugging). + */ + public Auction withAnnotation(String annotation) { + return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, + category, annotation + ": " + extra); + } + + /** + * Does auction have {@code annotation}? (Used for debugging.) + */ + public boolean hasAnnotation(String annotation) { + return extra.startsWith(annotation + ": "); + } + + /** + * Remove {@code annotation} from auction. (Used for debugging.) + */ + public Auction withoutAnnotation(String annotation) { + if (hasAnnotation(annotation)) { + return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, + category, extra.substring(annotation.length() + 2)); + } else { + return this; + } + } + + @Override + public long sizeInBytes() { + return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8 + + extra.length() + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java new file mode 100644 index 0000000..c014257 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.integration.nexmark.WinningBids; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; + +/** + * Result of {@link WinningBids} transform. + */ +public class AuctionBid implements KnownSize, Serializable { + public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() { + @Override + public void encode(AuctionBid value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + Auction.CODER.encode(value.auction, outStream, Context.NESTED); + Bid.CODER.encode(value.bid, outStream, Context.NESTED); + } + + @Override + public AuctionBid decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + Auction auction = Auction.CODER.decode(inStream, Context.NESTED); + Bid bid = Bid.CODER.decode(inStream, Context.NESTED); + return new AuctionBid(auction, bid); + } + }; + + @JsonProperty + public final Auction auction; + + @JsonProperty + public final Bid bid; + + // For Avro only. + @SuppressWarnings("unused") + private AuctionBid() { + auction = null; + bid = null; + } + + public AuctionBid(Auction auction, Bid bid) { + this.auction = auction; + this.bid = bid; + } + + @Override + public long sizeInBytes() { + return auction.sizeInBytes() + bid.sizeInBytes(); + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java new file mode 100644 index 0000000..aa16629 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarLongCoder; + +/** + * Result of Query5. + */ +public class AuctionCount implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() { + @Override + public void encode(AuctionCount value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream, Context.NESTED); + LONG_CODER.encode(value.count, outStream, Context.NESTED); + } + + @Override + public AuctionCount decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream, Context.NESTED); + long count = LONG_CODER.decode(inStream, Context.NESTED); + return new AuctionCount(auction, count); + } + }; + + @JsonProperty + public final long auction; + + @JsonProperty + public final long count; + + // For Avro only. + @SuppressWarnings("unused") + private AuctionCount() { + auction = 0; + count = 0; + } + + public AuctionCount(long auction, long count) { + this.auction = auction; + this.count = count; + } + + @Override + public long sizeInBytes() { + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java new file mode 100644 index 0000000..f365cc8 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarLongCoder; + +/** + * Result of Query2. + */ +public class AuctionPrice implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() { + @Override + public void encode(AuctionPrice value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream, Context.NESTED); + LONG_CODER.encode(value.price, outStream, Context.NESTED); + } + + @Override + public AuctionPrice decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream, Context.NESTED); + long price = LONG_CODER.decode(inStream, Context.NESTED); + return new AuctionPrice(auction, price); + } + }; + + @JsonProperty + public final long auction; + + /** Price in cents. */ + @JsonProperty + public final long price; + + // For Avro only. + @SuppressWarnings("unused") + private AuctionPrice() { + auction = 0; + price = 0; + } + + public AuctionPrice(long auction, long price) { + this.auction = auction; + this.price = price; + } + + @Override + public long sizeInBytes() { + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java new file mode 100644 index 0000000..59a33c1 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Comparator; + +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; + +/** + * A bid for an item on auction. + */ +public class Bid implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<Bid> CODER = new AtomicCoder<Bid>() { + @Override + public void encode(Bid value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream, Context.NESTED); + LONG_CODER.encode(value.bidder, outStream, Context.NESTED); + LONG_CODER.encode(value.price, outStream, Context.NESTED); + LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); + STRING_CODER.encode(value.extra, outStream, Context.NESTED); + } + + @Override + public Bid decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream, Context.NESTED); + long bidder = LONG_CODER.decode(inStream, Context.NESTED); + long price = LONG_CODER.decode(inStream, Context.NESTED); + long dateTime = LONG_CODER.decode(inStream, Context.NESTED); + String extra = STRING_CODER.decode(inStream, Context.NESTED); + return new Bid(auction, bidder, price, dateTime, extra); + } + }; + + /** + * Comparator to order bids by ascending price then descending time + * (for finding winning bids). + */ + public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() { + @Override + public int compare(Bid left, Bid right) { + int i = Double.compare(left.price, right.price); + if (i != 0) { + return i; + } + return Long.compare(right.dateTime, left.dateTime); + } + }; + + /** + * Comparator to order bids by ascending time then ascending price. + * (for finding most recent bids). + */ + public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() { + @Override + public int compare(Bid left, Bid right) { + int i = Long.compare(left.dateTime, right.dateTime); + if (i != 0) { + return i; + } + return Double.compare(left.price, right.price); + } + }; + + /** Id of auction this bid is for. */ + @JsonProperty + public final long auction; // foreign key: Auction.id + + /** Id of person bidding in auction. */ + @JsonProperty + public final long bidder; // foreign key: Person.id + + /** Price of bid, in cents. */ + @JsonProperty + public final long price; + + /** + * Instant at which bid was made (ms since epoch). + * NOTE: This may be earlier than the system's event time. + */ + @JsonProperty + public final long dateTime; + + /** Additional arbitrary payload for performance testing. */ + @JsonProperty + public final String extra; + + // For Avro only. + @SuppressWarnings("unused") + private Bid() { + auction = 0; + bidder = 0; + price = 0; + dateTime = 0; + extra = null; + } + + public Bid(long auction, long bidder, long price, long dateTime, String extra) { + this.auction = auction; + this.bidder = bidder; + this.price = price; + this.dateTime = dateTime; + this.extra = extra; + } + + /** + * Return a copy of bid which capture the given annotation. + * (Used for debugging). + */ + public Bid withAnnotation(String annotation) { + return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra); + } + + /** + * Does bid have {@code annotation}? (Used for debugging.) + */ + public boolean hasAnnotation(String annotation) { + return extra.startsWith(annotation + ": "); + } + + /** + * Remove {@code annotation} from bid. (Used for debugging.) + */ + public Bid withoutAnnotation(String annotation) { + if (hasAnnotation(annotation)) { + return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2)); + } else { + return this; + } + } + + @Override + public long sizeInBytes() { + return 8 + 8 + 8 + 8 + extra.length() + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java new file mode 100644 index 0000000..7c4dfae --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarLongCoder; + +/** + * Result of query 11. + */ +public class BidsPerSession implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() { + @Override + public void encode(BidsPerSession value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.personId, outStream, Context.NESTED); + LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED); + } + + @Override + public BidsPerSession decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long personId = LONG_CODER.decode(inStream, Context.NESTED); + long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED); + return new BidsPerSession(personId, bidsPerSession); + } + }; + + @JsonProperty + public final long personId; + + @JsonProperty + public final long bidsPerSession; + + public BidsPerSession() { + personId = 0; + bidsPerSession = 0; + } + + public BidsPerSession(long personId, long bidsPerSession) { + this.personId = personId; + this.bidsPerSession = bidsPerSession; + } + + @Override + public long sizeInBytes() { + // Two longs. + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +}
