Abacn commented on code in PR #28513:
URL: https://github.com/apache/beam/pull/28513#discussion_r1366178028
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2793,6 +2793,79 @@ public void testMaxThreadMetric() throws Exception {
executor.shutdown();
}
+ volatile boolean stop = false;
+
+ @Test
+ public void testActiveThreadMetric() throws Exception {
+ int maxThreads = 5;
+ int threadExpirationSec = 60;
+ // setting up actual implementation of executor instead of mocking to keep
track of
+ // active thread count.
+ BoundedQueueExecutor executor =
+ new BoundedQueueExecutor(
+ maxThreads,
+ threadExpirationSec,
+ TimeUnit.SECONDS,
+ maxThreads,
+ 10000000,
+ new ThreadFactoryBuilder()
+ .setNameFormat("DataflowWorkUnits-%d")
+ .setDaemon(true)
+ .build());
+
+ ComputationState computationState =
+ new ComputationState(
+ "computation",
+
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+ executor,
+ ImmutableMap.of(),
+ null);
+
+ ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"),
1);
+
+ Consumer<Work> sleepProcessWorkFn =
+ unused -> {
+ synchronized (this) {
+ this.notify();
+ }
+ int count = 0;
+ while (!stop) {
+ count += 1;
+ }
+ };
+
+ Work m2 = createMockWork(2, sleepProcessWorkFn);
+
+ Work m3 = createMockWork(3, sleepProcessWorkFn);
+
+ Work m4 = createMockWork(4, sleepProcessWorkFn);
+ assertEquals(0, executor.activeCount());
+
+ assertTrue(computationState.activateWork(key1Shard1, m2));
+ synchronized (this) {
+ executor.execute(m2, m2.getWorkItem().getSerializedSize());
+ this.wait();
+ // Seems current executor executes the initial work item twice
+ this.wait();
Review Comment:
Thanks for the fix. While #29041 is in progress, would you mind lgtm #29065
so the current stuck test won't affect the whole test suite so that it can
still finish?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]