MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1531166753


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300); // Per-test limit.
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000; // Given to executor in setUp().
+  private static final int DEFAULT_MAX_THREADS = 2; // Thread cap given to executor in setUp().
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60; // Paired with TimeUnit.SECONDS.
+
+  private BoundedQueueExecutor executor; // Recreated for each test by setUp().
+
+  /**
+   * Builds a task that signals {@code start} once it begins running and then blocks until {@code
+   * stop} is released, so a test can keep an executor thread occupied for as long as it needs.
+   *
+   * @param start latch counted down as soon as the task starts executing
+   * @param stop latch the task waits on before finishing
+   * @return the blocking task
+   */
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
+    return () -> {
+      start.countDown();
+      try {
+        stop.await();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the thread's owner still sees the interruption.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  /** Creates a fresh executor, backed by named daemon threads, before every test. */
+  @Before
+  public void setUp() {
+    java.util.concurrent.ThreadFactory daemonThreads =
+        new ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build();
+    executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            daemonThreads);
+  }
+
+  /**
+   * Verifies that work beyond the executor's thread limit waits until a running task finishes:
+   * with both threads busy, a third task must not start until one of the first two completes.
+   */
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    try {
+      executor.execute(m1, 1);
+      assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+      executor.execute(m2, 1);
+      assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+      // m1 and m2 have started and all threads are occupied so m3 will be queued and not
+      // executed.
+      executor.execute(m3, 1);
+      assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+      // Stop m1 so there is an available thread for m3 to run.
+      processStop1.countDown();
+      assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+      // m3 started.
+    } finally {
+      // Always release the blocked tasks and shut the executor down, even when an assertion
+      // fails, so a failing run does not leave worker threads parked on the latches.
+      // countDown() on an already-released latch is a no-op.
+      processStop1.countDown();
+      processStop2.countDown();
+      processStop3.countDown();
+      executor.shutdown();
+    }
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count

Review Comment:
   Currently, `maximumBytesOutstanding` is final and we don't change it at 
runtime. I added a test case for scheduling work when the 
`maximumBytesOutstanding` limit is reached.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to