Refactor classes into packages. The new hierarchy has logically based packages for: drivers, io, model, queries, sources.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7f9f7d0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7f9f7d0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7f9f7d0 Branch: refs/heads/master Commit: a7f9f7d0784d9ba1f53ac4a0b49d2d81700720d0 Parents: 9ce9bf0 Author: Ismaël Mejía <[email protected]> Authored: Thu Mar 23 19:32:45 2017 +0100 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- .../java/nexmark/BEAM_ON_FLINK_ON_GCP.md | 2 +- integration/java/nexmark/README.md | 88 +-- integration/java/nexmark/pom.xml | 22 +- .../integration/nexmark/AbstractSimulator.java | 2 +- .../beam/integration/nexmark/Auction.java | 189 ------ .../beam/integration/nexmark/AuctionBid.java | 86 --- .../beam/integration/nexmark/AuctionCount.java | 89 --- .../beam/integration/nexmark/AuctionPrice.java | 90 --- .../apache/beam/integration/nexmark/Bid.java | 177 ------ .../integration/nexmark/BidsPerSession.java | 88 --- .../integration/nexmark/BoundedEventSource.java | 189 ------ .../beam/integration/nexmark/CategoryPrice.java | 99 ---- .../apache/beam/integration/nexmark/Done.java | 82 --- .../apache/beam/integration/nexmark/Event.java | 179 ------ .../beam/integration/nexmark/Generator.java | 589 ------------------ .../integration/nexmark/GeneratorConfig.java | 294 --------- .../beam/integration/nexmark/IdNameReserve.java | 99 ---- .../beam/integration/nexmark/KnownSize.java | 26 - .../beam/integration/nexmark/Monitor.java | 1 + .../integration/nexmark/NameCityStateId.java | 105 ---- .../integration/nexmark/NexmarkApexDriver.java | 48 -- .../integration/nexmark/NexmarkApexRunner.java | 61 -- .../nexmark/NexmarkConfiguration.java | 6 +- .../nexmark/NexmarkDirectDriver.java | 47 -- .../nexmark/NexmarkDirectRunner.java | 58 -- .../beam/integration/nexmark/NexmarkDriver.java | 5 +- 
.../integration/nexmark/NexmarkFlinkDriver.java | 48 -- .../integration/nexmark/NexmarkFlinkRunner.java | 53 -- .../nexmark/NexmarkGoogleDriver.java | 62 -- .../nexmark/NexmarkGoogleRunner.java | 159 ----- .../integration/nexmark/NexmarkOptions.java | 386 ++++++++++++ .../beam/integration/nexmark/NexmarkPerf.java | 4 +- .../beam/integration/nexmark/NexmarkQuery.java | 7 +- .../integration/nexmark/NexmarkQueryModel.java | 19 +- .../beam/integration/nexmark/NexmarkRunner.java | 40 +- .../integration/nexmark/NexmarkSparkDriver.java | 46 -- .../integration/nexmark/NexmarkSparkRunner.java | 54 -- .../beam/integration/nexmark/NexmarkSuite.java | 2 +- .../beam/integration/nexmark/NexmarkUtils.java | 25 +- .../beam/integration/nexmark/Options.java | 386 ------------ .../apache/beam/integration/nexmark/Person.java | 165 ------ .../beam/integration/nexmark/PubsubHelper.java | 216 ------- .../apache/beam/integration/nexmark/Query0.java | 67 --- .../beam/integration/nexmark/Query0Model.java | 62 -- .../apache/beam/integration/nexmark/Query1.java | 62 -- .../beam/integration/nexmark/Query10.java | 380 ------------ .../beam/integration/nexmark/Query11.java | 73 --- .../beam/integration/nexmark/Query12.java | 77 --- .../beam/integration/nexmark/Query1Model.java | 73 --- .../apache/beam/integration/nexmark/Query2.java | 73 --- .../beam/integration/nexmark/Query2Model.java | 75 --- .../apache/beam/integration/nexmark/Query3.java | 249 -------- .../beam/integration/nexmark/Query3Model.java | 118 ---- .../apache/beam/integration/nexmark/Query4.java | 107 ---- .../beam/integration/nexmark/Query4Model.java | 179 ------ .../apache/beam/integration/nexmark/Query5.java | 123 ---- .../beam/integration/nexmark/Query5Model.java | 172 ------ .../apache/beam/integration/nexmark/Query6.java | 151 ----- .../beam/integration/nexmark/Query6Model.java | 126 ---- .../apache/beam/integration/nexmark/Query7.java | 85 --- .../beam/integration/nexmark/Query7Model.java | 127 ---- 
.../apache/beam/integration/nexmark/Query8.java | 91 --- .../beam/integration/nexmark/Query8Model.java | 144 ----- .../apache/beam/integration/nexmark/Query9.java | 39 -- .../beam/integration/nexmark/Query9Model.java | 43 -- .../beam/integration/nexmark/SellerPrice.java | 90 --- .../nexmark/UnboundedEventSource.java | 328 ---------- .../beam/integration/nexmark/WinningBids.java | 11 +- .../nexmark/WinningBidsSimulator.java | 4 + .../nexmark/drivers/NexmarkApexDriver.java | 50 ++ .../nexmark/drivers/NexmarkApexRunner.java | 65 ++ .../nexmark/drivers/NexmarkDirectDriver.java | 49 ++ .../nexmark/drivers/NexmarkDirectRunner.java | 60 ++ .../nexmark/drivers/NexmarkFlinkDriver.java | 50 ++ .../nexmark/drivers/NexmarkFlinkRunner.java | 55 ++ .../nexmark/drivers/NexmarkGoogleDriver.java | 67 +++ .../nexmark/drivers/NexmarkGoogleRunner.java | 163 +++++ .../nexmark/drivers/NexmarkSparkDriver.java | 48 ++ .../nexmark/drivers/NexmarkSparkRunner.java | 56 ++ .../nexmark/drivers/package-info.java | 22 + .../integration/nexmark/io/PubsubHelper.java | 217 +++++++ .../integration/nexmark/io/package-info.java | 22 + .../beam/integration/nexmark/model/Auction.java | 190 ++++++ .../integration/nexmark/model/AuctionBid.java | 88 +++ .../integration/nexmark/model/AuctionCount.java | 90 +++ .../integration/nexmark/model/AuctionPrice.java | 91 +++ .../beam/integration/nexmark/model/Bid.java | 178 ++++++ .../nexmark/model/BidsPerSession.java | 89 +++ .../nexmark/model/CategoryPrice.java | 100 ++++ .../beam/integration/nexmark/model/Done.java | 83 +++ .../beam/integration/nexmark/model/Event.java | 179 ++++++ .../nexmark/model/IdNameReserve.java | 100 ++++ .../integration/nexmark/model/KnownSize.java | 26 + .../nexmark/model/NameCityStateId.java | 106 ++++ .../beam/integration/nexmark/model/Person.java | 166 ++++++ .../integration/nexmark/model/SellerPrice.java | 91 +++ .../integration/nexmark/model/package-info.java | 22 + .../beam/integration/nexmark/package-info.java | 2 +- 
.../integration/nexmark/queries/Query0.java | 72 +++ .../nexmark/queries/Query0Model.java | 67 +++ .../integration/nexmark/queries/Query1.java | 68 +++ .../integration/nexmark/queries/Query10.java | 384 ++++++++++++ .../integration/nexmark/queries/Query11.java | 80 +++ .../integration/nexmark/queries/Query12.java | 84 +++ .../nexmark/queries/Query1Model.java | 79 +++ .../integration/nexmark/queries/Query2.java | 80 +++ .../nexmark/queries/Query2Model.java | 82 +++ .../integration/nexmark/queries/Query3.java | 256 ++++++++ .../nexmark/queries/Query3Model.java | 126 ++++ .../integration/nexmark/queries/Query4.java | 118 ++++ .../nexmark/queries/Query4Model.java | 188 ++++++ .../integration/nexmark/queries/Query5.java | 129 ++++ .../nexmark/queries/Query5Model.java | 178 ++++++ .../integration/nexmark/queries/Query6.java | 159 +++++ .../nexmark/queries/Query6Model.java | 135 +++++ .../integration/nexmark/queries/Query7.java | 91 +++ .../nexmark/queries/Query7Model.java | 133 +++++ .../integration/nexmark/queries/Query8.java | 98 +++ .../nexmark/queries/Query8Model.java | 150 +++++ .../integration/nexmark/queries/Query9.java | 46 ++ .../nexmark/queries/Query9Model.java | 47 ++ .../nexmark/queries/package-info.java | 22 + .../nexmark/sources/BoundedEventSource.java | 190 ++++++ .../integration/nexmark/sources/Generator.java | 593 +++++++++++++++++++ .../nexmark/sources/GeneratorConfig.java | 296 +++++++++ .../nexmark/sources/UnboundedEventSource.java | 330 +++++++++++ .../nexmark/sources/package-info.java | 22 + .../nexmark/src/main/resources/log4j.properties | 4 + .../nexmark/BoundedEventSourceTest.java | 70 --- .../beam/integration/nexmark/GeneratorTest.java | 110 ---- .../beam/integration/nexmark/QueryTest.java | 107 ---- .../nexmark/UnboundedEventSourceTest.java | 108 ---- .../integration/nexmark/queries/QueryTest.java | 111 ++++ .../nexmark/sources/BoundedEventSourceTest.java | 71 +++ .../nexmark/sources/GeneratorTest.java | 111 ++++ 
.../sources/UnboundedEventSourceTest.java | 110 ++++ 136 files changed, 7768 insertions(+), 7384 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md b/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md index d1b51e8..6a7fd34 100644 --- a/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md +++ b/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md @@ -243,7 +243,7 @@ $GCLOUD compute ssh \ --zone=$ZONE \ $MASTER \ --command "~/$FLINK_VER/bin/flink run \ - -c org.apache.beam.integration.nexmark.NexmarkFlinkDriver \ + -c org.apache.beam.integration.nexmark.drivers.NexmarkFlinkDriver \ ~/$FLINK_VER/lib/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \ --project=$PROJECT \ --streaming=true \ http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/README.md ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md index 7a91ab2..a3549f4 100644 --- a/integration/java/nexmark/README.md +++ b/integration/java/nexmark/README.md @@ -74,14 +74,15 @@ We have augmented the original queries with five more: The queries can be executed using a 'Driver' for a given backend. Currently the supported drivers are: +* **NexmarkApexDriver** for running via the Apex runner. * **NexmarkDirectDriver** for running locally on a single machine. -* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow - service. Requires a Google Cloud account. +* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow service. + Requires a Google Cloud account. * **NexmarkFlinkDriver** for running on a Flink cluster. 
Requires the cluster to be established and the Nexmark jar to be distributed to each worker. * **NexmarkSparkDriver** for running on a Spark cluster. - + Other drivers are straightforward. Test data is deterministically synthesized on demand. The test @@ -103,9 +104,21 @@ the Google Cloud Dataflow driver. # Configuration -Common configuration parameters: +## Common configuration parameters + +Decide if batch or streaming: + + --streaming=true + +Number of events generators + + --numEventGenerators=4 + +Run query N -Available Suites: + --query=N + +## Available Suites - DEFAULT: Test default configuration with query 0. - SMOKE: Run the 12 default configurations. @@ -114,32 +127,39 @@ Available Suites: --suite=SMOKE -Decide if batch or streaming: - - --streaming=true +### Apex specific configuration -Number of events generators + --suite=SMOKE --manageResources=false --monitorJobs=true - --numEventGenerators=4 +### Dataflow specific configuration -## Apex specific configuration + --query=0 --suite=SMOKE --manageResources=false --monitorJobs=true \ + --enforceEncodability=false --enforceImmutability=false + --project=<your project> \ + --zone=<your zone> \ + --workerMachineType=n1-highmem-8 \ + --stagingLocation=<a gs path for staging> ---suite=SMOKE --manageResources=false --monitorJobs=true + --runner=BlockingDataflowRunner \ + --tempLocation=gs://talend-imejia/nexmark/temp/ \ + --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \ + --filesToStage=target/beam-integration-java-0.7.0-SNAPSHOT.jar -## Direct specific configuration +### Direct specific configuration ---suite=SMOKE --manageResources=false --monitorJobs=true \ ---enforceEncodability=false --enforceImmutability=false + --suite=SMOKE --manageResources=false --monitorJobs=true \ + --enforceEncodability=false --enforceImmutability=false -## Flink specific configuration +### Flink specific configuration ---suite=SMOKE --manageResources=false --monitorJobs=true \ ---flinkMaster=local + --suite=SMOKE 
--manageResources=false --monitorJobs=true \ + --flinkMaster=[local] --parallelism=#numcores -## Spark specific configuration +### Spark specific configuration ---suite=SMOKE --manageResources=false --monitorJobs=true --sparkMaster=local \ --Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true + --suite=SMOKE --manageResources=false --monitorJobs=true \ + --sparkMaster=local \ + -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true # Current Status @@ -149,19 +169,19 @@ Open issues are tracked [here](https://github.com../../../../../issues): | Query | Direct | Spark | Flink | Apex | | ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------- | -| 0 | ok | [#1](../../../../../issues/1) | ok | ok | -| 1 | ok | [#1](../../../../../issues/1) | ok | ok | -| 2 | ok | [#1](../../../../../issues/1) | ok | ok | +| 0 | ok | ok | ok | ok | +| 1 | ok | ok | ok | ok | +| 2 | ok | ok | ok | ok | | 3 | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | | 4 | ok | ok | [#2](../../../../../issues/2) | ok | -| 5 | ok | [#3](../../../../../issues/3) | ok | ok | +| 5 | ok | ok | ok | ok | | 6 | ok | ok | [#2](../../../../../issues/2) | ok | -| 7 | ok | [#1](../../../../../issues/1) | ok | [#24](../../../../../issues/24) | -| 8 | ok | [#1](../../../../../issues/1) | ok | ok | +| 7 | ok | ok | ok | [#24](../../../../../issues/24) | +| 8 | ok | ok | ok | ok | | 9 | ok | ok | [#2](../../../../../issues/2) | ok | -| 10 | [#5](../../../../../issues/5) | [#4](../../../../../issues/4) | ok | ok | -| 11 | ok | [#1](../../../../../issues/1) | ok | ok | -| 12 | ok | [#1](../../../../../issues/1) | ok | ok | +| 10 | [#5](../../../../../issues/5) | ok | ok | ok | +| 11 | ok | ok | ok | ok | +| 12 | ok | ok | ok | ok | ## Streaming / Synthetic / Local @@ 
-205,11 +225,11 @@ Batch Mode -Dexec.classpathScope="test" - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=false --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false" + mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=false --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false" Streaming Mode - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false" + mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false" ## Running on Google Cloud Dataflow @@ -218,7 +238,7 @@ service. 
``` java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \ - org.apache.beam.integration.nexmark.NexmarkGoogleDriver \ + org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \ --project=<your project> \ --zone=<your zone> \ --workerMachineType=n1-highmem-8 \ @@ -251,7 +271,7 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S ``` java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \ - org.apache.beam.integration.nexmark.NexmarkGoogleDriver \ + org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \ --project=<your project> \ --zone=<your zone> \ --workerMachineType=n1-highmem-8 \ http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index 0ecc298..7cd7d39 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -61,11 +61,6 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> </plugin> @@ -139,7 +134,6 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> - <version>2.10</version> <executions> <execution> <goals><goal>analyze-only</goal></goals> @@ -196,11 +190,13 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> + <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>${spark.version}</version> + <scope>runtime</scope> </dependency> <!-- Apex runner --> @@ -215,12 +211,6 @@ <scope>runtime</scope> </dependency> <dependency> - 
<groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> - <scope>runtime</scope> - </dependency> - <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>${apex.codehaus.jackson.version}</version> @@ -244,6 +234,7 @@ <groupId>com.google.apis</groupId> <artifactId>google-api-services-dataflow</artifactId> <version>${dataflow.version}</version> + <scope>runtime</scope> </dependency> <dependency> @@ -289,13 +280,6 @@ <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> - <version>${hamcrest.version}</version> - </dependency> - - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-library</artifactId> - <version>${hamcrest.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java index c08cdd3..b012842 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java @@ -34,7 +34,7 @@ import org.joda.time.Instant; * @param <InputT> Type of input elements. * @param <OutputT> Type of output elements. */ -abstract class AbstractSimulator<InputT, OutputT> { +public abstract class AbstractSimulator<InputT, OutputT> { /** Window size for action bucket sampling. 
*/ public static final Duration WINDOW_SIZE = Duration.standardMinutes(1); http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java deleted file mode 100644 index 16c28aa..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * An auction submitted by a person. - */ -public class Auction implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<Auction> CODER = new AtomicCoder<Auction>() { - @Override - public void encode(Auction value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream, Context.NESTED); - STRING_CODER.encode(value.itemName, outStream, Context.NESTED); - STRING_CODER.encode(value.description, outStream, Context.NESTED); - LONG_CODER.encode(value.initialBid, outStream, Context.NESTED); - LONG_CODER.encode(value.reserve, outStream, Context.NESTED); - LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); - LONG_CODER.encode(value.expires, outStream, Context.NESTED); - LONG_CODER.encode(value.seller, outStream, Context.NESTED); - LONG_CODER.encode(value.category, outStream, Context.NESTED); - STRING_CODER.encode(value.extra, outStream, Context.NESTED); - } - - @Override - public Auction decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long id = LONG_CODER.decode(inStream, Context.NESTED); - String itemName = STRING_CODER.decode(inStream, Context.NESTED); - String description = STRING_CODER.decode(inStream, Context.NESTED); - long initialBid = 
LONG_CODER.decode(inStream, Context.NESTED); - long reserve = LONG_CODER.decode(inStream, Context.NESTED); - long dateTime = LONG_CODER.decode(inStream, Context.NESTED); - long expires = LONG_CODER.decode(inStream, Context.NESTED); - long seller = LONG_CODER.decode(inStream, Context.NESTED); - long category = LONG_CODER.decode(inStream, Context.NESTED); - String extra = STRING_CODER.decode(inStream, Context.NESTED); - return new Auction( - id, itemName, description, initialBid, reserve, dateTime, expires, seller, category, - extra); - } - }; - - - /** Id of auction. */ - @JsonProperty - public final long id; // primary key - - /** Extra auction properties. */ - @JsonProperty - public final String itemName; - - @JsonProperty - public final String description; - - /** Initial bid price, in cents. */ - @JsonProperty - public final long initialBid; - - /** Reserve price, in cents. */ - @JsonProperty - public final long reserve; - - @JsonProperty - public final long dateTime; - - /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */ - @JsonProperty - public final long expires; - - /** Id of person who instigated auction. */ - @JsonProperty - public final long seller; // foreign key: Person.id - - /** Id of category auction is listed under. */ - @JsonProperty - public final long category; // foreign key: Category.id - - /** Additional arbitrary payload for performance testing. */ - @JsonProperty - public final String extra; - - - // For Avro only. 
- @SuppressWarnings("unused") - private Auction() { - id = 0; - itemName = null; - description = null; - initialBid = 0; - reserve = 0; - dateTime = 0; - expires = 0; - seller = 0; - category = 0; - extra = null; - } - - public Auction(long id, String itemName, String description, long initialBid, long reserve, - long dateTime, long expires, long seller, long category, String extra) { - this.id = id; - this.itemName = itemName; - this.description = description; - this.initialBid = initialBid; - this.reserve = reserve; - this.dateTime = dateTime; - this.expires = expires; - this.seller = seller; - this.category = category; - this.extra = extra; - } - - /** - * Return a copy of auction which capture the given annotation. - * (Used for debugging). - */ - public Auction withAnnotation(String annotation) { - return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, - category, annotation + ": " + extra); - } - - /** - * Does auction have {@code annotation}? (Used for debugging.) - */ - public boolean hasAnnotation(String annotation) { - return extra.startsWith(annotation + ": "); - } - - /** - * Remove {@code annotation} from auction. (Used for debugging.) 
- */ - public Auction withoutAnnotation(String annotation) { - if (hasAnnotation(annotation)) { - return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, - category, extra.substring(annotation.length() + 2)); - } else { - return this; - } - } - - @Override - public long sizeInBytes() { - return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8 - + extra.length() + 1; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java deleted file mode 100644 index cd52b02..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; - -/** - * Result of {@link WinningBids} transform. - */ -public class AuctionBid implements KnownSize, Serializable { - public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() { - @Override - public void encode(AuctionBid value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - Auction.CODER.encode(value.auction, outStream, Context.NESTED); - Bid.CODER.encode(value.bid, outStream, Context.NESTED); - } - - @Override - public AuctionBid decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - Auction auction = Auction.CODER.decode(inStream, Context.NESTED); - Bid bid = Bid.CODER.decode(inStream, Context.NESTED); - return new AuctionBid(auction, bid); - } - }; - - @JsonProperty - public final Auction auction; - - @JsonProperty - public final Bid bid; - - // For Avro only. 
- @SuppressWarnings("unused") - private AuctionBid() { - auction = null; - bid = null; - } - - public AuctionBid(Auction auction, Bid bid) { - this.auction = auction; - this.bid = bid; - } - - @Override - public long sizeInBytes() { - return auction.sizeInBytes() + bid.sizeInBytes(); - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java deleted file mode 100644 index ac1f080..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of {@link Query5}. - */ -public class AuctionCount implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() { - @Override - public void encode(AuctionCount value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream, Context.NESTED); - LONG_CODER.encode(value.count, outStream, Context.NESTED); - } - - @Override - public AuctionCount decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream, Context.NESTED); - long count = LONG_CODER.decode(inStream, Context.NESTED); - return new AuctionCount(auction, count); - } - }; - - @JsonProperty - public final long auction; - - @JsonProperty - public final long count; - - // For Avro only. 
- @SuppressWarnings("unused") - private AuctionCount() { - auction = 0; - count = 0; - } - - public AuctionCount(long auction, long count) { - this.auction = auction; - this.count = count; - } - - @Override - public long sizeInBytes() { - return 8 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java deleted file mode 100644 index 9bdf11c..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of {@link Query2}. - */ -public class AuctionPrice implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() { - @Override - public void encode(AuctionPrice value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream, Context.NESTED); - LONG_CODER.encode(value.price, outStream, Context.NESTED); - } - - @Override - public AuctionPrice decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream, Context.NESTED); - long price = LONG_CODER.decode(inStream, Context.NESTED); - return new AuctionPrice(auction, price); - } - }; - - @JsonProperty - public final long auction; - - /** Price in cents. */ - @JsonProperty - public final long price; - - // For Avro only. 
- @SuppressWarnings("unused") - private AuctionPrice() { - auction = 0; - price = 0; - } - - public AuctionPrice(long auction, long price) { - this.auction = auction; - this.price = price; - } - - @Override - public long sizeInBytes() { - return 8 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java deleted file mode 100644 index 04fcfdd..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.Comparator; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * A bid for an item on auction. - */ -public class Bid implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<Bid> CODER = new AtomicCoder<Bid>() { - @Override - public void encode(Bid value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream, Context.NESTED); - LONG_CODER.encode(value.bidder, outStream, Context.NESTED); - LONG_CODER.encode(value.price, outStream, Context.NESTED); - LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); - STRING_CODER.encode(value.extra, outStream, Context.NESTED); - } - - @Override - public Bid decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream, Context.NESTED); - long bidder = LONG_CODER.decode(inStream, Context.NESTED); - long price = LONG_CODER.decode(inStream, Context.NESTED); - long dateTime = LONG_CODER.decode(inStream, Context.NESTED); - String extra = STRING_CODER.decode(inStream, Context.NESTED); - return new Bid(auction, bidder, price, dateTime, extra); - } - }; - - /** - * Comparator to order bids by ascending price then descending time - * (for finding winning bids). 
- */ - public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() { - @Override - public int compare(Bid left, Bid right) { - int i = Double.compare(left.price, right.price); - if (i != 0) { - return i; - } - return Long.compare(right.dateTime, left.dateTime); - } - }; - - /** - * Comparator to order bids by ascending time then ascending price. - * (for finding most recent bids). - */ - public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() { - @Override - public int compare(Bid left, Bid right) { - int i = Long.compare(left.dateTime, right.dateTime); - if (i != 0) { - return i; - } - return Double.compare(left.price, right.price); - } - }; - - /** Id of auction this bid is for. */ - @JsonProperty - public final long auction; // foreign key: Auction.id - - /** Id of person bidding in auction. */ - @JsonProperty - public final long bidder; // foreign key: Person.id - - /** Price of bid, in cents. */ - @JsonProperty - public final long price; - - /** - * Instant at which bid was made (ms since epoch). - * NOTE: This may be earlier than the system's event time. - */ - @JsonProperty - public final long dateTime; - - /** Additional arbitrary payload for performance testing. */ - @JsonProperty - public final String extra; - - // For Avro only. - @SuppressWarnings("unused") - private Bid() { - auction = 0; - bidder = 0; - price = 0; - dateTime = 0; - extra = null; - } - - public Bid(long auction, long bidder, long price, long dateTime, String extra) { - this.auction = auction; - this.bidder = bidder; - this.price = price; - this.dateTime = dateTime; - this.extra = extra; - } - - /** - * Return a copy of bid which capture the given annotation. - * (Used for debugging). - */ - public Bid withAnnotation(String annotation) { - return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra); - } - - /** - * Does bid have {@code annotation}? (Used for debugging.) 
- */ - public boolean hasAnnotation(String annotation) { - return extra.startsWith(annotation + ": "); - } - - /** - * Remove {@code annotation} from bid. (Used for debugging.) - */ - public Bid withoutAnnotation(String annotation) { - if (hasAnnotation(annotation)) { - return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2)); - } else { - return this; - } - } - - @Override - public long sizeInBytes() { - return 8 + 8 + 8 + 8 + extra.length() + 1; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java deleted file mode 100644 index c6b0fe3..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of query 11. - */ -public class BidsPerSession implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() { - @Override - public void encode(BidsPerSession value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.personId, outStream, Context.NESTED); - LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED); - } - - @Override - public BidsPerSession decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long personId = LONG_CODER.decode(inStream, Context.NESTED); - long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED); - return new BidsPerSession(personId, bidsPerSession); - } - }; - - @JsonProperty - public final long personId; - - @JsonProperty - public final long bidsPerSession; - - public BidsPerSession() { - personId = 0; - bidsPerSession = 0; - } - - public BidsPerSession(long personId, long bidsPerSession) { - this.personId = personId; - this.bidsPerSession = bidsPerSession; - } - - @Override - public long sizeInBytes() { - // Two longs. 
- return 8 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java deleted file mode 100644 index 7dc1bcc..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; - -/** - * A custom, bounded source of event records. - */ -class BoundedEventSource extends BoundedSource<Event> { - /** Configuration we generate events against. */ - private final GeneratorConfig config; - - /** How many bounded sources to create. */ - private final int numEventGenerators; - - public BoundedEventSource(GeneratorConfig config, int numEventGenerators) { - this.config = config; - this.numEventGenerators = numEventGenerators; - } - - /** A reader to pull events from the generator. */ - private static class EventReader extends BoundedReader<Event> { - /** - * Event source we purporting to be reading from. - * (We can't use Java's capture-outer-class pointer since we must update - * this field on calls to splitAtFraction.) - */ - private BoundedEventSource source; - - /** Generator we are reading from. */ - private final Generator generator; - - private boolean reportedStop; - - @Nullable - private TimestampedValue<Event> currentEvent; - - public EventReader(BoundedEventSource source, GeneratorConfig config) { - this.source = source; - generator = new Generator(config); - reportedStop = false; - } - - @Override - public synchronized boolean start() { - NexmarkUtils.info("starting bounded generator %s", generator); - return advance(); - } - - @Override - public synchronized boolean advance() { - if (!generator.hasNext()) { - // No more events. 
- if (!reportedStop) { - reportedStop = true; - NexmarkUtils.info("stopped bounded generator %s", generator); - } - return false; - } - currentEvent = generator.next(); - return true; - } - - @Override - public synchronized Event getCurrent() throws NoSuchElementException { - if (currentEvent == null) { - throw new NoSuchElementException(); - } - return currentEvent.getValue(); - } - - @Override - public synchronized Instant getCurrentTimestamp() throws NoSuchElementException { - if (currentEvent == null) { - throw new NoSuchElementException(); - } - return currentEvent.getTimestamp(); - } - - @Override - public void close() throws IOException { - // Nothing to close. - } - - @Override - public synchronized Double getFractionConsumed() { - return generator.getFractionConsumed(); - } - - @Override - public synchronized BoundedSource<Event> getCurrentSource() { - return source; - } - - @Override - @Nullable - public synchronized BoundedEventSource splitAtFraction(double fraction) { - long startId = generator.getCurrentConfig().getStartEventId(); - long stopId = generator.getCurrentConfig().getStopEventId(); - long size = stopId - startId; - long splitEventId = startId + Math.min((int) (size * fraction), size); - if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) { - // Already passed this position or split results in left or right being empty. - NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction); - return null; - } - - NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId); - - // Scale back the event space of the current generator, and return a generator config - // representing the event space we just 'stole' from the current generator. 
- GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId); - - NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig); - - // At this point - // generator.events() ++ new Generator(remainingConfig).events() - // == originalGenerator.events() - - // We need a new source to represent the now smaller key space for this reader, so - // that we can maintain the invariant that - // this.getCurrentSource().createReader(...) - // will yield the same output as this. - source = new BoundedEventSource(generator.getCurrentConfig(), source.numEventGenerators); - - // Return a source from which we may read the 'stolen' event space. - return new BoundedEventSource(remainingConfig, source.numEventGenerators); - } - } - - @Override - public List<BoundedEventSource> splitIntoBundles( - long desiredBundleSizeBytes, PipelineOptions options) { - NexmarkUtils.info("slitting bounded source %s into %d sub-sources", config, numEventGenerators); - List<BoundedEventSource> results = new ArrayList<>(); - // Ignore desiredBundleSizeBytes and use numEventGenerators instead. - for (GeneratorConfig subConfig : config.split(numEventGenerators)) { - results.add(new BoundedEventSource(subConfig, 1)); - } - return results; - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) { - return config.getEstimatedSizeBytes(); - } - - @Override - public EventReader createReader(PipelineOptions options) { - NexmarkUtils.info("creating initial bounded reader for %s", config); - return new EventReader(this, config); - } - - @Override - public void validate() { - // Nothing to validate. 
- } - - @Override - public Coder<Event> getDefaultOutputCoder() { - return Event.CODER; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java deleted file mode 100644 index c83fb17..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of {@link Query4}. - */ -public class CategoryPrice implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<Integer> INT_CODER = VarIntCoder.of(); - - public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() { - @Override - public void encode(CategoryPrice value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.category, outStream, Context.NESTED); - LONG_CODER.encode(value.price, outStream, Context.NESTED); - INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED); - } - - @Override - public CategoryPrice decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long category = LONG_CODER.decode(inStream, Context.NESTED); - long price = LONG_CODER.decode(inStream, Context.NESTED); - boolean isLast = INT_CODER.decode(inStream, context) != 0; - return new CategoryPrice(category, price, isLast); - } - }; - - @JsonProperty - public final long category; - - /** Price in cents. */ - @JsonProperty - public final long price; - - @JsonProperty - public final boolean isLast; - - // For Avro only. 
- @SuppressWarnings("unused") - private CategoryPrice() { - category = 0; - price = 0; - isLast = false; - } - - public CategoryPrice(long category, long price, boolean isLast) { - this.category = category; - this.price = price; - this.isLast = isLast; - } - - @Override - public long sizeInBytes() { - return 8 + 8 + 1; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java deleted file mode 100644 index 3a045f9..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; - -/** - * Result of query 10. - */ -public class Done implements KnownSize, Serializable { - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<Done> CODER = new AtomicCoder<Done>() { - @Override - public void encode(Done value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - STRING_CODER.encode(value.message, outStream, Context.NESTED); - } - - @Override - public Done decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - String message = STRING_CODER.decode(inStream, Context.NESTED); - return new Done(message); - } - }; - - @JsonProperty - public final String message; - - // For Avro only. 
- @SuppressWarnings("unused") - public Done() { - message = null; - } - - public Done(String message) { - this.message = message; - } - - @Override - public long sizeInBytes() { - return message.length(); - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java deleted file mode 100644 index 769cedd..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.VarIntCoder; - -/** - * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, - * or a {@link Bid}. - */ -public class Event implements KnownSize, Serializable { - private static final Coder<Integer> INT_CODER = VarIntCoder.of(); - - public static final Coder<Event> CODER = new AtomicCoder<Event>() { - @Override - public void encode(Event value, OutputStream outStream, Coder.Context context) - throws CoderException, IOException { - if (value.newPerson != null) { - INT_CODER.encode(0, outStream, Context.NESTED); - Person.CODER.encode(value.newPerson, outStream, Context.NESTED); - } else if (value.newAuction != null) { - INT_CODER.encode(1, outStream, Context.NESTED); - Auction.CODER.encode(value.newAuction, outStream, Context.NESTED); - } else if (value.bid != null) { - INT_CODER.encode(2, outStream, Context.NESTED); - Bid.CODER.encode(value.bid, outStream, Context.NESTED); - } else { - throw new RuntimeException("invalid event"); - } - } - - @Override - public Event decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - int tag = INT_CODER.decode(inStream, context); - if (tag == 0) { - Person person = Person.CODER.decode(inStream, Context.NESTED); - return new Event(person); - } else if (tag == 1) { - Auction auction = Auction.CODER.decode(inStream, Context.NESTED); - return new Event(auction); - } else if (tag == 2) { - Bid bid = Bid.CODER.decode(inStream, Context.NESTED); - return new Event(bid); - } else { - throw new RuntimeException("invalid event encoding"); - } - } - }; - - @Nullable - 
@org.apache.avro.reflect.Nullable - public final Person newPerson; - - @Nullable - @org.apache.avro.reflect.Nullable - public final Auction newAuction; - - @Nullable - @org.apache.avro.reflect.Nullable - public final Bid bid; - - // For Avro only. - @SuppressWarnings("unused") - private Event() { - newPerson = null; - newAuction = null; - bid = null; - } - - public Event(Person newPerson) { - this.newPerson = newPerson; - newAuction = null; - bid = null; - } - - public Event(Auction newAuction) { - newPerson = null; - this.newAuction = newAuction; - bid = null; - } - - public Event(Bid bid) { - newPerson = null; - newAuction = null; - this.bid = bid; - } - - /** - * Return a copy of event which captures {@code annotation}. - * (Used for debugging). - */ - public Event withAnnotation(String annotation) { - if (newPerson != null) { - return new Event(newPerson.withAnnotation(annotation)); - } else if (newAuction != null) { - return new Event(newAuction.withAnnotation(annotation)); - } else { - return new Event(bid.withAnnotation(annotation)); - } - } - - /** - * Does event have {@code annotation}? (Used for debugging.) - */ - public boolean hasAnnotation(String annotation) { - if (newPerson != null) { - return newPerson.hasAnnotation(annotation); - } else if (newAuction != null) { - return newAuction.hasAnnotation(annotation); - } else { - return bid.hasAnnotation(annotation); - } - } - - /** - * Remove {@code annotation} from event. (Used for debugging.) 
- */ - public Event withoutAnnotation(String annotation) { - if (newPerson != null) { - return new Event(newPerson.withoutAnnotation(annotation)); - } else if (newAuction != null) { - return new Event(newAuction.withoutAnnotation(annotation)); - } else { - return new Event(bid.withoutAnnotation(annotation)); - } - } - - @Override - public long sizeInBytes() { - if (newPerson != null) { - return 1 + newPerson.sizeInBytes(); - } else if (newAuction != null) { - return 1 + newAuction.sizeInBytes(); - } else if (bid != null) { - return 1 + bid.sizeInBytes(); - } else { - throw new RuntimeException("invalid event"); - } - } - - @Override - public String toString() { - if (newPerson != null) { - return newPerson.toString(); - } else if (newAuction != null) { - return newAuction.toString(); - } else if (bid != null) { - return bid.toString(); - } else { - throw new RuntimeException("invalid event"); - } - } -}
