edman124 commented on code in PR #29041:
URL: https://github.com/apache/beam/pull/29041#discussion_r1365063519


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2866,6 +2867,217 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testActiveThreadMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 
1);
+
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;
+          }
+        };
+
+    Work m2 = createMockWork(2, sleepProcessWorkFn);
+
+    Work m3 = createMockWork(3, sleepProcessWorkFn);
+
+    Work m4 = createMockWork(4, sleepProcessWorkFn);
+    assertEquals(0, executor.activeCount());
+
+    assertTrue(computationState.activateWork(key1Shard1, m2));
+    // activate work starts executing work if no other work is queued for that 
shard
+    executor.execute(m2, m2.getWorkItem().getSerializedSize());
+    processStart1.await();
+    assertEquals(2, executor.activeCount());
+
+    assertTrue(computationState.activateWork(key1Shard1, m3));
+    assertTrue(computationState.activateWork(key1Shard1, m4));
+    executor.execute(m3, m3.getWorkItem().getSerializedSize());
+    processStart2.await();
+
+    assertEquals(3, executor.activeCount());
+    executor.execute(m4, m4.getWorkItem().getSerializedSize());
+    processStart3.await();
+    assertEquals(4, executor.activeCount());
+    stop.set(true);
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOutstandingBytesMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,

Review Comment:
   Fixed.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2866,6 +2867,217 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testActiveThreadMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to