http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/pom.xml b/integration/java/pom.xml deleted file mode 100644 index b0c3853..0000000 --- a/integration/java/pom.xml +++ /dev/null @@ -1,37 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>beam-integration-parent</artifactId> - <version>2.1.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>beam-integration-java-parent</artifactId> - <packaging>pom</packaging> - <name>Apache Beam :: Integration Tests :: Java</name> - - <modules> - <module>nexmark</module> - </modules> - -</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/pom.xml ---------------------------------------------------------------------- diff --git a/integration/pom.xml b/integration/pom.xml deleted file mode 100644 index 4254819..0000000 --- a/integration/pom.xml +++ /dev/null @@ -1,51 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>beam-parent</artifactId> - <version>2.1.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>beam-integration-parent</artifactId> - <packaging>pom</packaging> - <name>Apache Beam :: Integration Tests</name> - - <profiles> - <profile> - <id>release</id> - <build> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - </profile> - </profiles> - - <modules> - <module>java</module> - </modules> - -</project> http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index bddbf1f..5fd1297 100644 --- a/pom.xml +++ b/pom.xml @@ -187,7 +187,6 @@ <module>sdks</module> <module>runners</module> <module>examples</module> - <module>integration</module> <!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. --> <module>sdks/java/javadoc</module> </modules> http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/README.md ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/README.md b/sdks/java/nexmark/README.md new file mode 100644 index 0000000..a9acd63 --- /dev/null +++ b/sdks/java/nexmark/README.md @@ -0,0 +1,340 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# NEXMark integration suite + +This is a suite of pipelines inspired by the 'continuous data stream' +queries in +[http://datalab.cs.pdx.edu/niagaraST/NEXMark/](http://datalab.cs.pdx.edu/niagaraST/NEXMark/). + +These are multiple queries over a three-entity model representing an online auction system: + + - **Person** represents a person submitting an item for auction and/or making a bid + on an auction. + - **Auction** represents an item under auction. + - **Bid** represents a bid for an item under auction. + +The queries exercise many aspects of the Beam model: + +* **Query1**: What are the bid values in Euros? + Illustrates a simple map. +* **Query2**: What are the auctions with particular auction numbers? + Illustrates a simple filter. +* **Query3**: Who is selling in particular US states? + Illustrates an incremental join (using per-key state and timer) and filter. +* **Query4**: What is the average selling price for each auction + category? + Illustrates a complex join (using custom window functions) and + aggregation. +* **Query5**: Which auctions have seen the most bids in the last period? + Illustrates sliding windows and combiners. +* **Query6**: What is the average selling price per seller for their + last 10 closed auctions? + Shares the same 'winning bids' core as for **Query4**, and + illustrates a specialized combiner. +* **Query7**: What are the highest bids per period? 
+ Deliberately implemented using a side input to illustrate fanout. +* **Query8**: Who has entered the system and created an auction in + the last period? + Illustrates a simple join. + +We have augmented the original queries with five more: + +* **Query0**: Pass-through. + Allows us to measure the monitoring overhead. +* **Query9**: Winning-bids. + A common sub-query shared by **Query4** and **Query6**. +* **Query10**: Log all events to GCS files. + Illustrates windows with large side effects on firing. +* **Query11**: How many bids did a user make in each session they + were active? + Illustrates session windows. +* **Query12**: How many bids does a user make within a fixed + processing time limit? + Illustrates working in processing time in the Global window, as + compared with event time in non-Global windows for all the other + queries. + +The Beam runner to use is selected with a Maven profile. The available profiles are: + +* direct-runner +* spark-runner +* flink-runner +* apex-runner + +The runner must also be specified, as in any other Beam pipeline, using + + --runner + + +Test data is deterministically synthesized on demand. The test +data may be synthesized in the same pipeline as the query itself, +or may be published to Pubsub. + +The query results may be: + +* Published to Pubsub. +* Written to text files as plain text. +* Written to text files using an Avro encoding. +* Sent to BigQuery. +* Discarded. + +# Configuration + +## Common configuration parameters + +Decide if batch or streaming: + + --streaming=true + +Number of event generators: + + --numEventGenerators=4 + +Run query N: + + --query=N + +## Available Suites +The suite to run can be chosen using this configuration parameter: + + --suite=SUITE + +Available suites are: +* DEFAULT: Test default configuration with query 0. +* SMOKE: Run the 12 default configurations. +* STRESS: Like SMOKE but for 1m events. +* FULL_THROTTLE: Like SMOKE but for 100m events. 
+ + + +## Apex specific configuration + + --manageResources=false --monitorJobs=false + +## Dataflow specific configuration + + --manageResources=false --monitorJobs=true \ + --enforceEncodability=false --enforceImmutability=false \ + --project=<your project> \ + --zone=<your zone> \ + --workerMachineType=n1-highmem-8 \ + --stagingLocation=<a gs path for staging> \ + --runner=DataflowRunner \ + --tempLocation=<a gs path for temporary files> \ + --filesToStage=target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar + +## Direct specific configuration + + --manageResources=false --monitorJobs=true \ + --enforceEncodability=false --enforceImmutability=false + +## Flink specific configuration + + --manageResources=false --monitorJobs=true \ + --flinkMaster=local --parallelism=#numcores + +## Spark specific configuration + + --manageResources=false --monitorJobs=true \ + --sparkMaster=local \ + -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true + +# Current Status + +Open issues are tracked [here](https://issues.apache.org/jira/browse/BEAM): + +## Batch / Synthetic / Local + +| Query | Direct | Spark | Flink | Apex | +| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ | +| 0 | ok | ok | ok | ok | +| 1 | ok | ok | ok | ok | +| 2 | ok | ok | ok | ok | +| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | ok | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) | +| 4 | ok | ok | ok | ok | +| 5 | ok | ok | ok | ok | +| 6 | ok | ok | ok | ok | +| 7 | ok | ok | ok | ok | +| 8 | ok | ok | ok | ok | +| 9 | ok | ok | ok | ok | +| 10 | ok | ok | ok | ok | +| 11 | ok | ok | ok | ok | +| 12 | ok | ok | ok | ok | + +## Streaming / Synthetic / Local + +| Query | Direct | Spark | Flink | Apex | +| ----: | 
------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ | +| 0 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 1 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 2 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) | +| 4 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 5 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 6 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 7 | ok | [BEAM-2112](https://issues.apache.org/jira/browse/BEAM-2112) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 8 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 9 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 10 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 11 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | +| 12 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | + +## Batch / Synthetic / Cluster + +TODO + +| Query | Dataflow | Spark | Flink | Apex | +| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ | +| 0 | | | | | + +## Streaming / Synthetic / Cluster + +TODO + +| Query | Dataflow | Spark | Flink | Apex | +| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ | +| 0 | | | | | + +# Running Nexmark + +## Running SMOKE suite on 
the DirectRunner (local) + +Batch Mode + + mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true" + +Streaming Mode + + mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true" + + +## Running SMOKE suite on the SparkRunner (local) + +Batch Mode + + mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true" + +Streaming Mode + + mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true" + + +## Running SMOKE suite on the FlinkRunner (local) + +Batch Mode + + mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true --flinkMaster=local" + +Streaming Mode + + mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true --flinkMaster=local" + + +## Running SMOKE suite on the ApexRunner (local) + +Batch Mode + + mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false" + +Streaming Mode + + mvn exec:java 
-Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false" + + +## Running SMOKE suite on Google Cloud Dataflow + +Building package + + mvn clean package -Pdataflow-runner + +Submit to Google Dataflow service + + +``` +java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \ + org.apache.beam.sdk.nexmark.Main \ + --runner=DataflowRunner \ + --project=<your project> \ + --zone=<your zone> \ + --workerMachineType=n1-highmem-8 \ + --stagingLocation=<a gs path for staging> \ + --streaming=true \ + --sourceType=PUBSUB \ + --pubSubMode=PUBLISH_ONLY \ + --pubsubTopic=<an existing Pubsub topic> \ + --resourceNameMode=VERBATIM \ + --manageResources=false \ + --monitorJobs=false \ + --numEventGenerators=64 \ + --numWorkers=16 \ + --maxNumWorkers=16 \ + --suite=SMOKE \ + --firstEventRate=100000 \ + --nextEventRate=100000 \ + --ratePeriodSec=3600 \ + --isRateLimited=true \ + --avgPersonByteSize=500 \ + --avgAuctionByteSize=500 \ + --avgBidByteSize=500 \ + --probDelayedEvent=0.000001 \ + --occasionalDelaySec=3600 \ + --numEvents=0 \ + --useWallclockEventTime=true \ + --usePubsubPublishTime=true \ + --experiments=enable_custom_pubsub_sink +``` + +``` +java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \ + org.apache.beam.sdk.nexmark.Main \ + --runner=DataflowRunner \ + --project=<your project> \ + --zone=<your zone> \ + --workerMachineType=n1-highmem-8 \ + --stagingLocation=<a gs path for staging> \ + --streaming=true \ + --sourceType=PUBSUB \ + --pubSubMode=SUBSCRIBE_ONLY \ + --pubsubSubscription=<an existing Pubsub subscription to above topic> \ + --resourceNameMode=VERBATIM \ + --manageResources=false \ + --monitorJobs=false \ + --numWorkers=64 \ + --maxNumWorkers=64 \ + --suite=SMOKE \ + --usePubsubPublishTime=true \ + --outputPath=<a gs path under 
which log files will be written> \ + --windowSizeSec=600 \ + --occasionalDelaySec=3600 \ + --maxLogEvents=10000 \ + --experiments=enable_custom_pubsub_source +``` + +## Running query 0 on a Spark cluster with yarn + +Building package + + mvn clean package -Pspark-runner + +Submit to the cluster + + spark-submit --master yarn-client --class org.apache.beam.sdk.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true + http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml new file mode 100644 index 0000000..c1b6025 --- /dev/null +++ b/sdks/java/nexmark/pom.xml @@ -0,0 +1,292 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-parent</artifactId> + <version>2.1.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-nexmark</artifactId> + <name>Apache Beam :: SDKs :: Java :: Nexmark</name> + <packaging>jar</packaging> + + <profiles> + + <!-- + The direct runner is available by default. + You can also include it on the classpath explicitly with -P direct-runner + --> + <profile> + <id>direct-runner</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>runtime</scope> + </dependency> + </dependencies> + </profile> + + <!-- Include the Apache Apex runner with -P apex-runner --> + <profile> + <id>apex-runner</id> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-apex</artifactId> + <scope>runtime</scope> + </dependency> + <!-- + Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from + google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This + can be removed when the project no longer has a dependency on a different httpclient version. 
+ --> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.3.5</version> + <scope>runtime</scope> + <exclusions> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + + <!-- Include the Apache Flink runner with -P flink-runner --> + <profile> + <id>flink-runner</id> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-flink_2.10</artifactId> + <scope>runtime</scope> + </dependency> + </dependencies> + </profile> + + <!-- Include the Apache Spark runner with -P spark-runner --> + <profile> + <id>spark-runner</id> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-spark</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> + <scope>runtime</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + + <!-- Include the Google Cloud Dataflow runner with -P dataflow-runner --> + <profile> + <id>dataflow-runner</id> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> + <scope>runtime</scope> + </dependency> + </dependencies> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + 
<finalName>${project.artifactId}-bundled-${project.version}</finalName> + <artifactSet> + <includes> + <include>*:*</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Avro plugin for automatic code generation --> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <executions> + <execution> + <id>schemas</id> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>${project.basedir}/src/main/</sourceDirectory> + <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Coverage analysis for unit tests. --> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Java SDK --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <!-- IOs --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-bigquery</artifactId> + </dependency> + + <!-- Extra libraries --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + 
<groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + </dependency> + + <!-- Test --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java new file mode 100644 index 0000000..ab2284c --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * An implementation of the 'NEXMark queries' for Beam. + * These are multiple queries over a three table schema representing an online auction system: + * <ul> + * <li>{@link Person} represents a person submitting an item for auction and/or making a bid + * on an auction. + * <li>{@link Auction} represents an item under auction. + * <li>{@link Bid} represents a bid for an item under auction. + * </ul> + * The queries exercise many aspects of the Beam model. + * + * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not + * particularly sensible. 
+ * + * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/"> + * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a> + */ +public class Main<OptionT extends NexmarkOptions> { + + /** + * Entry point: runs each configuration of the suite selected in {@code options} through + * {@code nexmarkLauncher}. Every non-null result is passed to the perf-file appender and a + * running summary is printed. When job monitoring is enabled, the final summary and javascript + * outputs are produced even if a run throws. + * + * @throws RuntimeException if any result has a null or non-empty error list + */ + void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) { + Instant start = Instant.now(); + Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename()); + Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>(); + Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options); + + boolean successful = true; + try { + // Run all the configurations. + for (NexmarkConfiguration configuration : configurations) { + NexmarkPerf perf = nexmarkLauncher.run(configuration); + if (perf != null) { + if (perf.errors == null || perf.errors.size() > 0) { + successful = false; + } + appendPerf(options.getPerfFilename(), configuration, perf); + actual.put(configuration, perf); + // Summarize what we've run so far (null filename: printed only, not saved). + saveSummary(null, configurations, actual, baseline, start); + } + } + } finally { + if (options.getMonitorJobs()) { + // Report overall performance. + saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start); + saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start); + } + } + + if (!successful) { + throw new RuntimeException("Execution was not successful"); + } + } + + /** + * Append the pair of {@code configuration} and {@code perf} to perf file. 
+ */ + private void appendPerf( + @Nullable String perfFilename, NexmarkConfiguration configuration, + NexmarkPerf perf) { + if (perfFilename == null) { + // No perf file configured: nothing to record. + return; + } + // One entry: a blank separator, two '#' comment lines, then the configuration line directly + // followed by its perf line. + List<String> lines = new ArrayList<>(); + lines.add(""); + lines.add(String.format("# %s", Instant.now())); + lines.add(String.format("# %s", configuration.toShortString())); + lines.add(configuration.toString()); + lines.add(perf.toString()); + try { + Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE, + StandardOpenOption.APPEND); + } catch (IOException e) { + throw new RuntimeException("Unable to write perf file: ", e); + } + NexmarkUtils.console("appended results to perf file %s.", perfFilename); + } + + /** + * Load the baseline perf. + * + * <p>Blank lines and lines starting with {@code #} are skipped; every other line is parsed as a + * configuration and the line directly after it as that configuration's perf. + * + * @return the baseline entries in file order, or {@code null} if no baseline file was given + * @throws RuntimeException if the file cannot be read, or ends right after a configuration line + */ + @Nullable + private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline( + @Nullable String baselineFilename) { + if (baselineFilename == null) { + return null; + } + Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>(); + List<String> lines; + try { + lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Unable to read baseline perf file: ", e); + } + for (int i = 0; i < lines.size(); i++) { + if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) { + continue; + } + NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++)); + if (i >= lines.size()) { + // A trailing configuration with no perf line used to fail with a bare + // IndexOutOfBoundsException; report the truncated file explicitly instead. + throw new RuntimeException( + "Baseline perf file " + baselineFilename + " ends after the configuration on line " + i + + " without a matching perf line"); + } + NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i)); + baseline.put(configuration, perf); + } + NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(), + baselineFilename); + return baseline; + } + + private static final String LINE = + "=========================================================================================="; + + /** + * Print summary of {@code actual} vs (if non-null) {@code baseline}. 
+ */ + private static void saveSummary( + @Nullable String summaryFilename, + Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, + @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { + List<String> lines = new ArrayList<>(); + + lines.add(""); + lines.add(LINE); + + lines.add( + String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now()))); + lines.add(""); + + lines.add("Default configuration:"); + lines.add(NexmarkConfiguration.DEFAULT.toString()); + lines.add(""); + + lines.add("Configurations:"); + lines.add(" Conf Description"); + int conf = 0; + for (NexmarkConfiguration configuration : configurations) { + lines.add(String.format(" %04d %s", conf++, configuration.toShortString())); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf != null && actualPerf.jobId != null) { + lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId)); + } + } + + lines.add(""); + lines.add("Performance:"); + lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)", + "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)")); + conf = 0; /* restart numbering so rows line up with the configuration list above */ + for (NexmarkConfiguration configuration : configurations) { + String line = String.format(" %04d ", conf++); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf == null) { + line += "*** not run ***"; + } else { + NexmarkPerf baselinePerf = baseline == null ? 
 null : baseline.get(configuration); + double runtimeSec = actualPerf.runtimeSec; + line += String.format("%12.1f ", runtimeSec); + if (baselinePerf == null) { + line += String.format("%12s ", ""); + } else { + double baselineRuntimeSec = baselinePerf.runtimeSec; + double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0; /* percent change vs. baseline */ + line += String.format("%+11.2f%% ", diff); + } + + double eventsPerSec = actualPerf.eventsPerSec; + line += String.format("%12.1f ", eventsPerSec); + if (baselinePerf == null) { + line += String.format("%12s ", ""); + } else { + double baselineEventsPerSec = baselinePerf.eventsPerSec; + double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0; /* percent change vs. baseline */ + line += String.format("%+11.2f%% ", diff); + } + + long numResults = actualPerf.numResults; + line += String.format("%12d ", numResults); + if (baselinePerf == null) { + line += String.format("%12s", ""); + } else { + long baselineNumResults = baselinePerf.numResults; + long diff = numResults - baselineNumResults; /* absolute difference, not a percentage */ + line += String.format("%+12d", diff); + } + } + lines.add(line); + + if (actualPerf != null) { + List<String> errors = actualPerf.errors; + if (errors == null) { /* a null list is itself reported as an error line */ + errors = new ArrayList<>(); + errors.add("NexmarkGoogleRunner returned null errors list"); + } + for (String error : errors) { + lines.add(String.format(" %4s *** %s ***", "", error)); + } + } + } + + lines.add(LINE); + lines.add(""); + + for (String line : lines) { /* the summary is always echoed to stdout */ + System.out.println(line); + } + + if (summaryFilename != null) { /* a null filename means print-only */ + try { + Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8, + StandardOpenOption.CREATE, StandardOpenOption.APPEND); + } catch (IOException e) { + throw new RuntimeException("Unable to save summary file: ", e); + } + NexmarkUtils.console("appended summary to summary file %s.", summaryFilename); + } + } + + /** + * Write all perf data and any baselines to a javascript file which can be used by + * graphing page etc. 
+ */ + private static void saveJavascript( + @Nullable String javascriptFilename, + Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, + @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { + if (javascriptFilename == null) { + return; + } + + List<String> lines = new ArrayList<>(); + lines.add(String.format( + "// Run started %s and ran for %s", start, new Duration(start, Instant.now()))); + lines.add("var all = ["); + + for (NexmarkConfiguration configuration : configurations) { + lines.add(" {"); + lines.add(String.format(" config: %s", configuration)); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf != null) { + lines.add(String.format(" ,perf: %s", actualPerf)); + } + NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration); + if (baselinePerf != null) { + lines.add(String.format(" ,baseline: %s", baselinePerf)); + } + lines.add(" },"); + } + + lines.add("];"); + + try { + Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8, + StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + } catch (IOException e) { + throw new RuntimeException("Unable to save javascript file: ", e); + } + NexmarkUtils.console("saved javascript to file %s.", javascriptFilename); + } + + public static void main(String[] args) { + NexmarkOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(NexmarkOptions.class); + NexmarkLauncher<NexmarkOptions> nexmarkLauncher = new NexmarkLauncher<>(options); + new Main<>().runAll(options, nexmarkLauncher); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java new file 
mode 100644 index 0000000..f45c387 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark; + +import java.io.Serializable; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * A monitor of elements with support for later retrieving their metrics. + * + * @param <T> Type of element we are monitoring. 
+ */ +public class Monitor<T extends KnownSize> implements Serializable { + private class MonitorDoFn extends DoFn<T, T> { + final Counter elementCounter = + Metrics.counter(name , prefix + ".elements"); + final Counter bytesCounter = + Metrics.counter(name , prefix + ".bytes"); + final Distribution startTime = + Metrics.distribution(name , prefix + ".startTime"); + final Distribution endTime = + Metrics.distribution(name , prefix + ".endTime"); + final Distribution startTimestamp = + Metrics.distribution(name , prefix + ".startTimestamp"); + final Distribution endTimestamp = + Metrics.distribution(name , prefix + ".endTimestamp"); + + @ProcessElement + public void processElement(ProcessContext c) { + elementCounter.inc(); + bytesCounter.inc(c.element().sizeInBytes()); + long now = System.currentTimeMillis(); + startTime.update(now); + endTime.update(now); + startTimestamp.update(c.timestamp().getMillis()); + endTimestamp.update(c.timestamp().getMillis()); + c.output(c.element()); + } + } + + public final String name; + public final String prefix; + private final MonitorDoFn doFn; + private final PTransform<PCollection<? extends T>, PCollection<T>> transform; + + public Monitor(String name, String prefix) { + this.name = name; + this.prefix = prefix; + doFn = new MonitorDoFn(); + transform = ParDo.of(doFn); + } + + public PTransform<PCollection<? 
extends T>, PCollection<T>> getTransform() { + return transform; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java new file mode 100644 index 0000000..904fcd5 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; + +/** + * Configuration controlling how a query is run. May be supplied by command line or + * programmatically. We only capture properties which may influence the resulting + * pipeline performance, as captured by {@link NexmarkPerf}. 
+ */ +public class NexmarkConfiguration implements Serializable { + /** + * Instance holding the default value of every field; {@link #toShortString} compares against it. + * NOTE(review): the fields are public and non-final, so this shared instance can be modified. + */ + public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration(); + + /** If {@literal true}, include additional debugging and monitoring stats. */ + @JsonProperty + public boolean debug = true; + + /** + * Which query to run. NOTE(review): the range was documented as [0,9], but + * {@link #maxLogEvents} refers to Query10; confirm the valid range. + */ + @JsonProperty + public int query = 0; + + /** Where events come from. */ + @JsonProperty + public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT; + + /** Where results go to. */ + @JsonProperty + public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL; + + /** + * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated + * into the overall query pipeline. + */ + @JsonProperty + public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED; + + /** + * Number of events to generate. If zero, generate as many as possible without overflowing + * internal counters etc. + */ + @JsonProperty + public long numEvents = 100000; + + /** + * Number of event generators to use. Each generates events in its own timeline. + */ + @JsonProperty + public int numEventGenerators = 100; + + /** + * Shape of event rate curve. + */ + @JsonProperty + public NexmarkUtils.RateShape rateShape = NexmarkUtils.RateShape.SINE; + + /** + * Initial overall event rate (in {@link #rateUnit}). + */ + @JsonProperty + public int firstEventRate = 10000; + + /** + * Next overall event rate (in {@link #rateUnit}). + */ + @JsonProperty + public int nextEventRate = 10000; + + /** + * Unit for rates. + */ + @JsonProperty + public NexmarkUtils.RateUnit rateUnit = NexmarkUtils.RateUnit.PER_SECOND; + + /** + * Overall period of rate shape, in seconds. + */ + @JsonProperty + public int ratePeriodSec = 600; + + /** + * Time in seconds to preload the subscription with data, at the initial input rate of the + * pipeline. 
+ */ + @JsonProperty + public int preloadSeconds = 0; + + /** + * Timeout for stream pipelines to stop in seconds. + */ + @JsonProperty + public int streamTimeout = 240; + + /** + * If true, and in streaming mode, generate events only when they are due according to their + * timestamp. + */ + @JsonProperty + public boolean isRateLimited = false; + + /** + * If true, use wallclock time as event time. Otherwise, use a deterministic + * time in the past so that multiple runs will see exactly the same event streams + * and should thus have exactly the same results. + */ + @JsonProperty + public boolean useWallclockEventTime = false; + + /** Average idealized size of a 'new person' event, in bytes. */ + @JsonProperty + public int avgPersonByteSize = 200; + + /** Average idealized size of a 'new auction' event, in bytes. */ + @JsonProperty + public int avgAuctionByteSize = 500; + + /** Average idealized size of a 'bid' event, in bytes. */ + @JsonProperty + public int avgBidByteSize = 100; + + /** Ratio of bids to 'hot' auctions compared to all other auctions. */ + @JsonProperty + public int hotAuctionRatio = 2; + + /** Ratio of auctions for 'hot' sellers compared to all other people. */ + @JsonProperty + public int hotSellersRatio = 4; + + /** Ratio of bids for 'hot' bidders compared to all other people. */ + @JsonProperty + public int hotBiddersRatio = 4; + + /** Window size, in seconds, for queries 3, 5, 7 and 8. */ + @JsonProperty + public long windowSizeSec = 10; + + /** Sliding window period, in seconds, for query 5. */ + @JsonProperty + public long windowPeriodSec = 5; + + /** Number of seconds to hold back events according to their reported timestamp. */ + @JsonProperty + public long watermarkHoldbackSec = 0; + + /** Average number of auctions which should be inflight at any time, per generator. */ + @JsonProperty + public int numInFlightAuctions = 100; + + /** Maximum number of people to consider as active for placing auctions or bids. 
*/ + @JsonProperty + public int numActivePeople = 1000; + + /** Coder strategy to follow. */ + @JsonProperty + public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND; + + /** + * Delay, in milliseconds, for each event. This will peg one core for this number + * of milliseconds to simulate CPU-bound computation. + */ + @JsonProperty + public long cpuDelayMs = 0; + + /** + * Extra data, in bytes, to save to persistent state for each event. This will force + * i/o all the way to durable storage to simulate an I/O-bound computation. + */ + @JsonProperty + public long diskBusyBytes = 0; + + /** + * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction. + */ + @JsonProperty + public int auctionSkip = 123; + + /** + * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum). + */ + @JsonProperty + public int fanout = 5; + + /** + * Maximum waiting time to clean personState in query3 + * (ie maximum waiting of the auctions related to person in state in seconds in event time). + */ + @JsonProperty + public int maxAuctionsWaitingTime = 600; + + /** + * Length of occasional delay to impose on events (in seconds). + */ + @JsonProperty + public long occasionalDelaySec = 3; + + /** + * Probability that an event will be delayed by {@link #occasionalDelaySec}. + */ + @JsonProperty + public double probDelayedEvent = 0.1; + + /** + * Maximum size of each log file (in events). For Query10 only. + */ + @JsonProperty + public int maxLogEvents = 100_000; + + /** + * If true, use pub/sub publish time instead of event time. + */ + @JsonProperty + public boolean usePubsubPublishTime = false; + + /** + * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies + * every 1000 events per generator are emitted in pseudo-random order. + */ + @JsonProperty + public long outOfOrderGroupSize = 1; + + /** + * Replace any properties of this configuration which have been supplied by the command line. 
+ */ + public void overrideFromOptions(NexmarkOptions options) { + if (options.getDebug() != null) { + debug = options.getDebug(); + } + if (options.getQuery() != null) { + query = options.getQuery(); + } + if (options.getSourceType() != null) { + sourceType = options.getSourceType(); + } + if (options.getSinkType() != null) { + sinkType = options.getSinkType(); + } + if (options.getPubSubMode() != null) { + pubSubMode = options.getPubSubMode(); + } + if (options.getNumEvents() != null) { + numEvents = options.getNumEvents(); + } + if (options.getNumEventGenerators() != null) { + numEventGenerators = options.getNumEventGenerators(); + } + if (options.getRateShape() != null) { + rateShape = options.getRateShape(); + } + if (options.getFirstEventRate() != null) { + firstEventRate = options.getFirstEventRate(); + } + if (options.getNextEventRate() != null) { + nextEventRate = options.getNextEventRate(); + } + if (options.getRateUnit() != null) { + rateUnit = options.getRateUnit(); + } + if (options.getRatePeriodSec() != null) { + ratePeriodSec = options.getRatePeriodSec(); + } + if (options.getPreloadSeconds() != null) { + preloadSeconds = options.getPreloadSeconds(); + } + if (options.getStreamTimeout() != null) { + streamTimeout = options.getStreamTimeout(); + } + if (options.getIsRateLimited() != null) { + isRateLimited = options.getIsRateLimited(); + } + if (options.getUseWallclockEventTime() != null) { + useWallclockEventTime = options.getUseWallclockEventTime(); + } + if (options.getAvgPersonByteSize() != null) { + avgPersonByteSize = options.getAvgPersonByteSize(); + } + if (options.getAvgAuctionByteSize() != null) { + avgAuctionByteSize = options.getAvgAuctionByteSize(); + } + if (options.getAvgBidByteSize() != null) { + avgBidByteSize = options.getAvgBidByteSize(); + } + if (options.getHotAuctionRatio() != null) { + hotAuctionRatio = options.getHotAuctionRatio(); + } + if (options.getHotSellersRatio() != null) { + hotSellersRatio = 
options.getHotSellersRatio(); + } + if (options.getHotBiddersRatio() != null) { + hotBiddersRatio = options.getHotBiddersRatio(); + } + if (options.getWindowSizeSec() != null) { + windowSizeSec = options.getWindowSizeSec(); + } + if (options.getWindowPeriodSec() != null) { + windowPeriodSec = options.getWindowPeriodSec(); + } + if (options.getWatermarkHoldbackSec() != null) { + watermarkHoldbackSec = options.getWatermarkHoldbackSec(); + } + if (options.getNumInFlightAuctions() != null) { + numInFlightAuctions = options.getNumInFlightAuctions(); + } + if (options.getNumActivePeople() != null) { + numActivePeople = options.getNumActivePeople(); + } + if (options.getCoderStrategy() != null) { + coderStrategy = options.getCoderStrategy(); + } + if (options.getCpuDelayMs() != null) { + cpuDelayMs = options.getCpuDelayMs(); + } + if (options.getDiskBusyBytes() != null) { + diskBusyBytes = options.getDiskBusyBytes(); + } + if (options.getAuctionSkip() != null) { + auctionSkip = options.getAuctionSkip(); + } + if (options.getFanout() != null) { + fanout = options.getFanout(); + } + if (options.getMaxAuctionsWaitingTime() != null) { + fanout = options.getMaxAuctionsWaitingTime(); + } + if (options.getOccasionalDelaySec() != null) { + occasionalDelaySec = options.getOccasionalDelaySec(); + } + if (options.getProbDelayedEvent() != null) { + probDelayedEvent = options.getProbDelayedEvent(); + } + if (options.getMaxLogEvents() != null) { + maxLogEvents = options.getMaxLogEvents(); + } + if (options.getUsePubsubPublishTime() != null) { + usePubsubPublishTime = options.getUsePubsubPublishTime(); + } + if (options.getOutOfOrderGroupSize() != null) { + outOfOrderGroupSize = options.getOutOfOrderGroupSize(); + } + } + + /** + * Return copy of configuration with given label. 
+ */ + public NexmarkConfiguration copy() { + NexmarkConfiguration result; + result = new NexmarkConfiguration(); + result.debug = debug; + result.query = query; + result.sourceType = sourceType; + result.sinkType = sinkType; + result.pubSubMode = pubSubMode; + result.numEvents = numEvents; + result.numEventGenerators = numEventGenerators; + result.rateShape = rateShape; + result.firstEventRate = firstEventRate; + result.nextEventRate = nextEventRate; + result.rateUnit = rateUnit; + result.ratePeriodSec = ratePeriodSec; + result.preloadSeconds = preloadSeconds; + result.streamTimeout = streamTimeout; + result.isRateLimited = isRateLimited; + result.useWallclockEventTime = useWallclockEventTime; + result.avgPersonByteSize = avgPersonByteSize; + result.avgAuctionByteSize = avgAuctionByteSize; + result.avgBidByteSize = avgBidByteSize; + result.hotAuctionRatio = hotAuctionRatio; + result.hotSellersRatio = hotSellersRatio; + result.hotBiddersRatio = hotBiddersRatio; + result.windowSizeSec = windowSizeSec; + result.windowPeriodSec = windowPeriodSec; + result.watermarkHoldbackSec = watermarkHoldbackSec; + result.numInFlightAuctions = numInFlightAuctions; + result.numActivePeople = numActivePeople; + result.coderStrategy = coderStrategy; + result.cpuDelayMs = cpuDelayMs; + result.diskBusyBytes = diskBusyBytes; + result.auctionSkip = auctionSkip; + result.fanout = fanout; + result.maxAuctionsWaitingTime = maxAuctionsWaitingTime; + result.occasionalDelaySec = occasionalDelaySec; + result.probDelayedEvent = probDelayedEvent; + result.maxLogEvents = maxLogEvents; + result.usePubsubPublishTime = usePubsubPublishTime; + result.outOfOrderGroupSize = outOfOrderGroupSize; + return result; + } + + /** + * Return short description of configuration (suitable for use in logging). We only render + * the core fields plus those which do not have default values. 
+   */
+  public String toShortString() {
+    StringBuilder sb = new StringBuilder();
+    // The query is always rendered; every other field only when it differs from DEFAULT.
+    sb.append(String.format("query:%d", query));
+    if (debug != DEFAULT.debug) {
+      sb.append(String.format("; debug:%s", debug));
+    }
+    if (sourceType != DEFAULT.sourceType) {
+      sb.append(String.format("; sourceType:%s", sourceType));
+    }
+    if (sinkType != DEFAULT.sinkType) {
+      sb.append(String.format("; sinkType:%s", sinkType));
+    }
+    if (pubSubMode != DEFAULT.pubSubMode) {
+      sb.append(String.format("; pubSubMode:%s", pubSubMode));
+    }
+    if (numEvents != DEFAULT.numEvents) {
+      sb.append(String.format("; numEvents:%d", numEvents));
+    }
+    if (numEventGenerators != DEFAULT.numEventGenerators) {
+      sb.append(String.format("; numEventGenerators:%d", numEventGenerators));
+    }
+    if (rateShape != DEFAULT.rateShape) {
+      sb.append(String.format("; rateShape:%s", rateShape));
+    }
+    // The two rates are reported together as soon as either one is non-default.
+    if (firstEventRate != DEFAULT.firstEventRate || nextEventRate != DEFAULT.nextEventRate) {
+      sb.append(String.format("; firstEventRate:%d", firstEventRate));
+      sb.append(String.format("; nextEventRate:%d", nextEventRate));
+    }
+    if (rateUnit != DEFAULT.rateUnit) {
+      sb.append(String.format("; rateUnit:%s", rateUnit));
+    }
+    if (ratePeriodSec != DEFAULT.ratePeriodSec) {
+      sb.append(String.format("; ratePeriodSec:%d", ratePeriodSec));
+    }
+    if (preloadSeconds != DEFAULT.preloadSeconds) {
+      sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
+    }
+    if (streamTimeout != DEFAULT.streamTimeout) {
+      sb.append(String.format("; streamTimeout:%d", streamTimeout));
+    }
+    if (isRateLimited != DEFAULT.isRateLimited) {
+      sb.append(String.format("; isRateLimited:%s", isRateLimited));
+    }
+    if (useWallclockEventTime != DEFAULT.useWallclockEventTime) {
+      sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime));
+    }
+    if (avgPersonByteSize != DEFAULT.avgPersonByteSize) {
+      sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize));
+    }
+    if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) {
+      sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize));
+    }
+    if (avgBidByteSize != DEFAULT.avgBidByteSize) {
+      sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize));
+    }
+    if (hotAuctionRatio != DEFAULT.hotAuctionRatio) {
+      sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio));
+    }
+    if (hotSellersRatio != DEFAULT.hotSellersRatio) {
+      sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio));
+    }
+    if (hotBiddersRatio != DEFAULT.hotBiddersRatio) {
+      sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio));
+    }
+    if (windowSizeSec != DEFAULT.windowSizeSec) {
+      sb.append(String.format("; windowSizeSec:%d", windowSizeSec));
+    }
+    if (windowPeriodSec != DEFAULT.windowPeriodSec) {
+      sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec));
+    }
+    if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) {
+      sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec));
+    }
+    if (numInFlightAuctions != DEFAULT.numInFlightAuctions) {
+      sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions));
+    }
+    if (numActivePeople != DEFAULT.numActivePeople) {
+      sb.append(String.format("; numActivePeople:%d", numActivePeople));
+    }
+    if (coderStrategy != DEFAULT.coderStrategy) {
+      sb.append(String.format("; coderStrategy:%s", coderStrategy));
+    }
+    if (cpuDelayMs != DEFAULT.cpuDelayMs) {
+      // NOTE(review): the label differs from the field name (cpuDelayMs); kept as is.
+      sb.append(String.format("; cpuSlowdownMs:%d", cpuDelayMs));
+    }
+    if (diskBusyBytes != DEFAULT.diskBusyBytes) {
+      // Fixed: the label was misspelled "diskBuysBytes".
+      sb.append(String.format("; diskBusyBytes:%d", diskBusyBytes));
+    }
+    if (auctionSkip != DEFAULT.auctionSkip) {
+      sb.append(String.format("; auctionSkip:%d", auctionSkip));
+    }
+    if (fanout != DEFAULT.fanout) {
+      sb.append(String.format("; fanout:%d", fanout));
+    }
+    if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) {
+      // Fixed: this used to print the value of fanout under the maxAuctionsWaitingTime label.
+      sb.append(String.format("; maxAuctionsWaitingTime:%d", maxAuctionsWaitingTime));
+    }
+    if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
+      sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
+    }
+    if (probDelayedEvent != DEFAULT.probDelayedEvent) {
+      sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent));
+    }
+    if (maxLogEvents != DEFAULT.maxLogEvents) {
+      sb.append(String.format("; maxLogEvents:%d", maxLogEvents));
+    }
+    if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) {
+      sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime));
+    }
+    if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) {
+      sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Return full description as a string, as serialized by {@code NexmarkUtils.MAPPER}.
+   *
+   * @throws RuntimeException if serialization fails
+   */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parse an object from {@code string}, the inverse of {@link #toString}.
+   *
+   * @throws RuntimeException if {@code string} cannot be parsed
+   */
+  public static NexmarkConfiguration fromString(String string) {
+    try {
+      return NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to parse nexmark configuration: ", e);
+    }
+  }
+
+  /** Hash over every configuration field, consistent with {@link #equals}. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        debug,
+        query,
+        sourceType,
+        sinkType,
+        pubSubMode,
+        numEvents,
+        numEventGenerators,
+        rateShape,
+        firstEventRate,
+        nextEventRate,
+        rateUnit,
+        ratePeriodSec,
+        preloadSeconds,
+        streamTimeout,
+        isRateLimited,
+        useWallclockEventTime,
+        avgPersonByteSize,
+        avgAuctionByteSize,
+        avgBidByteSize,
+        hotAuctionRatio,
+        hotSellersRatio,
+        hotBiddersRatio,
+        windowSizeSec,
+        windowPeriodSec,
+        watermarkHoldbackSec,
+        numInFlightAuctions,
+        numActivePeople,
+        coderStrategy,
+        cpuDelayMs,
+        diskBusyBytes,
+        auctionSkip,
+        fanout,
+        maxAuctionsWaitingTime,
+        occasionalDelaySec,
+        probDelayedEvent,
+        maxLogEvents,
+        usePubsubPublishTime,
+        outOfOrderGroupSize);
+  }
+
+  /**
+   * Two configurations are equal when they are of the same class and every field matches.
+   * Configurations are used as map keys when pairing actual and baseline results.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    NexmarkConfiguration other = (NexmarkConfiguration) obj;
+    return debug == other.debug
+        && query == other.query
+        && sourceType == other.sourceType
+        && sinkType == other.sinkType
+        && pubSubMode == other.pubSubMode
+        && numEvents == other.numEvents
+        && numEventGenerators == other.numEventGenerators
+        && rateShape == other.rateShape
+        && firstEventRate == other.firstEventRate
+        && nextEventRate == other.nextEventRate
+        && rateUnit == other.rateUnit
+        && ratePeriodSec == other.ratePeriodSec
+        && preloadSeconds == other.preloadSeconds
+        && streamTimeout == other.streamTimeout
+        && isRateLimited == other.isRateLimited
+        && useWallclockEventTime == other.useWallclockEventTime
+        && avgPersonByteSize == other.avgPersonByteSize
+        && avgAuctionByteSize == other.avgAuctionByteSize
+        && avgBidByteSize == other.avgBidByteSize
+        && hotAuctionRatio == other.hotAuctionRatio
+        && hotSellersRatio == other.hotSellersRatio
+        && hotBiddersRatio == other.hotBiddersRatio
+        && windowSizeSec == other.windowSizeSec
+        && windowPeriodSec == other.windowPeriodSec
+        && watermarkHoldbackSec == other.watermarkHoldbackSec
+        && numInFlightAuctions == other.numInFlightAuctions
+        && numActivePeople == other.numActivePeople
+        && coderStrategy == other.coderStrategy
+        && cpuDelayMs == other.cpuDelayMs
+        && diskBusyBytes == other.diskBusyBytes
+        && auctionSkip == other.auctionSkip
+        && fanout == other.fanout
+        && maxAuctionsWaitingTime == other.maxAuctionsWaitingTime
+        && occasionalDelaySec == other.occasionalDelaySec
+        // Compare bit patterns so the result agrees with hashCode for NaN and signed zero.
+        && Double.doubleToLongBits(probDelayedEvent)
+            == Double.doubleToLongBits(other.probDelayedEvent)
+        && maxLogEvents == other.maxLogEvents
+        && usePubsubPublishTime == other.usePubsubPublishTime
+        && outOfOrderGroupSize == other.outOfOrderGroupSize;
+  }
+}
