http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index d311dc4..e8d791f 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -26,7 +26,6 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -35,6 +34,35 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; +import org.apache.beam.integration.nexmark.io.PubsubHelper; +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.integration.nexmark.model.KnownSize; +import org.apache.beam.integration.nexmark.model.Person; +import org.apache.beam.integration.nexmark.queries.Query0; +import org.apache.beam.integration.nexmark.queries.Query0Model; +import org.apache.beam.integration.nexmark.queries.Query1; +import org.apache.beam.integration.nexmark.queries.Query10; +import org.apache.beam.integration.nexmark.queries.Query11; +import org.apache.beam.integration.nexmark.queries.Query12; +import org.apache.beam.integration.nexmark.queries.Query1Model; +import org.apache.beam.integration.nexmark.queries.Query2; +import org.apache.beam.integration.nexmark.queries.Query2Model; +import org.apache.beam.integration.nexmark.queries.Query3; +import org.apache.beam.integration.nexmark.queries.Query3Model; +import org.apache.beam.integration.nexmark.queries.Query4; +import org.apache.beam.integration.nexmark.queries.Query4Model; +import org.apache.beam.integration.nexmark.queries.Query5; +import org.apache.beam.integration.nexmark.queries.Query5Model; +import org.apache.beam.integration.nexmark.queries.Query6; +import org.apache.beam.integration.nexmark.queries.Query6Model; +import org.apache.beam.integration.nexmark.queries.Query7; +import org.apache.beam.integration.nexmark.queries.Query7Model; +import org.apache.beam.integration.nexmark.queries.Query8; +import org.apache.beam.integration.nexmark.queries.Query8Model; +import org.apache.beam.integration.nexmark.queries.Query9; +import org.apache.beam.integration.nexmark.queries.Query9Model; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -56,7 +84,7 @@ import org.joda.time.Duration; /** * Run a single Nexmark query using a given configuration. */ -public abstract class NexmarkRunner<OptionT extends Options> { +public abstract class NexmarkRunner<OptionT extends NexmarkOptions> { /** * Minimum number of samples needed for 'stead-state' rate calculation. */ @@ -84,7 +112,7 @@ public abstract class NexmarkRunner<OptionT extends Options> { */ private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3); /** - * Options shared by all runs. + * NexmarkOptions shared by all runs. */ protected final OptionT options; @@ -359,7 +387,7 @@ public abstract class NexmarkRunner<OptionT extends Options> { return perf; } - String getJobId(PipelineResult job) { + protected String getJobId(PipelineResult job) { return ""; } @@ -461,7 +489,7 @@ public abstract class NexmarkRunner<OptionT extends Options> { /** * Build and run a pipeline using specified options. */ - protected interface PipelineBuilder<OptionT extends Options> { + protected interface PipelineBuilder<OptionT extends NexmarkOptions> { void build(OptionT publishOnlyOptions); } @@ -966,7 +994,7 @@ public abstract class NexmarkRunner<OptionT extends Options> { // We'll shutdown the publisher job when we notice the main job has finished. invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() { @Override - public void build(Options publishOnlyOptions) { + public void build(NexmarkOptions publishOnlyOptions) { Pipeline sp = Pipeline.create(options); NexmarkUtils.setupPipeline(configuration.coderStrategy, sp); publisherMonitor = new Monitor<Event>(queryName, "publisher");
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java deleted file mode 100644 index a46d38a..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; - -/** - * Run NexMark queries using the Spark runner. - */ -class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOptions> { - /** - * Command line flags. - */ - public interface NexmarkSparkOptions extends Options, SparkPipelineOptions { - } - - /** - * Entry point. - */ - public static void main(String[] args) { - NexmarkSparkOptions options = - PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(NexmarkSparkOptions.class); -// options.setRunner(org.apache.beam.runners.spark.SparkRunner.class); - options.setRunner(org.apache.beam.runners.spark.SparkRunnerDebugger.class); - NexmarkSparkRunner runner = new NexmarkSparkRunner(options); - new NexmarkSparkDriver().runAll(options, runner); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java deleted file mode 100644 index 30ae9ca..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -/** - * Run a query using the Spark runner. - */ -public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.NexmarkSparkOptions> { - @Override - protected boolean isStreaming() { - return options.isStreaming(); - } - - @Override - protected int coresPerWorker() { - return 4; - } - - @Override - protected int maxNumWorkers() { - return 5; - } - - @Override - protected void invokeBuilderForPublishOnlyPipeline( - PipelineBuilder builder) { - builder.build(options); - } - - @Override - protected void waitForPublisherPreload() { - throw new UnsupportedOperationException(); - } - - - public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) { - super(options); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java index bc47540..be7d7b8 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java @@ -100,7 +100,7 @@ public enum NexmarkSuite { * with any set command line flags, except for --isStreaming which is only respected for * the {@link #DEFAULT} suite. */ - public Iterable<NexmarkConfiguration> getConfigurations(Options options) { + public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) { Set<NexmarkConfiguration> results = new LinkedHashSet<>(); for (NexmarkConfiguration configuration : configurations) { NexmarkConfiguration result = configuration.clone(); http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index f7417d3..b0421a4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -20,14 +20,30 @@ package org.apache.beam.integration.nexmark; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.hash.Hashing; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; - +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.AuctionBid; +import org.apache.beam.integration.nexmark.model.AuctionCount; +import org.apache.beam.integration.nexmark.model.AuctionPrice; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.BidsPerSession; +import org.apache.beam.integration.nexmark.model.CategoryPrice; +import org.apache.beam.integration.nexmark.model.Done; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.integration.nexmark.model.IdNameReserve; +import org.apache.beam.integration.nexmark.model.KnownSize; +import org.apache.beam.integration.nexmark.model.NameCityStateId; +import org.apache.beam.integration.nexmark.model.Person; +import org.apache.beam.integration.nexmark.model.SellerPrice; +import org.apache.beam.integration.nexmark.sources.BoundedEventSource; +import org.apache.beam.integration.nexmark.sources.Generator; +import org.apache.beam.integration.nexmark.sources.GeneratorConfig; +import org.apache.beam.integration.nexmark.sources.UnboundedEventSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -67,7 +83,7 @@ public class NexmarkUtils { /** * Mapper for (de)serializing JSON. */ - static final ObjectMapper MAPPER = new ObjectMapper(); + public static final ObjectMapper MAPPER = new ObjectMapper(); /** * Possible sources for events. @@ -382,7 +398,8 @@ public class NexmarkUtils { */ public static PTransform<PBegin, PCollection<Event>> batchEventsSource( NexmarkConfiguration configuration) { - return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration), configuration.numEventGenerators)); + return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration), + configuration.numEventGenerators)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java deleted file mode 100644 index 388473d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java +++ /dev/null @@ -1,386 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import javax.annotation.Nullable; - -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PubsubOptions; - -/** - * Command line flags. - */ -public interface Options extends PubsubOptions { - @Description("Which suite to run. Default is to use command line arguments for one job.") - @Default.Enum("DEFAULT") - NexmarkSuite getSuite(); - - void setSuite(NexmarkSuite suite); - - @Description("If true, and using the DataflowPipelineRunner, monitor the jobs as they run.") - @Default.Boolean(false) - boolean getMonitorJobs(); - - void setMonitorJobs(boolean monitorJobs); - - @Description("Where the events come from.") - @Nullable - NexmarkUtils.SourceType getSourceType(); - - void setSourceType(NexmarkUtils.SourceType sourceType); - - @Description("Prefix for input files if using avro input") - @Nullable - String getInputPath(); - - void setInputPath(String inputPath); - - @Description("Where results go.") - @Nullable - NexmarkUtils.SinkType getSinkType(); - - void setSinkType(NexmarkUtils.SinkType sinkType); - - @Description("Which mode to run in when source is PUBSUB.") - @Nullable - NexmarkUtils.PubSubMode getPubSubMode(); - - void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode); - - @Description("Which query to run.") - @Nullable - Integer getQuery(); - - void setQuery(Integer query); - - @Description("Prefix for output files if using text output for results or running Query 10.") - @Nullable - String getOutputPath(); - - void setOutputPath(String outputPath); - - @Description("Base name of pubsub topic to publish to in streaming mode.") - @Nullable - @Default.String("nexmark") - String getPubsubTopic(); - - void setPubsubTopic(String pubsubTopic); - - @Description("Base name of pubsub subscription to read from in streaming mode.") - @Nullable - @Default.String("nexmark") - String getPubsubSubscription(); - - void setPubsubSubscription(String pubsubSubscription); - - @Description("Base name of BigQuery table name if using BigQuery output.") - @Nullable - @Default.String("nexmark") - String getBigQueryTable(); - - void setBigQueryTable(String bigQueryTable); - - @Description("Approximate number of events to generate. " - + "Zero for effectively unlimited in streaming mode.") - @Nullable - Long getNumEvents(); - - void setNumEvents(Long numEvents); - - @Description("Time in seconds to preload the subscription with data, at the initial input rate " - + "of the pipeline.") - @Nullable - Integer getPreloadSeconds(); - - void setPreloadSeconds(Integer preloadSeconds); - - @Description("Number of unbounded sources to create events.") - @Nullable - Integer getNumEventGenerators(); - - void setNumEventGenerators(Integer numEventGenerators); - - @Description("Shape of event rate curve.") - @Nullable - NexmarkUtils.RateShape getRateShape(); - - void setRateShape(NexmarkUtils.RateShape rateShape); - - @Description("Initial overall event rate (in --rateUnit).") - @Nullable - Integer getFirstEventRate(); - - void setFirstEventRate(Integer firstEventRate); - - @Description("Next overall event rate (in --rateUnit).") - @Nullable - Integer getNextEventRate(); - - void setNextEventRate(Integer nextEventRate); - - @Description("Unit for rates.") - @Nullable - NexmarkUtils.RateUnit getRateUnit(); - - void setRateUnit(NexmarkUtils.RateUnit rateUnit); - - @Description("Overall period of rate shape, in seconds.") - @Nullable - Integer getRatePeriodSec(); - - void setRatePeriodSec(Integer ratePeriodSec); - - @Description("If true, relay events in real time in streaming mode.") - @Nullable - Boolean getIsRateLimited(); - - void setIsRateLimited(Boolean isRateLimited); - - @Description("If true, use wallclock time as event time. Otherwise, use a deterministic" - + " time in the past so that multiple runs will see exactly the same event streams" - + " and should thus have exactly the same results.") - @Nullable - Boolean getUseWallclockEventTime(); - - void setUseWallclockEventTime(Boolean useWallclockEventTime); - - @Description("Assert pipeline results match model results.") - @Nullable - boolean getAssertCorrectness(); - - void setAssertCorrectness(boolean assertCorrectness); - - @Description("Log all input events.") - @Nullable - boolean getLogEvents(); - - void setLogEvents(boolean logEvents); - - @Description("Log all query results.") - @Nullable - boolean getLogResults(); - - void setLogResults(boolean logResults); - - @Description("Average size in bytes for a person record.") - @Nullable - Integer getAvgPersonByteSize(); - - void setAvgPersonByteSize(Integer avgPersonByteSize); - - @Description("Average size in bytes for an auction record.") - @Nullable - Integer getAvgAuctionByteSize(); - - void setAvgAuctionByteSize(Integer avgAuctionByteSize); - - @Description("Average size in bytes for a bid record.") - @Nullable - Integer getAvgBidByteSize(); - - void setAvgBidByteSize(Integer avgBidByteSize); - - @Description("Ratio of bids for 'hot' auctions above the background.") - @Nullable - Integer getHotAuctionRatio(); - - void setHotAuctionRatio(Integer hotAuctionRatio); - - @Description("Ratio of auctions for 'hot' sellers above the background.") - @Nullable - Integer getHotSellersRatio(); - - void setHotSellersRatio(Integer hotSellersRatio); - - @Description("Ratio of auctions for 'hot' bidders above the background.") - @Nullable - Integer getHotBiddersRatio(); - - void setHotBiddersRatio(Integer hotBiddersRatio); - - @Description("Window size in seconds.") - @Nullable - Long getWindowSizeSec(); - - void setWindowSizeSec(Long windowSizeSec); - - @Description("Window period in seconds.") - @Nullable - Long getWindowPeriodSec(); - - void setWindowPeriodSec(Long windowPeriodSec); - - @Description("If in streaming mode, the holdback for watermark in seconds.") - @Nullable - Long getWatermarkHoldbackSec(); - - void setWatermarkHoldbackSec(Long watermarkHoldbackSec); - - @Description("Roughly how many auctions should be in flight for each generator.") - @Nullable - Integer getNumInFlightAuctions(); - - void setNumInFlightAuctions(Integer numInFlightAuctions); - - - @Description("Maximum number of people to consider as active for placing auctions or bids.") - @Nullable - Integer getNumActivePeople(); - - void setNumActivePeople(Integer numActivePeople); - - @Description("Filename of perf data to append to.") - @Nullable - String getPerfFilename(); - - void setPerfFilename(String perfFilename); - - @Description("Filename of baseline perf data to read from.") - @Nullable - String getBaselineFilename(); - - void setBaselineFilename(String baselineFilename); - - @Description("Filename of summary perf data to append to.") - @Nullable - String getSummaryFilename(); - - void setSummaryFilename(String summaryFilename); - - @Description("Filename for javascript capturing all perf data and any baselines.") - @Nullable - String getJavascriptFilename(); - - void setJavascriptFilename(String javascriptFilename); - - @Description("If true, don't run the actual query. Instead, calculate the distribution " - + "of number of query results per (event time) minute according to the query model.") - @Nullable - boolean getJustModelResultRate(); - - void setJustModelResultRate(boolean justModelResultRate); - - @Description("Coder strategy to use.") - @Nullable - NexmarkUtils.CoderStrategy getCoderStrategy(); - - void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy); - - @Description("Delay, in milliseconds, for each event. We will peg one core for this " - + "number of milliseconds to simulate CPU-bound computation.") - @Nullable - Long getCpuDelayMs(); - - void setCpuDelayMs(Long cpuDelayMs); - - @Description("Extra data, in bytes, to save to persistent state for each event. " - + "This will force I/O all the way to durable storage to simulate an " - + "I/O-bound computation.") - @Nullable - Long getDiskBusyBytes(); - - void setDiskBusyBytes(Long diskBusyBytes); - - @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction") - @Nullable - Integer getAuctionSkip(); - - void setAuctionSkip(Integer auctionSkip); - - @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).") - @Nullable - Integer getFanout(); - - void setFanout(Integer fanout); - - @Description("Length of occasional delay to impose on events (in seconds).") - @Nullable - Long getOccasionalDelaySec(); - - void setOccasionalDelaySec(Long occasionalDelaySec); - - @Description("Probability that an event will be delayed by delayS.") - @Nullable - Double getProbDelayedEvent(); - - void setProbDelayedEvent(Double probDelayedEvent); - - @Description("Maximum size of each log file (in events). For Query10 only.") - @Nullable - Integer getMaxLogEvents(); - - void setMaxLogEvents(Integer maxLogEvents); - - @Description("How to derive names of resources.") - @Default.Enum("QUERY_AND_SALT") - NexmarkUtils.ResourceNameMode getResourceNameMode(); - - void setResourceNameMode(NexmarkUtils.ResourceNameMode mode); - - @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.") - @Default.Boolean(true) - boolean getManageResources(); - - void setManageResources(boolean manageResources); - - @Description("If true, use pub/sub publish time instead of event time.") - @Nullable - Boolean getUsePubsubPublishTime(); - - void setUsePubsubPublishTime(Boolean usePubsubPublishTime); - - @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. " - + "1000 implies every 1000 events per generator are emitted in pseudo-random order.") - @Nullable - Long getOutOfOrderGroupSize(); - - void setOutOfOrderGroupSize(Long outOfOrderGroupSize); - - @Description("If false, do not add the Monitor and Snoop transforms.") - @Nullable - Boolean getDebug(); - - void setDebug(Boolean value); - - @Description("If set, cancel running pipelines after this long") - @Nullable - Long getRunningTimeMinutes(); - - void setRunningTimeMinutes(Long value); - - @Description("If set and --monitorJobs is true, check that the system watermark is never more " - + "than this far behind real time") - @Nullable - Long getMaxSystemLagSeconds(); - - void setMaxSystemLagSeconds(Long value); - - @Description("If set and --monitorJobs is true, check that the data watermark is never more " - + "than this far behind real time") - @Nullable - Long getMaxDataLagSeconds(); - - void setMaxDataLagSeconds(Long value); - - @Description("Only start validating watermarks after this many seconds") - @Nullable - Long getWatermarkValidationDelaySeconds(); - - void setWatermarkValidationDelaySeconds(Long value); -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java deleted file mode 100644 index 251a6ee..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * A person either creating an auction or making a bid. - */ -public class Person implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - public static final Coder<Person> CODER = new AtomicCoder<Person>() { - @Override - public void encode(Person value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream, Context.NESTED); - STRING_CODER.encode(value.name, outStream, Context.NESTED); - STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED); - STRING_CODER.encode(value.creditCard, outStream, Context.NESTED); - STRING_CODER.encode(value.city, outStream, Context.NESTED); - STRING_CODER.encode(value.state, outStream, Context.NESTED); - LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); - STRING_CODER.encode(value.extra, outStream, Context.NESTED); - } - - @Override - public Person decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long id = LONG_CODER.decode(inStream, Context.NESTED); - String name = STRING_CODER.decode(inStream, Context.NESTED); - String emailAddress = STRING_CODER.decode(inStream, Context.NESTED); - String creditCard = STRING_CODER.decode(inStream, Context.NESTED); - String city = STRING_CODER.decode(inStream, Context.NESTED); - String state = STRING_CODER.decode(inStream, Context.NESTED); - long dateTime = LONG_CODER.decode(inStream, Context.NESTED); - String extra = STRING_CODER.decode(inStream, Context.NESTED); - return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra); - } - }; - - /** Id of person. */ - @JsonProperty - public final long id; // primary key - - /** Extra person properties. */ - @JsonProperty - public final String name; - - @JsonProperty - public final String emailAddress; - - @JsonProperty - public final String creditCard; - - @JsonProperty - public final String city; - - @JsonProperty - public final String state; - - @JsonProperty - public final long dateTime; - - /** Additional arbitrary payload for performance testing. */ - @JsonProperty - public final String extra; - - // For Avro only. - @SuppressWarnings("unused") - private Person() { - id = 0; - name = null; - emailAddress = null; - creditCard = null; - city = null; - state = null; - dateTime = 0; - extra = null; - } - - public Person(long id, String name, String emailAddress, String creditCard, String city, - String state, long dateTime, String extra) { - this.id = id; - this.name = name; - this.emailAddress = emailAddress; - this.creditCard = creditCard; - this.city = city; - this.state = state; - this.dateTime = dateTime; - this.extra = extra; - } - - /** - * Return a copy of person which capture the given annotation. - * (Used for debugging). - */ - public Person withAnnotation(String annotation) { - return new Person(id, name, emailAddress, creditCard, city, state, dateTime, - annotation + ": " + extra); - } - - /** - * Does person have {@code annotation}? (Used for debugging.) - */ - public boolean hasAnnotation(String annotation) { - return extra.startsWith(annotation + ": "); - } - - /** - * Remove {@code annotation} from person. (Used for debugging.) - */ - public Person withoutAnnotation(String annotation) { - if (hasAnnotation(annotation)) { - return new Person(id, name, emailAddress, creditCard, city, state, dateTime, - extra.substring(annotation.length() + 2)); - } else { - return this; - } - } - - @Override - public long sizeInBytes() { - return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1 - + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java deleted file mode 100644 index a79a25b..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubJsonClient; - -/** - * Helper for working with pubsub. - */ -public class PubsubHelper implements AutoCloseable { - /** - * Underlying pub/sub client. - */ - private final PubsubClient pubsubClient; - - /** - * Project id. - */ - private final String projectId; - - /** - * Topics we should delete on close. - */ - private final List<PubsubClient.TopicPath> createdTopics; - - /** - * Subscriptions we should delete on close. - */ - private final List<PubsubClient.SubscriptionPath> createdSubscriptions; - - private PubsubHelper(PubsubClient pubsubClient, String projectId) { - this.pubsubClient = pubsubClient; - this.projectId = projectId; - createdTopics = new ArrayList<>(); - createdSubscriptions = new ArrayList<>(); - } - - /** - * Create a helper. - */ - public static PubsubHelper create(PubsubOptions options) { - try { - return new PubsubHelper( - PubsubJsonClient.FACTORY.newClient(null, null, options), - options.getProject()); - } catch (IOException e) { - throw new RuntimeException("Unable to create Pubsub client: ", e); - } - } - - /** - * Create a topic from short name. Delete it if it already exists. Ensure the topic will be - * deleted on cleanup. Return full topic name. - */ - public PubsubClient.TopicPath createTopic(String shortTopic) { - PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); - try { - if (topicExists(shortTopic)) { - NexmarkUtils.console("attempting to cleanup topic %s", topic); - pubsubClient.deleteTopic(topic); - } - NexmarkUtils.console("create topic %s", topic); - pubsubClient.createTopic(topic); - createdTopics.add(topic); - return topic; - } catch (IOException e) { - throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e); - } - } - - /** - * Create a topic from short name if it does not already exist. The topic will not be - * deleted on cleanup. Return full topic name. - */ - public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) { - PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); - try { - if (topicExists(shortTopic)) { - NexmarkUtils.console("topic %s already exists", topic); - return topic; - } - NexmarkUtils.console("create topic %s", topic); - pubsubClient.createTopic(topic); - return topic; - } catch (IOException e) { - throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e); - } - } - - /** - * Check a topic corresponding to short name exists, and throw exception if not. The - * topic will not be deleted on cleanup. Return full topic name. - */ - public PubsubClient.TopicPath reuseTopic(String shortTopic) { - PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); - if (topicExists(shortTopic)) { - NexmarkUtils.console("reusing existing topic %s", topic); - return topic; - } - throw new RuntimeException("topic '" + topic + "' does not already exist"); - } - - /** - * Does topic corresponding to short name exist? - */ - public boolean topicExists(String shortTopic) { - PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId); - PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); - try { - Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project); - return existingTopics.contains(topic); - } catch (IOException e) { - throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e); - } - } - - /** - * Create subscription from short name. Delete subscription if it already exists. Ensure the - * subscription will be deleted on cleanup. Return full subscription name. - */ - public PubsubClient.SubscriptionPath createSubscription( - String shortTopic, String shortSubscription) { - PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); - PubsubClient.SubscriptionPath subscription = - PubsubClient.subscriptionPathFromName(projectId, shortSubscription); - try { - if (subscriptionExists(shortTopic, shortSubscription)) { - NexmarkUtils.console("attempting to cleanup subscription %s", subscription); - pubsubClient.deleteSubscription(subscription); - } - NexmarkUtils.console("create subscription %s", subscription); - pubsubClient.createSubscription(topic, subscription, 60); - createdSubscriptions.add(subscription); - } catch (IOException e) { - throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e); - } - return subscription; - } - - /** - * Check a subscription corresponding to short name exists, and throw exception if not. The - * subscription will not be deleted on cleanup. Return full topic name. - */ - public PubsubClient.SubscriptionPath reuseSubscription( - String shortTopic, String shortSubscription) { - PubsubClient.SubscriptionPath subscription = - PubsubClient.subscriptionPathFromName(projectId, shortSubscription); - if (subscriptionExists(shortTopic, shortSubscription)) { - NexmarkUtils.console("reusing existing subscription %s", subscription); - return subscription; - } - throw new RuntimeException("subscription'" + subscription + "' does not already exist"); - } - - /** - * Does subscription corresponding to short name exist? - */ - public boolean subscriptionExists(String shortTopic, String shortSubscription) { - PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId); - PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic); - PubsubClient.SubscriptionPath subscription = - PubsubClient.subscriptionPathFromName(projectId, shortSubscription); - try { - Collection<PubsubClient.SubscriptionPath> existingSubscriptions = - pubsubClient.listSubscriptions(project, topic); - return existingSubscriptions.contains(subscription); - } catch (IOException e) { - throw new RuntimeException("Unable to check Pubsub subscription" + subscription + ": ", e); - } - } - - /** - * Delete all the subscriptions and topics we created. - */ - @Override - public void close() { - for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) { - try { - NexmarkUtils.console("delete subscription %s", subscription); - pubsubClient.deleteSubscription(subscription); - } catch (IOException ex) { - NexmarkUtils.console("could not delete subscription %s", subscription); - } - } - for (PubsubClient.TopicPath topic : createdTopics) { - try { - NexmarkUtils.console("delete topic %s", topic); - pubsubClient.deleteTopic(topic); - } catch (IOException ex) { - NexmarkUtils.console("could not delete topic %s", topic); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java deleted file mode 100644 index e88fce0..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.values.PCollection; - -/** - * Query 0: Pass events through unchanged. However, force them to do a round trip through - * serialization so that we measure the impact of the choice of coders. - */ -public class Query0 extends NexmarkQuery { - public Query0(NexmarkConfiguration configuration) { - super(configuration, "Query0"); - } - - private PCollection<Event> applyTyped(PCollection<Event> events) { - final Coder<Event> coder = events.getCoder(); - return events - // Force round trip through coder. - .apply(name + ".Serialize", - ParDo.of(new DoFn<Event, Event>() { - private final Aggregator<Long, Long> bytes = - createAggregator("bytes", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c) throws CoderException, IOException { - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - coder.encode(c.element(), outStream, Coder.Context.OUTER); - byte[] byteArray = outStream.toByteArray(); - bytes.addValue((long) byteArray.length); - ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray); - Event event = coder.decode(inStream, Coder.Context.OUTER); - c.output(event); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java deleted file mode 100644 index 37e3f93..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.util.Collection; -import java.util.Iterator; - -import org.apache.beam.sdk.values.TimestampedValue; - -/** - * A direct implementation of {@link Query0}. - */ -public class Query0Model extends NexmarkQueryModel { - /** - * Simulator for query 0. - */ - private class Simulator extends AbstractSimulator<Event, Event> { - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - } - - @Override - protected void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - allDone(); - return; - } - addResult(timestampedEvent); - //TODO test fails because offset of some hundreds of ms beween expect and actual - } - } - - public Query0Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - protected AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValueTimestampOrder(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java deleted file mode 100644 index a1ecdeb..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; - -/** - * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros. - * In CQL syntax: - * - * <pre> - * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime) - * FROM bid [ROWS UNBOUNDED]; - * </pre> - * - * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily - * slowed down. - */ -class Query1 extends NexmarkQuery { - public Query1(NexmarkConfiguration configuration) { - super(configuration, "Query1"); - } - - private PCollection<Bid> applyTyped(PCollection<Event> events) { - return events - // Only want the bid events. - .apply(JUST_BIDS) - - // Map the conversion function over all bids. - .apply(name + ".ToEuros", - ParDo.of(new DoFn<Bid, Bid>() { - @ProcessElement - public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(new Bid( - bid.auction, bid.bidder, (bid.price * 89) / 100, bid.dateTime, bid.extra)); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java deleted file mode 100644 index 7bdcb36..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.Serializable; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.AfterEach; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.GcsIOChannelFactory; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Query "10", 'Log to sharded files' (Not in original suite.) - * - * <p>Every windowSizeSec, save all events from the last period into 2*maxWorkers log files. - */ -class Query10 extends NexmarkQuery { - private static final Logger LOG = LoggerFactory.getLogger(Query10.class); - private static final int CHANNEL_BUFFER = 8 << 20; // 8MB - private static final int NUM_SHARDS_PER_WORKER = 5; - private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10); - - /** - * Capture everything we need to know about the records in a single output file. - */ - private static class OutputFile implements Serializable { - /** Maximum possible timestamp of records in file. */ - private final Instant maxTimestamp; - /** Shard within window. */ - private final String shard; - /** Index of file in all files in shard. */ - private final long index; - /** Timing of records in this file. */ - private final PaneInfo.Timing timing; - /** Path to file containing records, or {@literal null} if no output required. */ - @Nullable - private final String filename; - - public OutputFile( - Instant maxTimestamp, - String shard, - long index, - PaneInfo.Timing timing, - @Nullable String filename) { - this.maxTimestamp = maxTimestamp; - this.shard = shard; - this.index = index; - this.timing = timing; - this.filename = filename; - } - - @Override - public String toString() { - return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename); - } - } - - /** - * GCS uri prefix for all log and 'finished' files. If null they won't be written. - */ - @Nullable - private String outputPath; - - /** - * Maximum number of workers, used to determine log sharding factor. - */ - private int maxNumWorkers; - - public Query10(NexmarkConfiguration configuration) { - super(configuration, "Query10"); - } - - public void setOutputPath(@Nullable String outputPath) { - this.outputPath = outputPath; - } - - public void setMaxNumWorkers(int maxNumWorkers) { - this.maxNumWorkers = maxNumWorkers; - } - - /** - * Return channel for writing bytes to GCS. - * - * @throws IOException - */ - private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) - throws IOException { - WritableByteChannel channel = - GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain"); - checkState(channel instanceof GoogleCloudStorageWriteChannel); - ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER); - return channel; - } - - /** Return a short string to describe {@code timing}. */ - private String timingToString(PaneInfo.Timing timing) { - switch (timing) { - case EARLY: - return "E"; - case ON_TIME: - return "O"; - case LATE: - return "L"; - } - throw new RuntimeException(); // cases are exhaustive - } - - /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */ - private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) { - @Nullable String filename = - outputPath == null - ? null - : String.format("%s/LOG-%s-%s-%03d-%s-%x", - outputPath, window.maxTimestamp(), shard, pane.getIndex(), - timingToString(pane.getTiming()), - ThreadLocalRandom.current().nextLong()); - return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(), - pane.getTiming(), filename); - } - - /** - * Return path to which we should write the index for {@code window}, or {@literal null} - * if no output required. - */ - @Nullable - private String indexPathFor(BoundedWindow window) { - if (outputPath == null) { - return null; - } - return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp()); - } - - private PCollection<Done> applyTyped(PCollection<Event> events) { - final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER; - - return events.apply(name + ".ShardEvents", - ParDo.of(new DoFn<Event, KV<String, Event>>() { - final Aggregator<Long, Long> lateCounter = - createAggregator("actuallyLateEvent", Sum.ofLongs()); - final Aggregator<Long, Long> onTimeCounter = - createAggregator("actuallyOnTimeEvent", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().hasAnnotation("LATE")) { - lateCounter.addValue(1L); - LOG.error("Observed late: %s", c.element()); - } else { - onTimeCounter.addValue(1L); - } - int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards); - String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards); - c.output(KV.of(shard, c.element())); - } - })) - .apply(name + ".WindowEvents", - Window.<KV<String, Event>>into( - FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering(AfterEach.inOrder( - Repeatedly - .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents), - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(LATE_BATCHING_PERIOD))))) - .discardingFiredPanes() - // Use a 1 day allowed lateness so that any forgotten hold will stall the - // pipeline for that period and be very noticeable. - .withAllowedLateness(Duration.standardDays(1))) - .apply(name + ".GroupByKey", GroupByKey.<String, Event>create()) - .apply(name + ".CheckForLateEvents", - ParDo.of(new DoFn<KV<String, Iterable<Event>>, - KV<String, Iterable<Event>>>() { - final Aggregator<Long, Long> earlyCounter = - createAggregator("earlyShard", Sum.ofLongs()); - final Aggregator<Long, Long> onTimeCounter = - createAggregator("onTimeShard", Sum.ofLongs()); - final Aggregator<Long, Long> lateCounter = - createAggregator("lateShard", Sum.ofLongs()); - final Aggregator<Long, Long> unexpectedLatePaneCounter = - createAggregator("ERROR_unexpectedLatePane", Sum.ofLongs()); - final Aggregator<Long, Long> unexpectedOnTimeElementCounter = - createAggregator("ERROR_unexpectedOnTimeElement", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - int numLate = 0; - int numOnTime = 0; - for (Event event : c.element().getValue()) { - if (event.hasAnnotation("LATE")) { - numLate++; - } else { - numOnTime++; - } - } - String shard = c.element().getKey(); - LOG.error( - "%s with timestamp %s has %d actually late and %d on-time " - + "elements in pane %s for window %s", - shard, c.timestamp(), numLate, numOnTime, c.pane(), - window.maxTimestamp()); - if (c.pane().getTiming() == PaneInfo.Timing.LATE) { - if (numLate == 0) { - LOG.error( - "ERROR! No late events in late pane for %s", shard); - unexpectedLatePaneCounter.addValue(1L); - } - if (numOnTime > 0) { - LOG.error( - "ERROR! Have %d on-time events in late pane for %s", - numOnTime, shard); - unexpectedOnTimeElementCounter.addValue(1L); - } - lateCounter.addValue(1L); - } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) { - if (numOnTime + numLate < configuration.maxLogEvents) { - LOG.error( - "ERROR! Only have %d events in early pane for %s", - numOnTime + numLate, shard); - } - earlyCounter.addValue(1L); - } else { - onTimeCounter.addValue(1L); - } - c.output(c.element()); - } - })) - .apply(name + ".UploadEvents", - ParDo.of(new DoFn<KV<String, Iterable<Event>>, - KV<Void, OutputFile>>() { - final Aggregator<Long, Long> savedFileCounter = - createAggregator("savedFile", Sum.ofLongs()); - final Aggregator<Long, Long> writtenRecordsCounter = - createAggregator("writtenRecords", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) - throws IOException { - String shard = c.element().getKey(); - GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); - OutputFile outputFile = outputFileFor(window, shard, c.pane()); - LOG.error( - "Writing %s with record timestamp %s, window timestamp %s, pane %s", - shard, c.timestamp(), window.maxTimestamp(), c.pane()); - if (outputFile.filename != null) { - LOG.error("Beginning write to '%s'", outputFile.filename); - int n = 0; - try (OutputStream output = - Channels.newOutputStream(openWritableGcsFile(options, outputFile - .filename))) { - for (Event event : c.element().getValue()) { - Event.CODER.encode(event, output, Coder.Context.OUTER); - writtenRecordsCounter.addValue(1L); - if (++n % 10000 == 0) { - LOG.error("So far written %d records to '%s'", n, - outputFile.filename); - } - } - } - LOG.error("Written all %d records to '%s'", n, outputFile.filename); - } - savedFileCounter.addValue(1L); - c.output(KV.<Void, OutputFile>of(null, outputFile)); - } - })) - // Clear fancy triggering from above. - .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into( - FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering(AfterWatermark.pastEndOfWindow()) - // We expect no late data here, but we'll assume the worst so we can detect any. - .withAllowedLateness(Duration.standardDays(1)) - .discardingFiredPanes()) - // TODO etienne: unnecessary groupByKey? because aggregators are shared in parallel - // and Pardo is also in parallel, why group all elements in memory of the same executor? - .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create()) - .apply(name + ".Index", - ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() { - final Aggregator<Long, Long> unexpectedLateCounter = - createAggregator("ERROR_unexpectedLate", Sum.ofLongs()); - final Aggregator<Long, Long> unexpectedEarlyCounter = - createAggregator("ERROR_unexpectedEarly", Sum.ofLongs()); - final Aggregator<Long, Long> unexpectedIndexCounter = - createAggregator("ERROR_unexpectedIndex", Sum.ofLongs()); - final Aggregator<Long, Long> finalizedCounter = - createAggregator("indexed", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) - throws IOException { - if (c.pane().getTiming() == Timing.LATE) { - unexpectedLateCounter.addValue(1L); - LOG.error("ERROR! Unexpected LATE pane: %s", c.pane()); - } else if (c.pane().getTiming() == Timing.EARLY) { - unexpectedEarlyCounter.addValue(1L); - LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane()); - } else if (c.pane().getTiming() == Timing.ON_TIME - && c.pane().getIndex() != 0) { - unexpectedIndexCounter.addValue(1L); - LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane()); - } else { - GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); - LOG.error( - "Index with record timestamp %s, window timestamp %s, pane %s", - c.timestamp(), window.maxTimestamp(), c.pane()); - - @Nullable String filename = indexPathFor(window); - if (filename != null) { - LOG.error("Beginning write to '%s'", filename); - int n = 0; - try (OutputStream output = - Channels.newOutputStream( - openWritableGcsFile(options, filename))) { - for (OutputFile outputFile : c.element().getValue()) { - output.write(outputFile.toString().getBytes()); - n++; - } - } - LOG.error("Written all %d lines to '%s'", n, filename); - } - c.output( - new Done("written for timestamp " + window.maxTimestamp())); - finalizedCounter.addValue(1L); - } - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java deleted file mode 100644 index d610b7c..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -/** - * Query "11", 'User sessions' (Not in original suite.) - * - * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap. - * However limit the session to at most {@code maxLogEvents}. Emit the number of - * bids per session. - */ -class Query11 extends NexmarkQuery { - public Query11(NexmarkConfiguration configuration) { - super(configuration, "Query11"); - } - - private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) { - return events.apply(JUST_BIDS) - .apply(name + ".Rekey", - // TODO etienne: why not avoid this ParDo and do a Cont.perElement? - ParDo.of(new DoFn<Bid, KV<Long, Void>>() { - @ProcessElement - public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(KV.of(bid.bidder, (Void) null)); - } - })) - .apply(Window.<KV<Long, Void>>into( - Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))) - .discardingFiredPanes() - .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2))) - .apply(Count.<Long, Void>perKey()) - .apply(name + ".ToResult", - ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(new BidsPerSession(c.element().getKey(), c.element().getValue())); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java deleted file mode 100644 index 72fbb57..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -/** - * Query "12", 'Processing time windows' (Not in original suite.) - * - * <p>Group bids by the same user into processing time windows of windowSize. Emit the count - * of bids per window. - */ -class Query12 extends NexmarkQuery { - public Query12(NexmarkConfiguration configuration) { - super(configuration, "Query12"); - } - - private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) { - return events - .apply(JUST_BIDS) - .apply(name + ".Rekey", - // TODO etienne: why not avoid this ParDo and do a Cont.perElement? - ParDo.of(new DoFn<Bid, KV<Long, Void>>() { - @ProcessElement - public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(KV.of(bid.bidder, (Void) null)); - } - })) - .apply(Window.<KV<Long, Void>>into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf( - Duration.standardSeconds(configuration.windowSizeSec)))) - .discardingFiredPanes() - .withAllowedLateness(Duration.ZERO)) - .apply(Count.<Long, Void>perKey()) - .apply(name + ".ToResult", - ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output( - new BidsPerSession(c.element().getKey(), c.element().getValue())); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java deleted file mode 100644 index 16287e6..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.beam.sdk.values.TimestampedValue; - -/** - * A direct implementation of {@link Query1}. - */ -public class Query1Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 1. - */ - private class Simulator extends AbstractSimulator<Event, Bid> { - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - } - - @Override - protected void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - allDone(); - return; - } - Event event = timestampedEvent.getValue(); - if (event.bid == null) { - // Ignore non-bid events. - return; - } - Bid bid = event.bid; - Bid resultBid = - new Bid(bid.auction, bid.bidder, bid.price * 89 / 100, bid.dateTime, bid.extra); - TimestampedValue<Bid> result = - TimestampedValue.of(resultBid, timestampedEvent.getTimestamp()); - addResult(result); - //TODO test fails because offset of some hundreds of ms beween expect and actual - } - } - - public Query1Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValueTimestampOrder(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java deleted file mode 100644 index 828cdf5..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; - -/** - * Query 2, 'Filtering. Find bids with specific auction ids and show their bid price. - * In CQL syntax: - * - * <pre> - * SELECT Rstream(auction, price) - * FROM Bid [NOW] - * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087; - * </pre> - * - * <p>As written that query will only yield a few hundred results over event streams of - * arbitrary size. To make it more interesting we instead choose bids for every - * {@code auctionSkip}'th auction. - */ -class Query2 extends NexmarkQuery { - public Query2(NexmarkConfiguration configuration) { - super(configuration, "Query2"); - } - - private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) { - return events - // Only want the bid events. - .apply(JUST_BIDS) - - // Select just the bids for the auctions we care about. - .apply(Filter.by(new SerializableFunction<Bid, Boolean>() { - @Override - public Boolean apply(Bid bid) { - return bid.auction % configuration.auctionSkip == 0; - } - })) - - // Project just auction id and price. - .apply(name + ".Project", - ParDo.of(new DoFn<Bid, AuctionPrice>() { - @ProcessElement - public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(new AuctionPrice(bid.auction, bid.price)); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java deleted file mode 100644 index 7769e52..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.beam.sdk.values.TimestampedValue; - -/** - * A direct implementation of {@link Query2}. - */ -public class Query2Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 2. - */ - private class Simulator extends AbstractSimulator<Event, AuctionPrice> { - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - } - - @Override - protected void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - allDone(); - return; - } - Event event = timestampedEvent.getValue(); - if (event.bid == null) { - // Ignore non bid events. - return; - } - Bid bid = event.bid; - if (bid.auction % configuration.auctionSkip != 0) { - // Ignore bids for auctions we don't care about. - return; - } - AuctionPrice auctionPrice = new AuctionPrice(bid.auction, bid.price); - TimestampedValue<AuctionPrice> result = - TimestampedValue.of(auctionPrice, timestampedEvent.getTimestamp()); - addResult(result); - } - } - - public Query2Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValueTimestampOrder(itr); - } -}
