Repository: incubator-beam Updated Branches: refs/heads/master c834ecd3d -> 88db3beb8
Add initial microbenchmarks directory Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa8bf328 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa8bf328 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa8bf328 Branch: refs/heads/master Commit: fa8bf32833b2bb7670bde3eaa71a9701ef060857 Parents: c834ecd Author: bchambers <[email protected]> Authored: Tue Jun 28 15:55:16 2016 -0700 Committer: bchambers <[email protected]> Committed: Fri Jul 1 20:11:35 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/DoFnReflector.java | 12 +- sdks/java/microbenchmarks/README.md | 42 ++++ sdks/java/microbenchmarks/pom.xml | 110 +++++++++ .../coders/AvroCoderBenchmark.java | 121 ++++++++++ .../coders/ByteArrayCoderBenchmark.java | 66 +++++ .../coders/CoderBenchmarking.java | 42 ++++ .../coders/StringUtf8CoderBenchmark.java | 72 ++++++ .../transforms/DoFnReflectorBenchmark.java | 239 +++++++++++++++++++ sdks/java/pom.xml | 1 + 9 files changed, 699 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java index 452ee8e..e711d04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java @@ -123,7 +123,7 @@ public abstract class DoFnReflector { * @param c the {@link org.apache.beam.sdk.transforms.DoFnWithContext.ProcessContext} * to pass to {@link ProcessElement}. */ - abstract <InputT, OutputT> void invokeProcessElement( + public abstract <InputT, OutputT> void invokeProcessElement( DoFnWithContext<InputT, OutputT> fn, DoFnWithContext<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra); @@ -135,7 +135,7 @@ public abstract class DoFnReflector { * @param c the {@link org.apache.beam.sdk.transforms.DoFnWithContext.Context} * to pass to {@link StartBundle}. */ - <InputT, OutputT> void invokeStartBundle( + public <InputT, OutputT> void invokeStartBundle( DoFnWithContext<InputT, OutputT> fn, DoFnWithContext<InputT, OutputT>.Context c, ExtraContextFactory<InputT, OutputT> extra) { @@ -149,7 +149,7 @@ public abstract class DoFnReflector { * @param c the {@link org.apache.beam.sdk.transforms.DoFnWithContext.Context} * to pass to {@link FinishBundle}. */ - abstract <InputT, OutputT> void invokeFinishBundle( + public abstract <InputT, OutputT> void invokeFinishBundle( DoFnWithContext<InputT, OutputT> fn, DoFnWithContext<InputT, OutputT>.Context c, ExtraContextFactory<InputT, OutputT> extra); @@ -430,7 +430,7 @@ public abstract class DoFnReflector { } @Override - <InputT, OutputT> void invokeProcessElement( + public <InputT, OutputT> void invokeProcessElement( DoFnWithContext<InputT, OutputT> fn, DoFnWithContext<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra) { @@ -438,7 +438,7 @@ public abstract class DoFnReflector { } @Override - <InputT, OutputT> void invokeStartBundle( + public <InputT, OutputT> void invokeStartBundle( DoFnWithContext<InputT, OutputT> fn, DoFnWithContext<InputT, OutputT>.Context c, ExtraContextFactory<InputT, OutputT> extra) { @@ -449,7 +449,7 @@ public abstract class DoFnReflector { } @Override - <InputT, OutputT> void invokeFinishBundle( + public <InputT, OutputT> void invokeFinishBundle( DoFnWithContext<InputT, OutputT> fn, DoFnWithContext<InputT, OutputT>.Context c, ExtraContextFactory<InputT, OutputT> extra) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/microbenchmarks/README.md ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/README.md b/sdks/java/microbenchmarks/README.md new file mode 100644 index 0000000..627e669 --- /dev/null +++ b/sdks/java/microbenchmarks/README.md @@ -0,0 +1,42 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Microbenchmarks for parts of the Beam SDK + +To run benchmarks: + + 1. Run `mvn install` in the top directory to install the SDK. + + 2. Build the benchmark package: + + cd microbenchmarks + mvn package + + 3. run benchmark harness: + + java -jar target/microbenchmarks.jar + + 4. (alternate to step 3) + to run just a subset of benchmarks, pass a regular expression that + matches the benchmarks you want to run (this can match against the class + name, or the method name). E.g., to run any benchmarks with + "DoFnReflector" in the name: + + java -jar target/microbenchmarks.jar ".*DoFnReflector.*" + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/microbenchmarks/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/pom.xml b/sdks/java/microbenchmarks/pom.xml new file mode 100644 index 0000000..96a76ec --- /dev/null +++ b/sdks/java/microbenchmarks/pom.xml @@ -0,0 +1,110 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-parent</artifactId> + <version>0.2.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-microbenchmarks</artifactId> + <name>Apache Beam :: SDKs :: Java :: Microbenchmarks</name> + <description>Microbenchmarks for components in the Beam Java SDK.</description> + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>microbenchmarks</finalName> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.openjdk.jmh.Main</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <!-- When loaded at runtime this will wire up slf4j to the JUL backend --> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>1.0.1</version> + </dependency> + + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>1.6.1</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/AvroCoderBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/AvroCoderBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/AvroCoderBenchmark.java new file mode 100644 index 0000000..39b31ef --- /dev/null +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/AvroCoderBenchmark.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.microbenchmarks.coders; + +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Benchmarks for {@link AvroCoder}. + */ +@State(Scope.Benchmark) +@Fork(1) +@Warmup(iterations = 5) +public class AvroCoderBenchmark { + + @DefaultCoder(AvroCoder.class) + private static class Pojo { + public String text; + public int count; + + // Empty constructor required for Avro decoding. + @SuppressWarnings("unused") + public Pojo() { + } + + public Pojo(String text, int count) { + this.text = text; + this.count = count; + } + + // auto-generated + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Pojo pojo = (Pojo) o; + + if (count != pojo.count) { + return false; + } + if (text != null + ? !text.equals(pojo.text) + : pojo.text != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public String toString() { + return "Pojo{" + + "text='" + text + '\'' + + ", count=" + count + + '}'; + } + } + + AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class); + + @Param({"true", "false"}) + boolean isWholeStream; + + Pojo shortPojo; + Pojo longPojo; + + @Setup + public void setUp() { + shortPojo = new Pojo("hello world", 42); + + char[] bytes60k = new char[60 * 1024]; + Arrays.fill(bytes60k, 'a'); + longPojo = new Pojo(new String(bytes60k), 42); + } + + @Benchmark + public Pojo codeShortPojo() throws IOException { + return CoderBenchmarking.testCoder(coder, isWholeStream, shortPojo); + } + + @Benchmark + public Pojo codeLongPojo() throws Exception { + return CoderBenchmarking.testCoder(coder, isWholeStream, longPojo); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/ByteArrayCoderBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/ByteArrayCoderBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/ByteArrayCoderBenchmark.java new file mode 100644 index 0000000..df20a15 --- /dev/null +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/ByteArrayCoderBenchmark.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.microbenchmarks.coders; + +import org.apache.beam.sdk.coders.ByteArrayCoder; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Benchmarks for {@link ByteArrayCoder}. + */ +@State(Scope.Benchmark) +@Fork(1) +@Warmup(iterations = 5) +public class ByteArrayCoderBenchmark { + + ByteArrayCoder coder = ByteArrayCoder.of(); + + @Param({"true", "false"}) + boolean isWholeStream; + + byte[] shortArray; + byte[] longArray; + + @Setup + public void setUp() { + shortArray = new byte[10]; + Arrays.fill(shortArray, (byte) 47); + longArray = new byte[60 * 1024]; + Arrays.fill(longArray, (byte) 47); + } + + @Benchmark + public byte[] codeShortArray() throws IOException { + return CoderBenchmarking.testCoder(coder, isWholeStream, shortArray); + } + + @Benchmark + public byte[] codeLongArray() throws Exception { + return CoderBenchmarking.testCoder(coder, isWholeStream, longArray); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/CoderBenchmarking.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/CoderBenchmarking.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/CoderBenchmarking.java new file mode 100644 index 0000000..8523cb2 --- /dev/null +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/CoderBenchmarking.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.microbenchmarks.coders; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.CoderUtils; + +import java.io.IOException; + +/** + * Utilities for writing coder benchmarks. + */ +class CoderBenchmarking { + + /** + * Encodes and decodes the given value using the specified Coder. + * + * @throws IOException if there are errors during encoding or decoding + */ + public static <T> T testCoder( + Coder<T> coder, boolean isWholeStream, T value) throws IOException { + Coder.Context context = + isWholeStream ? Coder.Context.OUTER : Coder.Context.NESTED; + byte[] encoded = CoderUtils.encodeToByteArray(coder, value, context); + return CoderUtils.decodeFromByteArray(coder, encoded, context); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/StringUtf8CoderBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/StringUtf8CoderBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/StringUtf8CoderBenchmark.java new file mode 100644 index 0000000..c0bcb45 --- /dev/null +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/StringUtf8CoderBenchmark.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.microbenchmarks.coders; + +import org.apache.beam.sdk.coders.StringUtf8Coder; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Benchmarks for {@link StringUtf8Coder}. + */ +@State(Scope.Benchmark) +@Fork(1) +@Warmup(iterations = 5) +public class StringUtf8CoderBenchmark { + + StringUtf8Coder coder = StringUtf8Coder.of(); + + @Param({"true", "false"}) + boolean isWholeStream; + + String shortString; + String longString; + + @Setup + public void setUp() { + shortString = "hello world"; + + char[] bytes60k = new char[60 * 1024]; + Arrays.fill(bytes60k, 'a'); + longString = new String(bytes60k); + } + + @Benchmark + public String codeEmptyString() throws IOException { + return CoderBenchmarking.testCoder(coder, isWholeStream, ""); + } + + @Benchmark + public String codeShortString() throws IOException { + return CoderBenchmarking.testCoder(coder, isWholeStream, shortString); + } + + @Benchmark + public String codeLongString() throws IOException { + return CoderBenchmarking.testCoder(coder, isWholeStream, longString); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java new file mode 100644 index 0000000..1b8ec2a --- /dev/null +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.microbenchmarks.transforms; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnReflector; +import org.apache.beam.sdk.transforms.DoFnWithContext; +import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import org.joda.time.Instant; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmarks for {@link DoFn} and {@link DoFnWithContext} invocations, specifically + * for measuring the overhead of {@link DoFnReflector}. + */ +@State(Scope.Benchmark) +@Fork(1) +@Warmup(iterations = 5) +public class DoFnReflectorBenchmark { + + private static final String ELEMENT = "some string to use for testing"; + + private DoFn<String, String> doFn = new UpperCaseDoFn(); + private DoFnWithContext<String, String> doFnWithContext = new UpperCaseDoFnWithContext(); + + private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT); + private StubDoFnWithContextProcessContext stubDoFnWithContextContext = + new StubDoFnWithContextProcessContext(doFnWithContext, ELEMENT); + private ExtraContextFactory<String, String> extraContextFactory = + new ExtraContextFactory<String, String>() { + + @Override + public BoundedWindow window() { + return null; + } + + @Override + public WindowingInternals<String, String> windowingInternals() { + return null; + } + }; + + private DoFnReflector doFnReflector; + private DoFn<String, String> adaptedDoFnWithContext; + + @Setup + public void setUp() { + doFnReflector = DoFnReflector.of(doFnWithContext.getClass()); + adaptedDoFnWithContext = doFnReflector.toDoFn(doFnWithContext); + } + + @Benchmark + public String invokeDoFn() throws Exception { + doFn.processElement(stubDoFnContext); + return stubDoFnContext.output; + } + + @Benchmark + public String invokeDoFnWithContextViaAdaptor() throws Exception { + adaptedDoFnWithContext.processElement(stubDoFnContext); + return stubDoFnContext.output; + } + + @Benchmark + public String invokeDoFnWithContext() throws Exception { + doFnReflector.invokeProcessElement( + doFnWithContext, stubDoFnWithContextContext, extraContextFactory); + return stubDoFnWithContextContext.output; + } + + private static class UpperCaseDoFn extends DoFn<String, String> { + + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element().toUpperCase()); + } + } + + private static class UpperCaseDoFnWithContext extends DoFnWithContext<String, String> { + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(c.element().toUpperCase()); + } + } + + private static class StubDoFnProcessContext extends DoFn<String, String>.ProcessContext { + + private final String element; + private String output; + + public StubDoFnProcessContext(DoFn<String, String> fn, String element) { + fn.super(); + this.element = element; + } + + @Override + public String element() { + return element; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return null; + } + + @Override + public Instant timestamp() { + return null; + } + + @Override + public BoundedWindow window() { + return null; + } + + @Override + public PaneInfo pane() { + return null; + } + + @Override + public WindowingInternals<String, String> windowingInternals() { + return null; + } + + @Override + public PipelineOptions getPipelineOptions() { + return null; + } + + @Override + public void output(String output) { + this.output = output; + } + + @Override + public void outputWithTimestamp(String output, Instant timestamp) { + output(output); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + return null; + } + } + + private static class StubDoFnWithContextProcessContext + extends DoFnWithContext<String, String>.ProcessContext { + private final String element; + private String output; + + public StubDoFnWithContextProcessContext(DoFnWithContext<String, String> fn, String element) { + fn.super(); + this.element = element; + } + + @Override + public String element() { + return element; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return null; + } + + @Override + public Instant timestamp() { + return null; + } + + @Override + public PaneInfo pane() { + return null; + } + + @Override + public PipelineOptions getPipelineOptions() { + return null; + } + + @Override + public void output(String output) { + this.output = output; + } + + @Override + public void outputWithTimestamp(String output, Instant timestamp) { + output(output); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa8bf328/sdks/java/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index 55aea6a..0350804 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -41,6 +41,7 @@ a released artifact exists, we need to modify the build order. <module>maven-archetypes</module> --> <module>extensions</module> + <module>microbenchmarks</module> </modules> <profiles>
