m-trieu commented on code in PR #29041:
URL: https://github.com/apache/beam/pull/29041#discussion_r1362913969


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2866,6 +2867,217 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testActiveThreadMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 
1);
+
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;
+          }
+        };
+
+    Work m2 = createMockWork(2, sleepProcessWorkFn);
+
+    Work m3 = createMockWork(3, sleepProcessWorkFn);
+
+    Work m4 = createMockWork(4, sleepProcessWorkFn);
+    assertEquals(0, executor.activeCount());
+
+    assertTrue(computationState.activateWork(key1Shard1, m2));
+    // activate work starts executing work if no other work is queued for that 
shard
+    executor.execute(m2, m2.getWorkItem().getSerializedSize());
+    processStart1.await();
+    assertEquals(2, executor.activeCount());
+
+    assertTrue(computationState.activateWork(key1Shard1, m3));
+    assertTrue(computationState.activateWork(key1Shard1, m4));
+    executor.execute(m3, m3.getWorkItem().getSerializedSize());
+    processStart2.await();
+
+    assertEquals(3, executor.activeCount());
+    executor.execute(m4, m4.getWorkItem().getSerializedSize());
+    processStart3.await();
+    assertEquals(4, executor.activeCount());
+    stop.set(true);
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOutstandingBytesMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 
1);
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
+          int count = 0;

Review Comment:
   What's the point of `count`? It is incremented in the loop but never read.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2866,6 +2867,217 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testActiveThreadMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,

Review Comment:
   nit: `10000000`
   Give this value a name so it's clear what it represents.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2866,6 +2867,217 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testActiveThreadMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 
1);
+
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;
+          }
+        };
+
+    Work m2 = createMockWork(2, sleepProcessWorkFn);
+
+    Work m3 = createMockWork(3, sleepProcessWorkFn);
+
+    Work m4 = createMockWork(4, sleepProcessWorkFn);
+    assertEquals(0, executor.activeCount());
+
+    assertTrue(computationState.activateWork(key1Shard1, m2));
+    // activate work starts executing work if no other work is queued for that 
shard
+    executor.execute(m2, m2.getWorkItem().getSerializedSize());
+    processStart1.await();
+    assertEquals(2, executor.activeCount());
+
+    assertTrue(computationState.activateWork(key1Shard1, m3));
+    assertTrue(computationState.activateWork(key1Shard1, m4));
+    executor.execute(m3, m3.getWorkItem().getSerializedSize());
+    processStart2.await();
+
+    assertEquals(3, executor.activeCount());
+    executor.execute(m4, m4.getWorkItem().getSerializedSize());
+    processStart3.await();
+    assertEquals(4, executor.activeCount());
+    stop.set(true);
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOutstandingBytesMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            10000000,

Review Comment:
   nit: see above
   Maybe make this a constant, something like
   `private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to