ibzib commented on a change in pull request #16542:
URL: https://github.com/apache/beam/pull/16542#discussion_r787208021



##########
File path: 
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
##########
@@ -1320,7 +1314,34 @@ class BeamModulePlugin implements Plugin<Project> {
           // Enable connecting a debugger by disabling forking (uncomment 
below)
           // Useful for debugging via an IDE such as Intellij
           // args '-f0'
+          // Specify -Pbenchmark=ProcessBundleBenchmark.testTinyBundle on the 
command
+          // line to enable running a single benchmark.
+
+          // Enable Google Cloud Profiler and upload the benchmarks GCP.
+          if (project.hasProperty("benchmark")) {
+            args project.getProperty("benchmark")
+            // Add JVM arguments allowing one to additionally use Google's 
Java Profiler
+            // Agent: (see 
https://cloud.google.com/profiler/docs/profiling-java#installing-profiler for 
instructions on how to install)
+            if (project.file("/opt/cprof/profiler_java_agent.so").exists()) {
+              def gcpProject = project.findProperty('gcpProject') ?: 
'apache-beam-testing'
+              def userName = 
System.getProperty("user.name").toLowerCase().replaceAll(" ", "_")
+              //jvmArgs 
'-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" 
+ project.getProperty("benchmark").toLowerCase() + '_' + 
System.currentTimeMillis() + ',-cprof_project_id=' + gcpProject + 
',-cprof_zone_name=us-central1-a'

Review comment:
       Why is this commented out?

##########
File path: 
sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java
##########
@@ -283,4 +300,207 @@ public void testLargeBundle(TrivialTransform 
trivialTransform) throws Exception
     }
     assertEquals(3_000, outputValuesCount.getAndSet(0));
   }
+
+  @State(Scope.Benchmark)
+  public static class StatefulTransform extends SdkHarness {
+    final BundleProcessor processor;
+    final ExecutableProcessBundleDescriptor descriptor;
+    final StateRequestHandler nonCachingStateRequestHandler;
+    final StateRequestHandler cachingStateRequestHandler;
+
+    @SuppressWarnings({
+      "unused" // TODO(BEAM-13271): Remove when new version of errorprone is 
released (2.11.0)
+    })
+    private static class StatefulOutputZeroOneTwo
+        extends DoFn<KV<String, String>, KV<String, String>> {
+      private static final String STATE_ID = "bagState";
+
+      @StateId(STATE_ID)
+      private final StateSpec<BagState<String>> bagStateSpec = 
StateSpecs.bag(StringUtf8Coder.of());
+
+      @ProcessElement
+      public void process(ProcessContext ctxt, @StateId(STATE_ID) 
BagState<String> state) {
+        int size = Iterables.size(state.read());
+        if (size == 3) {
+          for (String value : state.read()) {
+            ctxt.output(KV.of(ctxt.element().getKey(), value));
+          }
+          state.clear();
+        } else {
+          state.add(ctxt.element().getValue());
+        }
+      }
+    }
+
+    private static class ToKeyAndValueDoFn extends DoFn<byte[], KV<String, 
String>> {
+      @ProcessElement
+      public void process(ProcessContext ctxt) {
+        ctxt.output(KV.of("key", "value"));
+      }
+    }
+
+    public StatefulTransform() {
+      try {
+        Pipeline p = Pipeline.create();
+        p.apply("impulse", Impulse.create())
+            .apply("toKeyAndValue", ParDo.of(new ToKeyAndValueDoFn()))
+            .apply("stateful", ParDo.of(new StatefulOutputZeroOneTwo()))
+            // Force the output to be materialized
+            .apply("gbk", GroupByKey.create());
+
+        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+        FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
+        checkState(fused.getFusedStages().size() == 2, "Expected exactly two 
fused stages");
+        ExecutableStage stage = null;
+        for (ExecutableStage value : fused.getFusedStages()) {
+          if (!value.getUserStates().isEmpty()) {
+            stage = value;
+            break;
+          }
+        }
+        if (stage == null) {
+          throw new IllegalStateException("Stage with stateful DoFn not 
found.");
+        }
+
+        this.descriptor =
+            ProcessBundleDescriptors.fromExecutableStage(
+                "my_stage",
+                stage,
+                dataServer.getApiServiceDescriptor(),
+                stateServer.getApiServiceDescriptor());
+
+        this.processor =
+            controlClient.getProcessor(
+                descriptor.getProcessBundleDescriptor(),
+                descriptor.getRemoteInputDestinations(),
+                stateDelegator);
+        this.nonCachingStateRequestHandler = new InMemoryBagUserStateHandler();
+
+        List<CacheToken> cacheTokens = new ArrayList<>();
+        cacheTokens.add(
+            CacheToken.newBuilder()
+                .setUserState(CacheToken.UserState.newBuilder())
+                .setToken(ByteString.copyFromUtf8("cacheMe"))
+                .build());
+        this.cachingStateRequestHandler =
+            new InMemoryBagUserStateHandler() {
+              @Override
+              public Iterable<CacheToken> getCacheTokens() {
+                return cacheTokens;
+              }
+            };
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static class InMemoryBagUserStateHandler implements 
StateRequestHandler {
+    private final Map<ByteString, ByteString> bagState = new 
ConcurrentHashMap<>();
+
+    @Override
+    public CompletionStage<StateResponse.Builder> handle(StateRequest request) 
throws Exception {
+      if (!request.getStateKey().hasBagUserState()) {
+        throw new IllegalStateException(
+            "Unknown state key type " + request.getStateKey().getTypeCase());
+      }
+      StateResponse.Builder response = StateResponse.newBuilder();
+      StateKey.BagUserState stateKey = request.getStateKey().getBagUserState();
+      switch (request.getRequestCase()) {
+        case APPEND:
+          {
+            ByteString data =
+                bagState.computeIfAbsent(stateKey.getKey(), (unused) -> 
ByteString.EMPTY);
+            bagState.put(stateKey.getKey(), 
data.concat(request.getAppend().getData()));
+            response.getAppendBuilder().build();
+            break;
+          }
+        case CLEAR:
+          {
+            bagState.remove(stateKey.getKey());
+            response.getClearBuilder().build();
+            break;
+          }
+        case GET:
+          {
+            ByteString data =
+                bagState.computeIfAbsent(stateKey.getKey(), (unused) -> 
ByteString.EMPTY);
+            response.getGetBuilder().setData(data).build();
+            break;
+          }
+        default:
+          throw new IllegalStateException("Unknown request type " + 
request.getRequestCase());
+      }
+      return CompletableFuture.completedFuture(response);
+    }
+  }
+
+  @Benchmark
+  @Threads(16) // Use several threads since we expect contention during bundle 
processing.
+  public void testStateWithoutCaching(StatefulTransform statefulTransform) 
throws Exception {
+    testState(statefulTransform, 
statefulTransform.nonCachingStateRequestHandler);
+  }
+
+  @Benchmark
+  @Threads(16) // Use several threads since we expect contention during bundle 
processing.
+  public void testStateWithCaching(StatefulTransform statefulTransform) throws 
Exception {
+    testState(statefulTransform, statefulTransform.cachingStateRequestHandler);
+  }
+
+  private static void testState(
+      StatefulTransform statefulTransform, StateRequestHandler 
stateRequestHandler)
+      throws Exception {
+    Map<String, ? super Coder<WindowedValue<?>>> remoteOutputCoders =
+        statefulTransform.descriptor.getRemoteOutputCoders();
+    Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+    AtomicInteger outputValuesCount = new AtomicInteger();
+    for (Entry<String, ? super Coder<WindowedValue<?>>> remoteOutputCoder :
+        remoteOutputCoders.entrySet()) {
+      outputReceivers.put(
+          remoteOutputCoder.getKey(),
+          RemoteOutputReceiver.of(
+              (Coder) remoteOutputCoder.getValue(),
+              (FnDataReceiver<? super WindowedValue<?>>)
+                  (WindowedValue<?> value) -> 
outputValuesCount.incrementAndGet()));
+    }
+    String key = 
padLeftZeros(Long.toHexString(Thread.currentThread().getId()), 16);
+    try (RemoteBundle bundle =
+        statefulTransform.processor.newBundle(
+            outputReceivers, stateRequestHandler, 
BundleProgressHandler.ignored())) {
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(KV.of(key, "zero")));
+    }
+    try (RemoteBundle bundle =
+        statefulTransform.processor.newBundle(
+            outputReceivers, stateRequestHandler, 
BundleProgressHandler.ignored())) {
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(KV.of(key, "one")));
+    }
+    try (RemoteBundle bundle =
+        statefulTransform.processor.newBundle(
+            outputReceivers, stateRequestHandler, 
BundleProgressHandler.ignored())) {
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(KV.of(key, "two")));
+    }
+    try (RemoteBundle bundle =
+        statefulTransform.processor.newBundle(
+            outputReceivers, stateRequestHandler, 
BundleProgressHandler.ignored())) {
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(KV.of(key, "flush")));
+    }
+    assertEquals(3, outputValuesCount.getAndSet(0));
+  }
+
+  private static String padLeftZeros(String inputString, int length) {

Review comment:
       Can we use a library for this instead of a hand-rolled helper? e.g. 
Guava's `Strings.padStart`: 
https://guava.dev/releases/19.0/api/docs/com/google/common/base/Strings.html#padStart(java.lang.String,%20int,%20char)

##########
File path: 
sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java
##########
@@ -283,4 +300,207 @@ public void testLargeBundle(TrivialTransform 
trivialTransform) throws Exception
     }
     assertEquals(3_000, outputValuesCount.getAndSet(0));
   }
+
+  @State(Scope.Benchmark)
+  public static class StatefulTransform extends SdkHarness {
+    final BundleProcessor processor;
+    final ExecutableProcessBundleDescriptor descriptor;
+    final StateRequestHandler nonCachingStateRequestHandler;
+    final StateRequestHandler cachingStateRequestHandler;
+
+    @SuppressWarnings({
+      "unused" // TODO(BEAM-13271): Remove when new version of errorprone is 
released (2.11.0)
+    })
+    private static class StatefulOutputZeroOneTwo
+        extends DoFn<KV<String, String>, KV<String, String>> {
+      private static final String STATE_ID = "bagState";
+
+      @StateId(STATE_ID)
+      private final StateSpec<BagState<String>> bagStateSpec = 
StateSpecs.bag(StringUtf8Coder.of());
+
+      @ProcessElement
+      public void process(ProcessContext ctxt, @StateId(STATE_ID) 
BagState<String> state) {
+        int size = Iterables.size(state.read());
+        if (size == 3) {
+          for (String value : state.read()) {
+            ctxt.output(KV.of(ctxt.element().getKey(), value));
+          }
+          state.clear();
+        } else {
+          state.add(ctxt.element().getValue());
+        }
+      }
+    }
+
+    private static class ToKeyAndValueDoFn extends DoFn<byte[], KV<String, 
String>> {
+      @ProcessElement
+      public void process(ProcessContext ctxt) {
+        ctxt.output(KV.of("key", "value"));
+      }
+    }
+
+    public StatefulTransform() {
+      try {
+        Pipeline p = Pipeline.create();
+        p.apply("impulse", Impulse.create())
+            .apply("toKeyAndValue", ParDo.of(new ToKeyAndValueDoFn()))
+            .apply("stateful", ParDo.of(new StatefulOutputZeroOneTwo()))
+            // Force the output to be materialized
+            .apply("gbk", GroupByKey.create());
+
+        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+        FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
+        checkState(fused.getFusedStages().size() == 2, "Expected exactly two 
fused stages");

Review comment:
       What are the two expected stages? Is it (1) impulse+toKeyAndValue and 
(2) stateful+gbk?
   
   My understanding is that `toKeyAndValue` is just a dummy transform that is 
added only for pipeline construction, and that its stage is never actually 
executed. Is that correct? If so, can we add a comment to clarify it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to