This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 780fc17  [BEAM-12640] Enable adding JMH to Java based projects and add 
an example benchmark to the SDK harness. (#15198)
780fc17 is described below

commit 780fc179f21fafa15439ee5f30bf3122252162b6
Author: Lukasz Cwik <[email protected]>
AuthorDate: Fri Jul 23 06:50:08 2021 -0400

    [BEAM-12640] Enable adding JMH to Java based projects and add an example 
benchmark to the SDK harness. (#15198)
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  54 +++++-
 .../src/main/resources/beam/spotbugs-filter.xml    |   3 +
 sdks/java/harness/build.gradle                     |   3 +
 .../logging/BeamFnLoggingClientBenchmark.java      | 199 +++++++++++++++++++++
 .../beam/fn/harness/logging/package-info.java      |  20 +++
 .../fn/harness/logging/BeamFnLoggingClient.java    |   5 +-
 6 files changed, 280 insertions(+), 4 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index a078fe9..53a2dcc 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -130,6 +130,17 @@ class BeamModulePlugin implements Plugin<Project> {
     boolean validateShadowJar = true
 
     /**
+     * Controls whether the 'jmh' source set is enabled for JMH benchmarks.
+     *
+     * Add additional dependencies to the jmhCompile and jmhRuntime dependency
+     * sets.
+     *
+     * Note that the JMH annotation processor is enabled by default and that
+     * a 'jmh' task is created which executes JMH.
+     */
+    boolean enableJmh = false
+
+    /**
      * The set of excludes that should be used during validation of the shadow 
jar. Projects should override
      * the default with the most specific set of excludes that is valid for 
the contents of its shaded jar.
      *
@@ -462,6 +473,7 @@ class BeamModulePlugin implements Plugin<Project> {
     def spotbugs_version = "4.0.6"
     def testcontainers_version = "1.15.1"
     def arrow_version = "4.0.0"
+    def jmh_version = "1.32"
 
     // A map of maps containing common libraries used per language. To use:
     // dependencies {
@@ -628,7 +640,7 @@ class BeamModulePlugin implements Plugin<Project> {
         proto_google_cloud_firestore_v1             : 
"com.google.api.grpc:proto-google-cloud-firestore-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsub_v1                : 
"com.google.api.grpc:proto-google-cloud-pubsub-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsublite_v1            : 
"com.google.api.grpc:proto-google-cloud-pubsublite-v1:$google_cloud_pubsublite_version",
-        proto_google_cloud_spanner_v1: 
"com.google.api.grpc:proto-google-cloud-spanner-v1", // 
google_cloud_platform_libraries_bom sets version
+        proto_google_cloud_spanner_v1               : 
"com.google.api.grpc:proto-google-cloud-spanner-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_cloud_spanner_admin_database_v1: 
"com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_common_protos                  : 
"com.google.api.grpc:proto-google-common-protos", // 
google_cloud_platform_libraries_bom sets version
         slf4j_api                                   : 
"org.slf4j:slf4j-api:$slf4j_version",
@@ -848,6 +860,7 @@ class BeamModulePlugin implements Plugin<Project> {
       List<String> skipDefRegexes = []
       skipDefRegexes << "AutoValue_.*"
       skipDefRegexes << "AutoOneOf_.*"
+      skipDefRegexes << ".*\\.jmh_generated\\..*"
       skipDefRegexes += configuration.generatedClassPatterns
       skipDefRegexes += configuration.classesTriggerCheckerBugs.keySet()
       String skipDefCombinedRegex = skipDefRegexes.collect({ regex -> 
"(${regex})"}).join("|")
@@ -1240,6 +1253,45 @@ class BeamModulePlugin implements Plugin<Project> {
         project.artifacts.testRuntime project.testJar
       }
 
+      if (configuration.enableJmh) {
+        // We specifically use a separate source set for JMH to ensure that it 
does not
+        // become a required artifact
+        project.sourceSets {
+          jmh {
+            java {
+              srcDir "src/jmh/java"
+            }
+            resources {
+              srcDir "src/jmh/resources"
+            }
+          }
+        }
+
+        project.dependencies {
+          jmhAnnotationProcessor 
"org.openjdk.jmh:jmh-generator-annprocess:$jmh_version"
+          jmhCompile "org.openjdk.jmh:jmh-core:$jmh_version"
+        }
+
+        project.task("jmh", type: JavaExec, dependsOn: project.jmhClasses, {
+          main = "org.openjdk.jmh.Main"
+          classpath = project.sourceSets.jmh.compileClasspath + 
project.sourceSets.jmh.runtimeClasspath
+          // For a list of arguments, see
+          // https://github.com/guozheng/jmh-tutorial/blob/master/README.md
+          //
+          // Filter for a specific benchmark to run (uncomment below)
+          // Note that multiple regex are supported each as a separate 
argument.
+          // args 
'BeamFnLoggingClientBenchmark.testLoggingWithAllOptionalParameters'
+          // args 'additional regexp...'
+          //
+          // Enumerate available benchmarks and exit (uncomment below)
+          // args '-l'
+          //
+          // Enable connecting a debugger by disabling forking (uncomment 
below)
+          // Useful for debugging via an IDE such as Intellij
+          // args '-f0'
+        })
+      }
+
       project.ext.includeInJavaBom = configuration.publish
       project.ext.exportJavadoc = configuration.exportJavadoc
 
diff --git a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml 
b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
index 71f2172..b28371c 100644
--- a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
@@ -64,6 +64,9 @@
     <Class name="~.*AutoValue_.*"/>
   </Match>
   <Match>
+    <Package name="~.*jmh_generated.*"/>
+  </Match>
+  <Match>
     <Package name="org.apache.beam.sdk.extensions.sql.impl.parser.impl"/>
   </Match>
   <Match>
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index 50ea8c1..3c859ae 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -33,6 +33,7 @@ applyJavaNature(
   ],
   automaticModuleName: 'org.apache.beam.fn.harness',
   validateShadowJar: false,
+  enableJmh: true,
   testShadowJar: true,
   shadowClosure:
   // Create an uber jar without repackaging for the SDK harness
@@ -72,4 +73,6 @@ dependencies {
   testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
   testCompile project(":runners:core-construction-java")
   shadowTestRuntimeClasspath library.java.slf4j_jdk14
+  jmhCompile project(path: ":sdks:java:harness", configuration: "shadowTest")
+  jmhRuntime library.java.slf4j_jdk14
 }
diff --git 
a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java
 
b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java
new file mode 100644
index 0000000..9e009c3
--- /dev/null
+++ 
b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.logging;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.SimpleExecutionState;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Server;
+import 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Benchmarks for {@link BeamFnLoggingClient}. */
+public class BeamFnLoggingClientBenchmark {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnLoggingClientBenchmark.class);
+
+  /** A logging service which counts the number of calls it received. */
+  public static class CallCountLoggingService extends 
BeamFnLoggingGrpc.BeamFnLoggingImplBase {
+    private AtomicInteger callCount = new AtomicInteger();
+
+    @Override
+    public StreamObserver<BeamFnApi.LogEntry.List> logging(
+        StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+      return new StreamObserver<BeamFnApi.LogEntry.List>() {
+
+        @Override
+        public void onNext(BeamFnApi.LogEntry.List list) {
+          callCount.incrementAndGet();
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+          outboundObserver.onError(throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+          outboundObserver.onCompleted();
+        }
+      };
+    }
+  }
+
+  /**
+   * Sets up a simple in-process logging service and configures a
+   * {@link BeamFnLoggingClient} that connects to it.
+   *
+   * <p>The server is registered under a name built from this benchmark's class
+   * name and a random UUID.
+   */
+  @State(Scope.Benchmark)
+  public static class ManageLoggingClientAndService {
+    public final BeamFnLoggingClient loggingClient;
+    public final CallCountLoggingService loggingService;
+    public final Server server;
+
+    /**
+     * Starts the in-process server and creates the logging client.
+     *
+     * @throws RuntimeException wrapping any exception raised during setup
+     */
+    public ManageLoggingClientAndService() {
+      try {
+        ApiServiceDescriptor apiServiceDescriptor =
+            ApiServiceDescriptor.newBuilder()
+                .setUrl(
+                    BeamFnLoggingClientBenchmark.class.getName()
+                        + "#"
+                        + UUID.randomUUID())
+                .build();
+        ManagedChannelFactory managedChannelFactory =
+            InProcessManagedChannelFactory.create();
+        loggingService = new CallCountLoggingService();
+        server =
+            InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+                .addService(loggingService)
+                .build();
+        server.start();
+        loggingClient =
+            new BeamFnLoggingClient(
+                PipelineOptionsFactory.create(),
+                apiServiceDescriptor,
+                managedChannelFactory::forDescriptor);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Closes the client and shuts the server down, forcing termination if the
+     * server has not terminated within 30 seconds.
+     *
+     * @throws Exception if closing the client or awaiting termination fails
+     */
+    @TearDown(Level.Trial)
+    public void tearDown() throws Exception {
+      try {
+        loggingClient.close();
+      } finally {
+        // Shut the server down even when closing the client fails.
+        server.shutdown();
+        // awaitTermination returns true once the server has terminated, so
+        // only escalate to shutdownNow() when the graceful shutdown timed out.
+        if (!server.awaitTermination(30, TimeUnit.SECONDS)) {
+          server.shutdownNow();
+        }
+      }
+    }
+  }
+
+  /**
+   * A {@link ManageLoggingClientAndService} which validates that more than
+   * zero calls made it to the service.
+   */
+  @State(Scope.Benchmark)
+  public static class ManyExpectedCallsLoggingClientAndService
+      extends ManageLoggingClientAndService {
+    /**
+     * Runs the parent tear down and then checks the service's call count.
+     *
+     * @throws IllegalStateException if the service received no calls
+     */
+    @Override
+    @TearDown
+    public void tearDown() throws Exception {
+      super.tearDown();
+      if (loggingService.callCount.get() <= 0) {
+        throw new IllegalStateException(
+            "Server expected greater than zero calls. Benchmark misconfigured?");
+      }
+    }
+  }
+
+  /**
+   * A {@link ManageLoggingClientAndService} which validates that exactly zero 
calls made it to the
+   * service.
+   */
+  @State(Scope.Benchmark)
+  public static class ZeroExpectedCallsLoggingClientAndService
+      extends ManageLoggingClientAndService {
+    @Override
+    @TearDown
+    public void tearDown() throws Exception {
+      super.tearDown();
+      if (loggingService.callCount.get() != 0) {
+        throw new IllegalStateException("Server expected zero calls. Benchmark 
misconfigured?");
+      }
+    }
+  }
+
+  /**
+   * Benchmark state holding an {@link ExecutionStateTracker} together with a
+   * single {@link SimpleExecutionState} labelled with a PTRANSFORM id.
+   */
+  @State(Scope.Benchmark)
+  public static class ManageExecutionState {
+    private final ExecutionStateTracker executionStateTracker;
+    private final SimpleExecutionState simpleExecutionState;
+
+    /** Creates the tracker and the execution state used by the benchmarks. */
+    public ManageExecutionState() {
+      this.executionStateTracker = ExecutionStateTracker.newForTest();
+      HashMap<String, String> labels = new HashMap<>();
+      labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "ptransformId");
+      this.simpleExecutionState =
+          new SimpleExecutionState(
+              ExecutionStateTracker.PROCESS_STATE_NAME,
+              MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS,
+              labels);
+    }
+
+    /** Resets the tracker once the benchmark is done with this state. */
+    @TearDown
+    public void tearDown() throws Exception {
+      this.executionStateTracker.reset();
+    }
+  }
+
+  /**
+   * Benchmarks emitting a single warn-level message through {@code LOG}.
+   *
+   * @param client benchmark state; not referenced in the body, but its tear
+   *     down checks that the service received more than zero calls
+   */
+  @Benchmark
+  @Threads(16) // Use several threads since we expect contention during logging
+  public void testLogging(ManyExpectedCallsLoggingClientAndService client) {
+    LOG.warn("log me");
+  }
+
+  /**
+   * Benchmarks emitting a warn-level message while an instruction id is set
+   * and an execution state has been entered on the tracker.
+   *
+   * @param client benchmark state; not referenced in the body, but its tear
+   *     down checks that the service received more than zero calls
+   * @param executionState supplies the tracker and the state to enter
+   * @throws Exception if closing the entered state fails
+   */
+  @Benchmark
+  @Threads(16) // Use several threads since we expect contention during logging
+  public void testLoggingWithAllOptionalParameters(
+      ManyExpectedCallsLoggingClientAndService client,
+      ManageExecutionState executionState)
+      throws Exception {
+    BeamFnLoggingMDC.setInstructionId("instruction id");
+    try (Closeable state =
+        executionState.executionStateTracker.enterState(
+            executionState.simpleExecutionState)) {
+      LOG.warn("log me");
+    } finally {
+      // Always clear the instruction id, even if logging or closing the state
+      // throws, so it is not left set for later invocations.
+      BeamFnLoggingMDC.setInstructionId(null);
+    }
+  }
+
+  /**
+   * Benchmarks a trace-level log call.
+   *
+   * @param client benchmark state; not referenced in the body, but its tear
+   *     down checks that the service received exactly zero calls
+   */
+  @Benchmark
+  @Threads(16) // Use several threads since we expect contention during logging
+  public void testSkippedLogging(ZeroExpectedCallsLoggingClientAndService 
client) {
+    LOG.trace("no log");
+  }
+}
diff --git 
a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java
 
b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java
new file mode 100644
index 0000000..304238e
--- /dev/null
+++ 
b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Benchmarks for logging. */
+package org.apache.beam.fn.harness.logging;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 6cd9116..fe42f1a 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -57,7 +57,6 @@ import 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.CallStreamObserver;
 import 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCallStreamObserver;
 import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientResponseObserver;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -230,8 +229,8 @@ public class BeamFnLoggingClient implements AutoCloseable {
         String transformId =
             ((SimpleExecutionState) state)
                 .getLabels()
-                .getOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, "");
-        if (!Strings.isNullOrEmpty(transformId)) {
+                .get(MonitoringInfoConstants.Labels.PTRANSFORM);
+        if (transformId != null) {
           builder.setTransformId(transformId);
         }
       }

Reply via email to