scwhittle commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096024741


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable 
threads.
-            Long.MAX_VALUE,
-            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-            new SynchronousQueue<>(),
+            // Put a long timeout on non-core threads. This reduces memory for 
per-thread caches
+            // over time.
+            1,
+            TimeUnit.HOURS,
+            new SynchronousQueue<Runnable>() {
+              @Override
+              public boolean offer(Runnable r) {
+                try {
+                  // By blocking for a little we hope to delay thread creation 
if there are existing

Review Comment:
   Ack



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService 
executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception 
{
+    UnboundedScheduledExecutorService executorService = new 
UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new 
SynchronousQueue<>());
+    // Schedule 100 threads that are going to be scheduling work non-stop but 
sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);
+    done.countDown();
+    executor.shutdown();
+    executor.awaitTermination(1, MINUTES);
+
+    int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+    LOG.info("Created {} threads to execute at most 100 parallel tasks", 
largestPool);
+    assert(largestPool <= 100);

Review Comment:
   Done



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService 
executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception 
{
+    UnboundedScheduledExecutorService executorService = new 
UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new 
SynchronousQueue<>());
+    // Schedule 100 threads that are going to be scheduling work non-stop but 
sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to