scwhittle commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1530536071


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -59,8 +72,8 @@ public BoundedQueueExecutor(
           @Override
           protected void beforeExecute(Thread t, Runnable r) {
             super.beforeExecute(t, r);
-            synchronized (this) {
-              if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
+            synchronized (BoundedQueueExecutor.this) {
+              if (activeCount++ >= maximumThreadCount - 1 && 
startTimeMaxActiveThreadsUsed == 0) {

Review Comment:
   nit: how about
   `++activeCount >= maximumThreadCount`
   for the first condition? Pre-incrementing removes the need for the `- 1`,
   which seems easier to read.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -69,8 +82,8 @@ protected void beforeExecute(Thread t, Runnable r) {
           @Override
           protected void afterExecute(Runnable r, Throwable t) {
             super.afterExecute(r, t);
-            synchronized (this) {
-              if (activeCount.getAndDecrement() == maximumPoolSize) {
+            synchronized (BoundedQueueExecutor.this) {
+              if (activeCount-- <= maximumThreadCount && 
startTimeMaxActiveThreadsUsed > 0) {

Review Comment:
   Similarly here,
   `--activeCount < maximumThreadCount`
   seems simpler than post-decrementing and comparing with `<=`.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link 
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version 
of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, 
CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+    executor.execute(m2, 1);
+    assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+    // m1 and m2 have started and all threads are occupied so m3 will be 
queued and not executed.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Stop m1 so there is an available thread for m3 to run.
+    processStop1.countDown();
+    assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+    // m3 started.
+    processStop2.countDown();
+    processStop3.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count

Review Comment:
   Could you also verify that increasing the bytes limit similarly lets
   queued work start running?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link 
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version 
of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, 
CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+    executor.execute(m2, 1);
+    assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+    // m1 and m2 have started and all threads are occupied so m3 will be 
queued and not executed.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Stop m1 so there is an available thread for m3 to run.
+    processStop1.countDown();
+    assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+    // m3 started.
+    processStop2.countDown();
+    processStop3.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count
+    executor.setMaximumPoolSize(3, 103);
+    assertEquals(3, executor.maximumThreadCount());
+
+    // m3 is accepted
+    processStart3.await();
+    assertEquals(3, executor.activeCount());
+
+    stop.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());
+    stop.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.
+    }
+    // Max pool size was reached so the allThreadsActiveTime() was updated.
+    assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
+
+    executor.shutdown();
+  }
+
+  @Test
+  public void 
testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumThreadCountUpdated()
+      throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());

Review Comment:
   This made me realize that we only update the count after the active work
   completes. If the available threads are busy with long-running work, that
   could delay requesting additional threads. In a follow-up, we might want a
   monitoring thread to increase this periodically instead of relying on the
   finishing threads themselves to increment it.
   
   



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link 
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version 
of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, 
CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+    executor.execute(m2, 1);
+    assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+    // m1 and m2 have started and all threads are occupied so m3 will be 
queued and not executed.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Stop m1 so there is an available thread for m3 to run.
+    processStop1.countDown();
+    assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+    // m3 started.
+    processStop2.countDown();
+    processStop3.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count
+    executor.setMaximumPoolSize(3, 103);
+    assertEquals(3, executor.maximumThreadCount());
+
+    // m3 is accepted
+    processStart3.await();
+    assertEquals(3, executor.activeCount());
+
+    stop.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());
+    stop.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.

Review Comment:
   Sleep briefly on each iteration while polling here, so that the busy loop does not burn CPU and slow down completion.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link 
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version 
of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, 
CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+    executor.execute(m2, 1);
+    assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+    // m1 and m2 have started and all threads are occupied so m3 will be 
queued and not executed.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Stop m1 so there is an available thread for m3 to run.
+    processStop1.countDown();
+    assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+    // m3 started.
+    processStop2.countDown();
+    processStop3.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count
+    executor.setMaximumPoolSize(3, 103);
+    assertEquals(3, executor.maximumThreadCount());
+
+    // m3 is accepted
+    processStart3.await();
+    assertEquals(3, executor.activeCount());
+
+    stop.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());
+    stop.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.
+    }
+    // Max pool size was reached so the allThreadsActiveTime() was updated.
+    assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
+
+    executor.shutdown();
+  }
+
+  @Test
+  public void 
testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumThreadCountUpdated()
+      throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());
+    // Increase the max thread count
+    executor.setMaximumPoolSize(5, 105);
+    stop.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.

Review Comment:
   Ditto: sleep briefly on each iteration of this polling loop too, rather than busy-waiting.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -104,6 +117,21 @@ public void forceExecute(Runnable work, long workBytes) {
     executeLockHeld(work, workBytes);

Review Comment:
   Can you rename `executeLockHeld` to `executeMonitorHeld`? That would make it
   clearer which lock is meant, since we now have both the monitor and a
   synchronized lock.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link 
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version 
of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, 
CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));

Review Comment:
   For all of these awaits where we expect the latch to be released, await
   without a timeout to prevent possible test flakiness. The test deadline
   should be sufficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to