edman124 commented on code in PR #28513:
URL: https://github.com/apache/beam/pull/28513#discussion_r1365957340


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2793,6 +2793,79 @@ public void testMaxThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  volatile boolean stop = false;
+
+  @Test
+  public void testActiveThreadMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    // setting up actual implementation of executor instead of mocking to keep track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1);
+
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          synchronized (this) {
+            this.notify();
+          }
+          int count = 0;
+          while (!stop) {
+            count += 1;
+          }
+        };
+
+    Work m2 = createMockWork(2, sleepProcessWorkFn);
+
+    Work m3 = createMockWork(3, sleepProcessWorkFn);
+
+    Work m4 = createMockWork(4, sleepProcessWorkFn);
+    assertEquals(0, executor.activeCount());
+
+    assertTrue(computationState.activateWork(key1Shard1, m2));
+    synchronized (this) {
+      executor.execute(m2, m2.getWorkItem().getSerializedSize());
+      this.wait();
+      // Seems current executor executes the initial work item twice
+      this.wait();

Review Comment:
   It could be that the first thread starts running before the synchronized 
block is entered, causing a deadlock. I have a fix in 
https://github.com/apache/beam/pull/29041 that uses countdown latches.
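
   For illustration only, here is a minimal sketch of the countdown-latch idea. It is not the code from the linked PR: it uses a plain `java.util.concurrent.ThreadPoolExecutor` in place of `BoundedQueueExecutor`, and the class name `LatchSketch` and method `activeCountWhileBlocked` are hypothetical. A latch remembers a count-down that happens before the waiter arrives, so the test does not depend on which thread runs first.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Beam: shows latch-based coordination only.
class LatchSketch {
  // Starts `tasks` blocking tasks and returns the executor's active thread
  // count once every task has signalled that it is running.
  static int activeCountWhileBlocked(int tasks) {
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(
            tasks, tasks, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    CountDownLatch started = new CountDownLatch(tasks); // counted down by each task
    CountDownLatch release = new CountDownLatch(1); // counted down by the test
    try {
      for (int i = 0; i < tasks; i++) {
        executor.execute(
            () -> {
              started.countDown(); // safe even if the test is not waiting yet
              try {
                release.await(); // block without spinning until released
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
      }
      // Bounded wait so a failure surfaces as an exception instead of a hang.
      if (!started.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("tasks did not start in time");
      }
      return executor.getActiveCount();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      release.countDown(); // let the blocked tasks finish
      executor.shutdown();
    }
  }
}
```

   Compared with `wait()`/`notify()`, this needs no shared monitor and no assumption about how many notifications arrive, and the bounded `await` turns a potential hang into a test failure.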



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
