scwhittle commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096024741
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE, // Allow an unlimited number of re-usable
threads.
- Long.MAX_VALUE,
- TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
- new SynchronousQueue<>(),
+ // Put a long timeout on non-core threads. This reduces memory for
per-thread caches
+ // over time.
+ 1,
+ TimeUnit.HOURS,
+ new SynchronousQueue<Runnable>() {
+ @Override
+ public boolean offer(Runnable r) {
+ try {
+ // By blocking for a little we hope to delay thread creation
if there are existing
Review Comment:
Ack
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService
executorService) thro
}
Thread.sleep(100);
}
+
+ @Test
+ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception
{
+ UnboundedScheduledExecutorService executorService = new
UnboundedScheduledExecutorService();
+ CountDownLatch done = new CountDownLatch(1);
+
+ ThreadPoolExecutor executor =
+ new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new
SynchronousQueue<>());
+ // Schedule 100 threads that are going to be scheduling work non-stop but
sequentially.
+ for (int i = 0; i < 100; ++i) {
+ executor.execute(
+ () -> {
+ // Periodically check if done.
+ while (done.getCount() == 1) {
+ for (int j = 0; j < 100; ++j) {
+ try {
+ executorService.submit(() -> {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // Ignore, happens on executor shutdown.
+ }
+ }
+ }
+ });
+ }
+
+ Thread.sleep(20 * 1000);
+ done.countDown();
+ executor.shutdown();
+ executor.awaitTermination(1, MINUTES);
+
+ int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+ LOG.info("Created {} threads to execute at most 100 parallel tasks",
largestPool);
+ assert(largestPool <= 100);
Review Comment:
Done
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService
executorService) thro
}
Thread.sleep(100);
}
+
+ @Test
+ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception
{
+ UnboundedScheduledExecutorService executorService = new
UnboundedScheduledExecutorService();
+ CountDownLatch done = new CountDownLatch(1);
+
+ ThreadPoolExecutor executor =
+ new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new
SynchronousQueue<>());
+ // Schedule 100 threads that are going to be scheduling work non-stop but
sequentially.
+ for (int i = 0; i < 100; ++i) {
+ executor.execute(
+ () -> {
+ // Periodically check if done.
+ while (done.getCount() == 1) {
+ for (int j = 0; j < 100; ++j) {
+ try {
+ executorService.submit(() -> {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // Ignore, happens on executor shutdown.
+ }
+ }
+ }
+ });
+ }
+
+ Thread.sleep(20 * 1000);
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]