lukecwik commented on code in PR #22190:
URL: https://github.com/apache/beam/pull/22190#discussion_r920517855
##########
sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java:
##########
@@ -138,33 +142,37 @@ public void
testTinyBundleHarnessStateSampler(HarnessStateSampler state) throws
}
@Benchmark
- @Threads(1)
- public void testLargeBundleRunnersCoreStateSampler(RunnersCoreStateSampler
state)
+ @Threads(10)
+ public void testLargeBundleRunnersCoreStateSampler(RunnersCoreStateSampler
state, Blackhole bh)
throws Exception {
- state.tracker.activate();
+ ExecutionStateTracker tracker = new ExecutionStateTracker(state.sampler);
+ Closeable c = tracker.activate();
for (int i = 0; i < 1000; ) {
- Closeable close1 = state.tracker.enterState(state.state1);
- Closeable close2 = state.tracker.enterState(state.state2);
- Closeable close3 = state.tracker.enterState(state.state3);
+ Closeable close1 = tracker.enterState(state.state1);
+ Closeable close2 = tracker.enterState(state.state2);
+ Closeable close3 = tracker.enterState(state.state3);
// trival code that is being sampled for this state
i += 1;
+ bh.consume(i);
close3.close();
close2.close();
close1.close();
}
- state.tracker.reset();
+ c.close();
}
@Benchmark
- @Threads(1)
- public void testLargeBundleHarnessStateSampler(HarnessStateSampler state)
throws Exception {
+ @Threads(10)
+ public void testLargeBundleHarnessStateSampler(HarnessStateSampler state,
Blackhole bh)
+ throws Exception {
state.tracker.start("processBundleId");
Review Comment:
We don't want to have this tracker be shared across threads.
##########
sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java:
##########
@@ -103,33 +103,37 @@ public void tearDown() {
}
@Benchmark
- @Threads(1)
- public void testTinyBundleRunnersCoreStateSampler(RunnersCoreStateSampler
state)
+ @Threads(10)
+ public void testTinyBundleRunnersCoreStateSampler(RunnersCoreStateSampler
state, Blackhole bh)
throws Exception {
- state.tracker.activate();
+ ExecutionStateTracker tracker = new ExecutionStateTracker(state.sampler);
Review Comment:
We should create a JMH `@State` called `RunnersCoreStateTracker` at thread
scope. Then in the setup method take in a parameter of the
RunnersCoreStateSampler.
You can see an example of what I mean here:
https://github.com/openjdk/jmh/blob/1f2befef92c3eb1466124f37d00f496b4105d3c5/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_29_StatesDAG.java#L134
This will allow us to move the initialization outside of the benchmark into
state.
Ditto on creating `HarnessStateTracker` as well.
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -298,7 +304,17 @@ public long getNextLullReportMs() {
return nextLullReportMs;
}
- protected void takeSample(long millisSinceLastSample) {
+ void takeSample(long millisSinceLastSample) {
+ if (SAMPLING_UPDATER.compareAndSet(this, 0, 1)) {
Review Comment:
Thanks, I missed that.
##########
sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java:
##########
@@ -103,33 +104,38 @@ public void tearDown() {
}
@Benchmark
- @Threads(1)
- public void testTinyBundleRunnersCoreStateSampler(RunnersCoreStateSampler
state)
+ @Threads(10)
Review Comment:
Did we mean to have 512 threads for the tiny bundles and 16 for the large
bundles or are you still meaning to revert this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]