NexMark Port unit tests, cleanup pom and add license to readme
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f08970a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f08970a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f08970a Branch: refs/heads/master Commit: 1f08970a8fdc9c5e5613227031125d9d929ca841 Parents: f0ce31b Author: Mark Shields <[email protected]> Authored: Mon Mar 28 16:25:29 2016 -0700 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:26 2017 +0200 ---------------------------------------------------------------------- integration/java/pom.xml | 288 +++++++ .../integration/nexmark/AbstractSimulator.java | 212 ++++++ .../beam/integration/nexmark/Auction.java | 190 +++++ .../beam/integration/nexmark/AuctionBid.java | 87 +++ .../beam/integration/nexmark/AuctionCount.java | 90 +++ .../beam/integration/nexmark/AuctionPrice.java | 91 +++ .../integration/nexmark/BEAM_ON_FLINK_ON_GCP.md | 282 +++++++ .../apache/beam/integration/nexmark/Bid.java | 178 +++++ .../integration/nexmark/BidsPerSession.java | 89 +++ .../integration/nexmark/BoundedEventSource.java | 197 +++++ .../beam/integration/nexmark/CategoryPrice.java | 100 +++ .../apache/beam/integration/nexmark/Done.java | 83 +++ .../apache/beam/integration/nexmark/Event.java | 181 +++++ .../beam/integration/nexmark/Generator.java | 590 +++++++++++++++ .../integration/nexmark/GeneratorConfig.java | 295 ++++++++ .../beam/integration/nexmark/IdNameReserve.java | 100 +++ .../beam/integration/nexmark/KnownSize.java | 27 + .../beam/integration/nexmark/Monitor.java | 102 +++ .../integration/nexmark/NameCityStateId.java | 106 +++ .../nexmark/NexmarkConfiguration.java | 662 ++++++++++++++++ .../beam/integration/nexmark/NexmarkDriver.java | 297 ++++++++ .../integration/nexmark/NexmarkFlinkDriver.java | 49 ++ .../integration/nexmark/NexmarkFlinkRunner.java | 67 ++ .../nexmark/NexmarkGoogleDriver.java | 90 +++ .../nexmark/NexmarkGoogleRunner.java | 660 ++++++++++++++++ 
.../nexmark/NexmarkInProcessDriver.java | 48 ++ .../nexmark/NexmarkInProcessRunner.java | 77 ++ .../beam/integration/nexmark/NexmarkPerf.java | 212 ++++++ .../beam/integration/nexmark/NexmarkQuery.java | 276 +++++++ .../integration/nexmark/NexmarkQueryModel.java | 123 +++ .../beam/integration/nexmark/NexmarkRunner.java | 746 +++++++++++++++++++ .../beam/integration/nexmark/NexmarkSuite.java | 112 +++ .../beam/integration/nexmark/NexmarkUtils.java | 681 +++++++++++++++++ .../beam/integration/nexmark/Options.java | 360 +++++++++ .../apache/beam/integration/nexmark/Person.java | 166 +++++ .../beam/integration/nexmark/PubsubHelper.java | 217 ++++++ .../apache/beam/integration/nexmark/Query0.java | 72 ++ .../beam/integration/nexmark/Query0Model.java | 62 ++ .../apache/beam/integration/nexmark/Query1.java | 64 ++ .../beam/integration/nexmark/Query10.java | 378 ++++++++++ .../beam/integration/nexmark/Query11.java | 76 ++ .../beam/integration/nexmark/Query12.java | 79 ++ .../beam/integration/nexmark/Query1Model.java | 73 ++ .../apache/beam/integration/nexmark/Query2.java | 75 ++ .../beam/integration/nexmark/Query2Model.java | 76 ++ .../apache/beam/integration/nexmark/Query3.java | 248 ++++++ .../beam/integration/nexmark/Query3Model.java | 119 +++ .../apache/beam/integration/nexmark/Query4.java | 110 +++ .../beam/integration/nexmark/Query4Model.java | 181 +++++ .../apache/beam/integration/nexmark/Query5.java | 127 ++++ .../beam/integration/nexmark/Query5Model.java | 174 +++++ .../apache/beam/integration/nexmark/Query6.java | 154 ++++ .../beam/integration/nexmark/Query6Model.java | 128 ++++ .../apache/beam/integration/nexmark/Query7.java | 87 +++ .../beam/integration/nexmark/Query7Model.java | 128 ++++ .../apache/beam/integration/nexmark/Query8.java | 92 +++ .../beam/integration/nexmark/Query8Model.java | 145 ++++ .../apache/beam/integration/nexmark/Query9.java | 40 + .../beam/integration/nexmark/Query9Model.java | 44 ++ .../apache/beam/integration/nexmark/README.md | 166 
+++++ .../beam/integration/nexmark/SellerPrice.java | 91 +++ .../nexmark/UnboundedEventSource.java | 329 ++++++++ .../beam/integration/nexmark/WinningBids.java | 378 ++++++++++ .../nexmark/WinningBidsSimulator.java | 203 +++++ .../nexmark/BoundedEventSourceTest.java | 71 ++ .../beam/integration/nexmark/GeneratorTest.java | 111 +++ .../beam/integration/nexmark/QueryTest.java | 103 +++ .../nexmark/UnboundedEventSourceTest.java | 109 +++ 68 files changed, 12424 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/pom.xml b/integration/java/pom.xml new file mode 100644 index 0000000..b160b56 --- /dev/null +++ b/integration/java/pom.xml @@ -0,0 +1,288 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>parent</artifactId> + <version>0.2.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>java-integration-all</artifactId> + + <name>Apache Beam :: Integration Tests :: Java All</name> + + <packaging>jar</packaging> + + <properties> + <skipITs>true</skipITs> + </properties> + + <build> + <extensions> + <!-- Use os-maven-plugin to initialize the "os.detected" properties --> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.4.0.Final</version> + </extension> + </extensions> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <beamTestPipelineOptions> + </beamTestPipelineOptions> + </systemPropertyVariables> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + + + <!-- Source plugin for generating source and test-source JARs.
--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>${project.artifactId}-bundled-${project.version}</finalName> + <artifactSet> + <includes> + <include>*:*</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + + <!-- Avro plugin for automatic code generation --> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>${avro.version}</version> + <executions> + <execution> + <id>schemas</id> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>${project.basedir}/src/main/</sourceDirectory> + <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Coverage analysis for unit tests.
--> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Java SDK --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>java-sdk-all</artifactId> + </dependency> + + <!-- Java runner for Google Cloud Dataflow --> + <dependency> + <groupId>org.apache.beam.runners</groupId> + <artifactId>google-cloud-dataflow-java</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Direct runner --> + <dependency> + <groupId>org.apache.beam.runners</groupId> + <artifactId>direct</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Java runner for Flink --> + <dependency> + <groupId>org.apache.beam.runners</groupId> + <artifactId>flink_2.10</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>1.0.3</version> + <scope>provided</scope> + </dependency> + + <!-- Extra libraries --> + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-dataflow</artifactId> + <version>${dataflow.version}</version> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-bigquery</artifactId> + <version>${bigquery.version}</version> + </dependency> + + <dependency> + <groupId>com.google.cloud.bigdataoss</groupId> + <artifactId>gcsio</artifactId> + <version>${google-cloud-bigdataoss.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.version}</version> + </dependency> +
+ <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>${hamcrest.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <version>${slf4j.version}</version> + <!-- When loaded at runtime this will wire up slf4j to the JUL backend --> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <version>${jsr305.version}</version> + <scope>compile</scope> <!-- main sources import javax.annotation.Nullable, so it must be on the compile classpath --> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + <version>1.1.33.Fork13</version> + <classifier>${os.detected.classifier}</classifier> + <scope>runtime</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java 
---------------------------------------------------------------------- diff --git
a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java new file mode 100644 index 0000000..6473c35 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * Abstract base class for simulator of a query. + * + * @param <InputT> Type of input elements. + * @param <OutputT> Type of output elements. + */ +abstract class AbstractSimulator<InputT, OutputT> { + /** Window size for action bucket sampling. */ + public static final Duration WINDOW_SIZE = Duration.standardMinutes(1); + + /** Input event stream we should draw from.
*/ + private final Iterator<TimestampedValue<InputT>> input; + + /** Set to true when no more results. */ + private boolean isDone; + + /** Results which have not yet been returned by the {@link #results} iterator. */ + private final List<TimestampedValue<OutputT>> pendingResults; + + /** Current window timestamp (ms since epoch). */ + private long currentWindow; + + /** + * Number of (possibly intermediate) results for the current window. + */ + private long currentCount; + + /** + * Result counts per window which have not yet been returned by the {@link #resultsPerWindow} + * iterator. + */ + private final List<Long> pendingCounts; + + public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) { + this.input = input; + isDone = false; + pendingResults = new ArrayList<>(); + currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + currentCount = 0; + pendingCounts = new ArrayList<>(); + } + + /** Called by implementors of {@link #run}: Fetch the next input element. */ + @Nullable + protected TimestampedValue<InputT> nextInput() { + if (!input.hasNext()) { + return null; + } + TimestampedValue<InputT> timestampedInput = input.next(); + NexmarkUtils.info("input: %s", timestampedInput); + return timestampedInput; + } + + /** + * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of + * recording the expected activity of the query over time. + */ + protected void addIntermediateResult(TimestampedValue<OutputT> result) { + NexmarkUtils.info("intermediate result: %s", result); + updateCounts(result.getTimestamp()); + } + + /** + * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking + * semantic correctness. + */ + protected void addResult(TimestampedValue<OutputT> result) { + NexmarkUtils.info("result: %s", result); + pendingResults.add(result); + updateCounts(result.getTimestamp()); + } + + /** + * Update window and counts.
+ */ + private void updateCounts(Instant timestamp) { + // Truncate to the start of the WINDOW_SIZE bucket (assumes non-negative timestamps). + long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis(); + if (window > currentWindow) { + if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { + pendingCounts.add(currentCount); + } + currentCount = 0; + currentWindow = window; + } + currentCount++; + } + + /** Called by implementors of {@link #run}: Record that no more results will be emitted. */ + protected void allDone() { + isDone = true; + } + + /** + * Overridden by derived classes to do the next increment of work. Each call should + * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult} + * or {@link #allDone}. It is ok for a single call to emit more than one result via + * {@link #addResult}. It is ok for a single call to run the entire simulation, though + * then the {@link #results} and {@link #resultsPerWindow} iterators will stall until the + * whole simulation has completed. + */ + protected abstract void run(); + + /** + * Return iterator over all expected timestamped results. The underlying simulator state is + * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called. + */ + public Iterator<TimestampedValue<OutputT>> results() { + return new Iterator<TimestampedValue<OutputT>>() { + @Override + public boolean hasNext() { + while (true) { + if (!pendingResults.isEmpty()) { + return true; + } + if (isDone) { + return false; + } + run(); + } + } + + @Override + public TimestampedValue<OutputT> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return pendingResults.remove(0); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + /** + * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying + * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be + * called.
+ */ + public Iterator<Long> resultsPerWindow() { + return new Iterator<Long>() { + @Override + public boolean hasNext() { + while (true) { + if (!pendingCounts.isEmpty()) { + return true; + } + if (isDone) { + if (currentCount > 0) { + pendingCounts.add(currentCount); + currentCount = 0; + currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + return true; + } else { + return false; + } + } + run(); + } + } + + @Override + public Long next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return pendingCounts.remove(0); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java new file mode 100644 index 0000000..94f2647 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * An auction submitted by a person. + */ +public class Auction implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<Auction> CODER = new AtomicCoder<Auction>() { + @Override + public void encode(Auction value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.id, outStream, Context.NESTED); + STRING_CODER.encode(value.itemName, outStream, Context.NESTED); + STRING_CODER.encode(value.description, outStream, Context.NESTED); + LONG_CODER.encode(value.initialBid, outStream, Context.NESTED); + LONG_CODER.encode(value.reserve, outStream, Context.NESTED); + LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); + LONG_CODER.encode(value.expires, outStream, Context.NESTED); + LONG_CODER.encode(value.seller, outStream, Context.NESTED); + LONG_CODER.encode(value.category, outStream, Context.NESTED); + STRING_CODER.encode(value.extra, outStream, Context.NESTED); + } + + @Override + public Auction decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long id = LONG_CODER.decode(inStream, Context.NESTED); + String itemName = STRING_CODER.decode(inStream, Context.NESTED);
+ String description = STRING_CODER.decode(inStream, Context.NESTED); + long initialBid = LONG_CODER.decode(inStream, Context.NESTED); + long reserve = LONG_CODER.decode(inStream, Context.NESTED); + long dateTime = LONG_CODER.decode(inStream, Context.NESTED); + long expires = LONG_CODER.decode(inStream, Context.NESTED); + long seller = LONG_CODER.decode(inStream, Context.NESTED); + long category = LONG_CODER.decode(inStream, Context.NESTED); + String extra = STRING_CODER.decode(inStream, Context.NESTED); + return new Auction( + id, itemName, description, initialBid, reserve, dateTime, expires, seller, category, + extra); + } + }; + + + /** Id of auction. */ + @JsonProperty + public final long id; // primary key + + /** Extra auction properties. */ + @JsonProperty + public final String itemName; + + @JsonProperty + public final String description; + + /** Initial bid price, in cents. */ + @JsonProperty + public final long initialBid; + + /** Reserve price, in cents. */ + @JsonProperty + public final long reserve; + + @JsonProperty + public final long dateTime; + + /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */ + @JsonProperty + public final long expires; + + /** Id of person who instigated auction. */ + @JsonProperty + public final long seller; // foreign key: Person.id + + /** Id of category auction is listed under. */ + @JsonProperty + public final long category; // foreign key: Category.id + + /** Additional arbitrary payload for performance testing. */ + @JsonProperty + public final String extra; + + + // For Avro only.
+ @SuppressWarnings("unused") + private Auction() { + id = 0; + itemName = null; + description = null; + initialBid = 0; + reserve = 0; + dateTime = 0; + expires = 0; + seller = 0; + category = 0; + extra = null; + } + + public Auction(long id, String itemName, String description, long initialBid, long reserve, + long dateTime, long expires, long seller, long category, String extra) { + this.id = id; + this.itemName = itemName; + this.description = description; + this.initialBid = initialBid; + this.reserve = reserve; + this.dateTime = dateTime; + this.expires = expires; + this.seller = seller; + this.category = category; + this.extra = extra; + } + + /** + * Return a copy of auction which captures the given annotation. + * (Used for debugging.) + */ + public Auction withAnnotation(String annotation) { + return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, + category, annotation + ": " + extra); + } + + /** + * Does auction have {@code annotation}? (Used for debugging.) + */ + public boolean hasAnnotation(String annotation) { + return extra.startsWith(annotation + ": "); + } + + /** + * Remove {@code annotation} from auction. (Used for debugging.)
+ */ + public Auction withoutAnnotation(String annotation) { + if (hasAnnotation(annotation)) { + return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller, + category, extra.substring(annotation.length() + 2)); + } else { + return this; + } + } + + @Override + public long sizeInBytes() { + return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8 + + extra.length() + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java new file mode 100644 index 0000000..8c3697a --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result of {@link WinningBids} transform. + */ +public class AuctionBid implements KnownSize, Serializable { + public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() { + @Override + public void encode(AuctionBid value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + Auction.CODER.encode(value.auction, outStream, Context.NESTED); + Bid.CODER.encode(value.bid, outStream, Context.NESTED); + } + + @Override + public AuctionBid decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + Auction auction = Auction.CODER.decode(inStream, Context.NESTED); + Bid bid = Bid.CODER.decode(inStream, Context.NESTED); + return new AuctionBid(auction, bid); + } + }; + + @JsonProperty + public final Auction auction; + + @JsonProperty + public final Bid bid; + + // For Avro only.
+ @SuppressWarnings("unused") + private AuctionBid() { + auction = null; + bid = null; + } + + public AuctionBid(Auction auction, Bid bid) { + this.auction = auction; + this.bid = bid; + } + + @Override + public long sizeInBytes() { + return auction.sizeInBytes() + bid.sizeInBytes(); + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java new file mode 100644 index 0000000..a0fbebc --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result of {@link Query5}. + */ +public class AuctionCount implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() { + @Override + public void encode(AuctionCount value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream, Context.NESTED); + LONG_CODER.encode(value.count, outStream, Context.NESTED); + } + + @Override + public AuctionCount decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream, Context.NESTED); + long count = LONG_CODER.decode(inStream, Context.NESTED); + return new AuctionCount(auction, count); + } + }; + + @JsonProperty + public final long auction; + + @JsonProperty + public final long count; + + // For Avro only.
+ @SuppressWarnings("unused") + private AuctionCount() { + auction = 0; + count = 0; + } + + public AuctionCount(long auction, long count) { + this.auction = auction; + this.count = count; + } + + @Override + public long sizeInBytes() { + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java new file mode 100644 index 0000000..4f25a9b --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result of {@link Query2}. + */ +public class AuctionPrice implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() { + @Override + public void encode(AuctionPrice value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream, Context.NESTED); + LONG_CODER.encode(value.price, outStream, Context.NESTED); + } + + @Override + public AuctionPrice decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream, Context.NESTED); + long price = LONG_CODER.decode(inStream, Context.NESTED); + return new AuctionPrice(auction, price); + } + }; + + @JsonProperty + public final long auction; + + /** Price in cents. */ + @JsonProperty + public final long price; + + // For Avro only.
+ @SuppressWarnings("unused") + private AuctionPrice() { + auction = 0; + price = 0; + } + + public AuctionPrice(long auction, long price) { + this.auction = auction; + this.price = price; + } + + @Override + public long sizeInBytes() { + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md new file mode 100644 index 0000000..d1b51e8 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md @@ -0,0 +1,282 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License.
+-->
+
+# Running NexMark on Beam on Flink on Google Cloud Platform
+
+Here's how to create a cluster of VMs on Google Cloud Platform, deploy
+Flink to them, and invoke a NexMark pipeline using the Beam-on-Flink
+runner.
+
+These instructions are somewhat baroque; hopefully they can be
+simplified over time.
+
+## Prerequisites
+
+You'll need:
+
+* the Google Cloud SDK
+* a clone of the Beam repository
+* a Flink binary distribution
+* a project on Google Cloud Platform.
+
+## Establish the shell environment
+
+```
+# Beam root
+BEAM=<path to Beam source directory>
+# Flink root
+FLINK_VER=flink-1.0.3
+FLINK=<path to Flink distribution directory>
+# Google Cloud project
+PROJECT=<your project id>
+# Google Cloud zone
+ZONE=<your project zone>
+# Cloud commands
+GCLOUD=<path to gcloud command>
+GSUTIL=<path to gsutil command>
+```
+
+## Establish VM names for Flink master and workers
+
+```
+MASTER=flink-m
+NUM_WORKERS=5
+WORKERS=""
+for (( i = 0; i < $NUM_WORKERS; i++ )); do
+  WORKERS="$WORKERS flink-w-$i"
+done
+ALL="$MASTER $WORKERS"
+```
+
+## Build Beam
+
+```
+( cd $BEAM && mvn clean install )
+```
+
+## Bring up the cluster
+
+Establish project defaults and authenticate:
+```
+$GCLOUD init
+$GCLOUD auth login
+```
+
+Build a Google Cloud Dataproc cluster:
+```
+$GCLOUD beta dataproc clusters create \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  --bucket=nexmark \
+  --scopes=cloud-platform \
+  --num-workers=$NUM_WORKERS \
+  --image-version=preview \
+  flink
+```
+
+Force google_compute_engine ssh keys to be generated locally:
+```
+$GCLOUD compute ssh \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER \
+  --command 'exit'
+```
+
+Open ports on the VMs:
+```
+$GCLOUD compute firewall-rules create allow-monitoring --allow tcp:8080-8081
+$GCLOUD compute firewall-rules create allow-debug --allow tcp:5555
+```
+
+Establish keys on the master and workers:
+**CAUTION:** This will leave your private key on your master VM.
+It would be better to create a key just for inter-worker ssh.
+```
+for m in $ALL; do
+  echo "*** $m ***"
+  $GCLOUD beta compute scp \
+    --project=$PROJECT \
+    --zone=$ZONE \
+    ~/.ssh/google_compute_engine.pub $m:~/.ssh/
+done
+$GCLOUD beta compute scp \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  ~/.ssh/google_compute_engine $MASTER:~/.ssh/
+```
+
+Collect IP addresses for the master and workers:
+```
+MASTER_EXT_IP=$($GCLOUD compute instances describe \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER | grep natIP: | sed 's/[ ]*natIP:[ ]*//')
+MASTER_IP=$($GCLOUD compute instances describe \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER | grep networkIP: | sed 's/[ ]*networkIP:[ ]*//')
+WORKER_IPS=""
+for m in $WORKERS; do
+  echo "*** $m ***"
+  WORKER_IP=$($GCLOUD compute instances describe \
+    --project=$PROJECT \
+    --zone=$ZONE \
+    $m | grep networkIP: | sed 's/[ ]*networkIP:[ ]*//')
+  WORKER_IPS="$WORKER_IPS $WORKER_IP"
+done
+```
+
+Configure Flink:
+```
+cat $FLINK/conf/flink-conf.yaml \
+  | sed "s|.*\(jobmanager.rpc.address\):.*|\1: $MASTER_IP|g" \
+  | sed "s|.*\(jobmanager.heap.mb\):.*|\1: 4096|g" \
+  | sed "s|.*\(taskmanager.heap.mb\):.*|\1: 8192|g" \
+  | sed "s|.*\(parallelism.default\):.*|\1: $(($NUM_WORKERS * 4))|g" \
+  | sed "s|.*\(fs.hdfs.hadoopconf\):.*|\1: /etc/hadoop/conf|g" \
+  | sed "s|.*\(taskmanager.numberOfTaskSlots\):.*|\1: 4|g" \
+  | sed "s|.*\(jobmanager.web.submit.enable\):.*|\1: false|g" \
+  | sed "s|.*\(env.ssh.opts\):.*||g" \
+  > ~/flink-conf.yaml
+cat $FLINK/conf/log4j.properties \
+  | sed "s|.*\(log4j.rootLogger\)=.*|\1=ERROR, file|g" \
+  > ~/log4j.properties
+echo "env.ssh.opts: -i /home/$USER/.ssh/google_compute_engine -o StrictHostKeyChecking=no" >> ~/flink-conf.yaml
+echo "$MASTER_IP:8081" > ~/masters
+echo -n > ~/slaves
+for ip in $WORKER_IPS; do
+  echo $ip >> ~/slaves
+done
+cp -f \
+  ~/flink-conf.yaml \
+  ~/masters ~/slaves \
+  ~/log4j.properties \
+  $FLINK/conf/
+```
+
+Package configured Flink for distribution to workers:
+```
+( cd ~/ &&
tar -cvzf ~/flink.tgz $FLINK/* ) +``` + +Distribute: +``` +$GSUTIL cp ~/flink.tgz gs://nexmark +for m in $ALL; do + echo "*** $m ***" + $GCLOUD compute ssh \ + --project=$PROJECT \ + --zone=$ZONE \ + $m \ + --command 'gsutil cp gs://nexmark/flink.tgz ~/ && tar -xvzf ~/flink.tgz' +done +``` + +Start the Flink cluster: +``` +$GCLOUD compute ssh \ + --project=$PROJECT \ + --zone=$ZONE \ + $MASTER \ + --command "~/$FLINK_VER/bin/start-cluster.sh" +``` + +Bring up the Flink monitoring UI: +``` +/usr/bin/google-chrome $MASTER_EXT_IP:8081 & +``` + +## Run NexMark + +Distribute the Beam + NexMark jar to all workers: +``` +$GSUTIL cp $BEAM/integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar gs://nexmark +for m in $ALL; do + echo "*** $m ***" + $GCLOUD compute ssh \ + --project=$PROJECT \ + --zone=$ZONE \ + $m \ + --command "gsutil cp gs://nexmark/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar ~/$FLINK_VER/lib/" +done +``` + +Create a Pubsub topic and subscription for testing: +``` +$GCLOUD alpha pubsub \ + --project=$PROJECT \ + topics create flink_test + +$GCLOUD alpha pubsub \ + --project=$PROJECT \ + subscriptions create flink_test \ + --topic flink_test \ + --ack-deadline=60 \ + --topic-project=$PROJECT +``` + +Launch! +**NOTE:** As of flink-1.0.3 this will throw a `NullPointerException` +in `org.apache.beam.sdk.io.PubsubUnboundedSink$WriterFn.startBundle`. +See Jira issue [BEAM-196](https://issues.apache.org/jira/browse/BEAM-196). 
+
+```
+$GCLOUD compute ssh \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER \
+  --command "~/$FLINK_VER/bin/flink run \
+    -c org.apache.beam.integration.nexmark.NexmarkFlinkDriver \
+    ~/$FLINK_VER/lib/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
+    --project=$PROJECT \
+    --streaming=true \
+    --query=0 \
+    --sourceType=PUBSUB \
+    --pubSubMode=COMBINED \
+    --pubsubTopic=flink_test \
+    --resourceNameMode=VERBATIM \
+    --manageResources=false \
+    --monitorJobs=false \
+    --numEventGenerators=5 \
+    --firstEventRate=1000 \
+    --nextEventRate=1000 \
+    --isRateLimited=true \
+    --numEvents=0 \
+    --useWallclockEventTime=true \
+    --usePubsubPublishTime=true"
+```
+
+## Tear down the cluster
+
+Stop the Flink cluster:
+```
+$GCLOUD compute ssh \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER \
+  --command "~/$FLINK_VER/bin/stop-cluster.sh"
+```
+
+Tear down the Dataproc cluster:
+```
+$GCLOUD beta dataproc clusters delete \
+  --project=$PROJECT \
+  flink
+```
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
new file mode 100644
index 0000000..ce2184b
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Comparator; + +/** + * A bid for an item on auction. 
+ */ +public class Bid implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<Bid> CODER = new AtomicCoder<Bid>() { + @Override + public void encode(Bid value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.auction, outStream, Context.NESTED); + LONG_CODER.encode(value.bidder, outStream, Context.NESTED); + LONG_CODER.encode(value.price, outStream, Context.NESTED); + LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); + STRING_CODER.encode(value.extra, outStream, Context.NESTED); + } + + @Override + public Bid decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long auction = LONG_CODER.decode(inStream, Context.NESTED); + long bidder = LONG_CODER.decode(inStream, Context.NESTED); + long price = LONG_CODER.decode(inStream, Context.NESTED); + long dateTime = LONG_CODER.decode(inStream, Context.NESTED); + String extra = STRING_CODER.decode(inStream, Context.NESTED); + return new Bid(auction, bidder, price, dateTime, extra); + } + }; + + /** + * Comparator to order bids by ascending price then descending time + * (for finding winning bids). + */ + public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() { + @Override + public int compare(Bid left, Bid right) { + int i = Double.compare(left.price, right.price); + if (i != 0) { + return i; + } + return Long.compare(right.dateTime, left.dateTime); + } + }; + + /** + * Comparator to order bids by ascending time then ascending price. + * (for finding most recent bids). 
+ */ + public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() { + @Override + public int compare(Bid left, Bid right) { + int i = Long.compare(left.dateTime, right.dateTime); + if (i != 0) { + return i; + } + return Double.compare(left.price, right.price); + } + }; + + /** Id of auction this bid is for. */ + @JsonProperty + public final long auction; // foreign key: Auction.id + + /** Id of person bidding in auction. */ + @JsonProperty + public final long bidder; // foreign key: Person.id + + /** Price of bid, in cents. */ + @JsonProperty + public final long price; + + /** + * Instant at which bid was made (ms since epoch). + * NOTE: This may be earlier than the system's event time. + */ + @JsonProperty + public final long dateTime; + + /** Additional arbitrary payload for performance testing. */ + @JsonProperty + public final String extra; + + // For Avro only. + @SuppressWarnings("unused") + private Bid() { + auction = 0; + bidder = 0; + price = 0; + dateTime = 0; + extra = null; + } + + public Bid(long auction, long bidder, long price, long dateTime, String extra) { + this.auction = auction; + this.bidder = bidder; + this.price = price; + this.dateTime = dateTime; + this.extra = extra; + } + + /** + * Return a copy of bid which capture the given annotation. + * (Used for debugging). + */ + public Bid withAnnotation(String annotation) { + return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra); + } + + /** + * Does bid have {@code annotation}? (Used for debugging.) + */ + public boolean hasAnnotation(String annotation) { + return extra.startsWith(annotation + ": "); + } + + /** + * Remove {@code annotation} from bid. (Used for debugging.) 
+ */ + public Bid withoutAnnotation(String annotation) { + if (hasAnnotation(annotation)) { + return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2)); + } else { + return this; + } + } + + @Override + public long sizeInBytes() { + return 8 + 8 + 8 + 8 + extra.length() + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java new file mode 100644 index 0000000..cfdd170 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result of query 11. + */ +public class BidsPerSession implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() { + @Override + public void encode(BidsPerSession value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.personId, outStream, Context.NESTED); + LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED); + } + + @Override + public BidsPerSession decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long personId = LONG_CODER.decode(inStream, Context.NESTED); + long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED); + return new BidsPerSession(personId, bidsPerSession); + } + }; + + @JsonProperty + public final long personId; + + @JsonProperty + public final long bidsPerSession; + + public BidsPerSession() { + personId = 0; + bidsPerSession = 0; + } + + public BidsPerSession(long personId, long bidsPerSession) { + this.personId = personId; + this.bidsPerSession = bidsPerSession; + } + + @Override + public long sizeInBytes() { + // Two longs. 
+ return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java new file mode 100644 index 0000000..f6cc16a --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.values.TimestampedValue; + +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * A custom, bounded source of event records. + */ +class BoundedEventSource extends BoundedSource<Event> { + /** Configuration we generate events against. */ + private final GeneratorConfig config; + + /** How many bounded sources to create. */ + private final int numEventGenerators; + + public BoundedEventSource(GeneratorConfig config, int numEventGenerators) { + this.config = config; + this.numEventGenerators = numEventGenerators; + } + + /** A reader to pull events from the generator. */ + private static class EventReader extends BoundedReader<Event> { + /** + * Event source we purporting to be reading from. + * (We can't use Java's capture-outer-class pointer since we must update + * this field on calls to splitAtFraction.) + */ + private BoundedEventSource source; + + /** Generator we are reading from. */ + private final Generator generator; + + private boolean reportedStop; + + @Nullable + private TimestampedValue<Event> currentEvent; + + public EventReader(BoundedEventSource source, GeneratorConfig config) { + this.source = source; + generator = new Generator(config); + reportedStop = false; + } + + @Override + public synchronized boolean start() { + NexmarkUtils.info("starting bounded generator %s", generator); + return advance(); + } + + @Override + public synchronized boolean advance() { + if (!generator.hasNext()) { + // No more events. 
+ if (!reportedStop) { + reportedStop = true; + NexmarkUtils.info("stopped bounded generator %s", generator); + } + return false; + } + currentEvent = generator.next(); + return true; + } + + @Override + public synchronized Event getCurrent() throws NoSuchElementException { + if (currentEvent == null) { + throw new NoSuchElementException(); + } + return currentEvent.getValue(); + } + + @Override + public synchronized Instant getCurrentTimestamp() throws NoSuchElementException { + if (currentEvent == null) { + throw new NoSuchElementException(); + } + return currentEvent.getTimestamp(); + } + + @Override + public void close() throws IOException { + // Nothing to close. + } + + @Override + public synchronized Double getFractionConsumed() { + return generator.getFractionConsumed(); + } + + @Override + public synchronized BoundedSource<Event> getCurrentSource() { + return source; + } + + @Override + @Nullable + public synchronized BoundedEventSource splitAtFraction(double fraction) { + long startId = generator.getCurrentConfig().getStartEventId(); + long stopId = generator.getCurrentConfig().getStopEventId(); + long size = stopId - startId; + long splitEventId = startId + Math.min((int) (size * fraction), size); + if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) { + // Already passed this position or split results in left or right being empty. + NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction); + return null; + } + + NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId); + + // Scale back the event space of the current generator, and return a generator config + // representing the event space we just 'stole' from the current generator. 
+ GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId); + + NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig); + + // At this point + // generator.events() ++ new Generator(remainingConfig).events() + // == originalGenerator.events() + + // We need a new source to represent the now smaller key space for this reader, so + // that we can maintain the invariant that + // this.getCurrentSource().createReader(...) + // will yield the same output as this. + source = new BoundedEventSource(generator.getCurrentConfig(), source.numEventGenerators); + + // Return a source from which we may read the 'stolen' event space. + return new BoundedEventSource(remainingConfig, source.numEventGenerators); + } + } + + @Override + public List<BoundedEventSource> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) { + NexmarkUtils.info("slitting bounded source %s into %d sub-sources", config, numEventGenerators); + List<BoundedEventSource> results = new ArrayList<>(); + // Ignore desiredBundleSizeBytes and use numEventGenerators instead. + for (GeneratorConfig subConfig : config.split(numEventGenerators)) { + results.add(new BoundedEventSource(subConfig, 1)); + } + return results; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) { + return config.getEstimatedSizeBytes(); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) { + return false; + } + + @Override + public EventReader createReader(PipelineOptions options) { + NexmarkUtils.info("creating initial bounded reader for %s", config); + return new EventReader(this, config); + } + + @Override + public void validate() { + // Nothing to validate. 
+ } + + @Override + public Coder<Event> getDefaultOutputCoder() { + return Event.CODER; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java new file mode 100644 index 0000000..ab5d92d --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result of {@link Query4}. + */ +public class CategoryPrice implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + + public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() { + @Override + public void encode(CategoryPrice value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.category, outStream, Context.NESTED); + LONG_CODER.encode(value.price, outStream, Context.NESTED); + INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED); + } + + @Override + public CategoryPrice decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long category = LONG_CODER.decode(inStream, Context.NESTED); + long price = LONG_CODER.decode(inStream, Context.NESTED); + boolean isLast = INT_CODER.decode(inStream, context) != 0; + return new CategoryPrice(category, price, isLast); + } + }; + + @JsonProperty + public final long category; + + /** Price in cents. */ + @JsonProperty + public final long price; + + @JsonProperty + public final boolean isLast; + + // For Avro only. 
+ @SuppressWarnings("unused") + private CategoryPrice() { + category = 0; + price = 0; + isLast = false; + } + + public CategoryPrice(long category, long price, boolean isLast) { + this.category = category; + this.price = price; + this.isLast = isLast; + } + + @Override + public long sizeInBytes() { + return 8 + 8 + 1; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java new file mode 100644 index 0000000..659da44 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result of query 10. + */ +public class Done implements KnownSize, Serializable { + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + public static final Coder<Done> CODER = new AtomicCoder<Done>() { + @Override + public void encode(Done value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + STRING_CODER.encode(value.message, outStream, Context.NESTED); + } + + @Override + public Done decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + String message = STRING_CODER.decode(inStream, Context.NESTED); + return new Done(message); + } + }; + + @JsonProperty + public final String message; + + // For Avro only. 
+ @SuppressWarnings("unused") + public Done() { + message = null; + } + + public Done(String message) { + this.message = message; + } + + @Override + public long sizeInBytes() { + return message.length(); + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java new file mode 100644 index 0000000..a382b8e --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import javax.annotation.Nullable;
+
+/**
+ * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
+ * or a {@link Bid}.
+ *
+ * <p>Each public constructor sets exactly one of {@link #newPerson}, {@link #newAuction} and
+ * {@link #bid} and leaves the other two {@code null}. An event with all three {@code null}
+ * (e.g. one created through the Avro-only no-arg constructor) is invalid: encoding it, sizing
+ * it, printing it or touching its annotations throws {@code RuntimeException("invalid event")}.
+ */
+public class Event implements KnownSize, Serializable {
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  /** Wire tags written by {@link #CODER} ahead of the payload to identify the event kind. */
+  private static final int PERSON_TAG = 0;
+  private static final int AUCTION_TAG = 1;
+  private static final int BID_TAG = 2;
+
+  /**
+   * Coder for {@link Event}. It writes a {@link VarIntCoder} tag saying which field is set,
+   * followed by that field encoded with its own coder. Both parts use the nested context.
+   */
+  public static final Coder<Event> CODER = new AtomicCoder<Event>() {
+    @Override
+    public void encode(Event value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      if (value.newPerson != null) {
+        INT_CODER.encode(PERSON_TAG, outStream, Context.NESTED);
+        Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
+      } else if (value.newAuction != null) {
+        INT_CODER.encode(AUCTION_TAG, outStream, Context.NESTED);
+        Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
+      } else if (value.bid != null) {
+        INT_CODER.encode(BID_TAG, outStream, Context.NESTED);
+        Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+      } else {
+        throw new RuntimeException("invalid event");
+      }
+    }
+
+    @Override
+    public Event decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      // The tag is always followed by a payload, so read it in the nested context,
+      // mirroring how encode() wrote it, regardless of the outer context.
+      int tag = INT_CODER.decode(inStream, Context.NESTED);
+      switch (tag) {
+        case PERSON_TAG:
+          return new Event(Person.CODER.decode(inStream, Context.NESTED));
+        case AUCTION_TAG:
+          return new Event(Auction.CODER.decode(inStream, Context.NESTED));
+        case BID_TAG:
+          return new Event(Bid.CODER.decode(inStream, Context.NESTED));
+        default:
+          throw new RuntimeException("invalid event encoding");
+      }
+    }
+  };
+
+  /** The new person carried by this event, or {@code null} if it carries something else. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Person newPerson;
+
+  /** The new auction carried by this event, or {@code null} if it carries something else. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Auction newAuction;
+
+  /** The bid carried by this event, or {@code null} if it carries something else. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Bid bid;
+
+  // For Avro only (reflective instantiation). All three fields start out null.
+  @SuppressWarnings("unused")
+  private Event() {
+    newPerson = null;
+    newAuction = null;
+    bid = null;
+  }
+
+  /** Creates an event carrying {@code newPerson}. */
+  public Event(Person newPerson) {
+    this.newPerson = newPerson;
+    newAuction = null;
+    bid = null;
+  }
+
+  /** Creates an event carrying {@code newAuction}. */
+  public Event(Auction newAuction) {
+    newPerson = null;
+    this.newAuction = newAuction;
+    bid = null;
+  }
+
+  /** Creates an event carrying {@code bid}. */
+  public Event(Bid bid) {
+    newPerson = null;
+    newAuction = null;
+    this.bid = bid;
+  }
+
+  /**
+   * Return a copy of event which captures {@code annotation}. (Used for debugging.)
+   *
+   * @throws RuntimeException if no field of this event is set
+   */
+  public Event withAnnotation(String annotation) {
+    if (newPerson != null) {
+      return new Event(newPerson.withAnnotation(annotation));
+    } else if (newAuction != null) {
+      return new Event(newAuction.withAnnotation(annotation));
+    } else if (bid != null) {
+      return new Event(bid.withAnnotation(annotation));
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /**
+   * Does event have {@code annotation}? (Used for debugging.)
+   *
+   * @throws RuntimeException if no field of this event is set
+   */
+  public boolean hasAnnotation(String annotation) {
+    if (newPerson != null) {
+      return newPerson.hasAnnotation(annotation);
+    } else if (newAuction != null) {
+      return newAuction.hasAnnotation(annotation);
+    } else if (bid != null) {
+      return bid.hasAnnotation(annotation);
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /**
+   * Remove {@code annotation} from event. (Used for debugging.)
+   *
+   * @throws RuntimeException if no field of this event is set
+   */
+  public Event withoutAnnotation(String annotation) {
+    if (newPerson != null) {
+      return new Event(newPerson.withoutAnnotation(annotation));
+    } else if (newAuction != null) {
+      return new Event(newAuction.withoutAnnotation(annotation));
+    } else if (bid != null) {
+      return new Event(bid.withoutAnnotation(annotation));
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /**
+   * Returns one byte (the kind tag, which {@link #CODER} writes as a small VarInt) plus the
+   * size of the carried entity.
+   *
+   * @throws RuntimeException if no field of this event is set
+   */
+  @Override
+  public long sizeInBytes() {
+    if (newPerson != null) {
+      return 1 + newPerson.sizeInBytes();
+    } else if (newAuction != null) {
+      return 1 + newAuction.sizeInBytes();
+    } else if (bid != null) {
+      return 1 + bid.sizeInBytes();
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /** Delegates to the {@code toString()} of whichever entity this event carries. */
+  @Override
+  public String toString() {
+    if (newPerson != null) {
+      return newPerson.toString();
+    } else if (newAuction != null) {
+      return newAuction.toString();
+    } else if (bid != null) {
+      return bid.toString();
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+}
