This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 73675d06712 clean up some thread pools in tests (#17421)
73675d06712 is described below

commit 73675d067128412b54877a9c26de71d467599b7b
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Oct 28 09:05:15 2024 -0700

    clean up some thread pools in tests (#17421)
---
 .../apache/druid/common/guava/GuavaUtilsTest.java  |  77 +++---
 .../java/util/emitter/core/HttpEmitterTest.java    |  36 +--
 .../core/HttpPostEmitterLoggerStressTest.java      |  96 ++++----
 .../emitter/core/HttpPostEmitterStressTest.java    | 273 +++++++++++----------
 .../util/emitter/core/HttpPostEmitterTest.java     |  32 +--
 .../util/metrics/BasicMonitorSchedulerTest.java    |   7 +
 .../epinephelinae/ConcurrentGrouperTest.java       | 210 ++++++++--------
 .../nested/NestedDataColumnSupplierTest.java       |  45 ++--
 .../nested/NestedDataColumnSupplierV4Test.java     |  45 ++--
 .../nested/ScalarDoubleColumnSupplierTest.java     |  45 ++--
 .../nested/ScalarLongColumnSupplierTest.java       |  45 ++--
 .../nested/ScalarStringColumnSupplierTest.java     |  45 ++--
 .../segment/nested/VariantColumnSupplierTest.java  |  43 ++--
 13 files changed, 535 insertions(+), 464 deletions(-)

diff --git a/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java b/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java
index 317cb350f97..8ec9584dcac 100644
--- a/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java
@@ -75,46 +75,53 @@ public class GuavaUtilsTest
     int tasks = 3;
     ExecutorService service = Execs.multiThreaded(tasks, "GuavaUtilsTest-%d");
     ListeningExecutorService exc = MoreExecutors.listeningDecorator(service);
-    //a flag what time to throw exception.
-    AtomicBoolean someoneFailed = new AtomicBoolean(false);
-    List<CountDownLatch> latches = new ArrayList<>(tasks);
-    Function<Integer, List<ListenableFuture<Object>>> function = (taskCount) -> {
-      List<ListenableFuture<Object>> futures = new ArrayList<>();
-      for (int i = 0; i < taskCount; i++) {
-        final CountDownLatch latch = new CountDownLatch(1);
-        latches.add(latch);
-        ListenableFuture<Object> future = exc.submit(new Callable<Object>() {
-          @Override
-          public Object call() throws RuntimeException, InterruptedException
+    try {
+      //a flag what time to throw exception.
+      AtomicBoolean someoneFailed = new AtomicBoolean(false);
+      List<CountDownLatch> latches = new ArrayList<>(tasks);
+      Function<Integer, List<ListenableFuture<Object>>> function = (taskCount) -> {
+        List<ListenableFuture<Object>> futures = new ArrayList<>();
+        for (int i = 0; i < taskCount; i++) {
+          final CountDownLatch latch = new CountDownLatch(1);
+          latches.add(latch);
+          ListenableFuture<Object> future = exc.submit(new Callable<Object>()
           {
-            latch.await(60, TimeUnit.SECONDS);
-            if (someoneFailed.compareAndSet(false, true)) {
-              throw new RuntimeException("This exception simulates an error");
+            @Override
+            public Object call() throws RuntimeException, InterruptedException
+            {
+              latch.await(60, TimeUnit.SECONDS);
+              if (someoneFailed.compareAndSet(false, true)) {
+                throw new RuntimeException("This exception simulates an error");
+              }
+              return null;
             }
-            return null;
-          }
-        });
-        futures.add(future);
-      }
-      return futures;
-    };
+          });
+          futures.add(future);
+        }
+        return futures;
+      };
 
-    List<ListenableFuture<Object>> futures = function.apply(tasks);
-    Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count());
-    // "release" the last tasks, which will cause it to fail as someoneFailed will still be false
-    latches.get(tasks - 1).countDown();
+      List<ListenableFuture<Object>> futures = function.apply(tasks);
+      Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count());
+      // "release" the last tasks, which will cause it to fail as someoneFailed will still be false
+      latches.get(tasks - 1).countDown();
 
-    ListenableFuture<List<Object>> future = Futures.allAsList(futures);
+      ListenableFuture<List<Object>> future = Futures.allAsList(futures);
 
-    ExecutionException thrown = Assert.assertThrows(
-        ExecutionException.class,
-        future::get
-    );
-    Assert.assertEquals("This exception simulates an error", thrown.getCause().getMessage());
-    GuavaUtils.cancelAll(true, future, futures);
-    Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count());
-    for (CountDownLatch latch : latches) {
-      latch.countDown();
+      ExecutionException thrown = Assert.assertThrows(
+          ExecutionException.class,
+          future::get
+      );
+      Assert.assertEquals("This exception simulates an error", thrown.getCause().getMessage());
+      GuavaUtils.cancelAll(true, future, futures);
+      Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count());
+      for (CountDownLatch latch : latches) {
+        latch.countDown();
+      }
+    }
+    finally {
+      exc.shutdownNow();
+      service.shutdownNow();
     }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
index b589455b4d8..384b3a0a8ea 100644
--- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
@@ -72,21 +72,27 @@ public class HttpEmitterTest
         .setHttpTimeoutAllowanceFactor(timeoutAllowanceFactor)
         .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
         .build();
-    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, 
OBJECT_MAPPER);
+    try (final HttpPostEmitter emitter = new HttpPostEmitter(config, 
httpClient, OBJECT_MAPPER)) {
+      long startMs = System.currentTimeMillis();
+      emitter.start();
+      emitter.emitAndReturnBatch(new IntEvent());
+      emitter.flush();
+      long fillTimeMs = System.currentTimeMillis() - startMs;
+      MatcherAssert.assertThat(
+          (double) timeoutUsed.get(),
+          Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5))
+      );
 
-    long startMs = System.currentTimeMillis();
-    emitter.start();
-    emitter.emitAndReturnBatch(new IntEvent());
-    emitter.flush();
-    long fillTimeMs = System.currentTimeMillis() - startMs;
-    MatcherAssert.assertThat((double) timeoutUsed.get(), 
Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
-
-    startMs = System.currentTimeMillis();
-    final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
-    Thread.sleep(1000);
-    batch.seal();
-    emitter.flush();
-    fillTimeMs = System.currentTimeMillis() - startMs;
-    MatcherAssert.assertThat((double) timeoutUsed.get(), 
Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
+      startMs = System.currentTimeMillis();
+      final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
+      Thread.sleep(1000);
+      batch.seal();
+      emitter.flush();
+      fillTimeMs = System.currentTimeMillis() - startMs;
+      MatcherAssert.assertThat(
+          (double) timeoutUsed.get(),
+          Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5))
+      );
+    }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java
index ad409467929..b4a6f5a25e3 100644
--- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java
@@ -52,61 +52,63 @@ public class HttpPostEmitterLoggerStressTest
         .setBatchQueueSizeLimit(10)
         .setMinHttpTimeoutMillis(100)
         .build();
-    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, 
new ObjectMapper());
+    try (HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new 
ObjectMapper())) {
 
-    emitter.start();
+      emitter.start();
 
-    httpClient.setGoHandler(new GoHandler() {
-      @Override
-      protected ListenableFuture<Response> go(Request request)
+      httpClient.setGoHandler(new GoHandler()
       {
-        return GoHandlers.immediateFuture(EmitterTest.okResponse());
+        @Override
+        protected ListenableFuture<Response> go(Request request)
+        {
+          return GoHandlers.immediateFuture(EmitterTest.okResponse());
+        }
+      });
+
+      Event smallEvent = ServiceMetricEvent.builder()
+                                           .setFeed("smallEvents")
+                                           .setDimension("test", "hi")
+                                           .setMetric("metric", 10)
+                                           .build("qwerty", "asdfgh");
+
+      for (int i = 0; i < 1000; i++) {
+        emitter.emit(smallEvent);
+
+        Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
+        Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
       }
-    });
 
-    Event smallEvent = ServiceMetricEvent.builder()
-                                       .setFeed("smallEvents")
-                                       .setDimension("test", "hi")
-                                       .setMetric("metric", 10)
-                                       .build("qwerty", "asdfgh");
+      // by the end of this test, there should be no outstanding failed buffers
 
-    for (int i = 0; i < 1000; i++) {
-      emitter.emit(smallEvent);
+      // with a flush time of 5s, min timeout of 100ms, 20s should be
+      // easily enough to get through all of the events
 
-      Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
-      Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
-    }
-
-    // by the end of this test, there should be no outstanding failed buffers
-
-    // with a flush time of 5s, min timeout of 100ms, 20s should be
-    // easily enough to get through all of the events
+      while (emitter.getTotalFailedBuffers() > 0) {
+        Thread.sleep(500);
+      }
 
-    while (emitter.getTotalFailedBuffers() > 0) {
-      Thread.sleep(500);
+      // there is also no reason to have too many log events
+      // refer to: https://github.com/apache/druid/issues/11279;
+
+      long countOfTimeouts = logCapture.getLogEvents().stream()
+                                       .filter(ev -> ev.getLevel() == 
Level.DEBUG)
+                                       .filter(ev -> ev.getThrown() instanceof 
TimeoutException)
+                                       .count();
+
+      // 1000 events limit, implies we should have no more than
+      // 1000 rejected send events within the expected 20sec
+      // duration of the test
+      long limitTimeoutEvents = 1000;
+
+      Assert.assertTrue(
+          String.format(
+              Locale.getDefault(),
+              "too many timeouts (%d), expect less than (%d)",
+              countOfTimeouts,
+              limitTimeoutEvents
+          ),
+          countOfTimeouts < limitTimeoutEvents
+      );
     }
-
-    // there is also no reason to have too many log events
-    // refer to: https://github.com/apache/druid/issues/11279;
-
-    long countOfTimeouts = logCapture.getLogEvents().stream()
-        .filter(ev -> ev.getLevel() == Level.DEBUG)
-        .filter(ev -> ev.getThrown() instanceof TimeoutException)
-        .count();
-
-    // 1000 events limit, implies we should have no more than
-    // 1000 rejected send events within the expected 20sec
-    // duration of the test
-    long limitTimeoutEvents = 1000;
-
-    Assert.assertTrue(
-        String.format(
-          Locale.getDefault(),
-          "too many timeouts (%d), expect less than (%d)",
-          countOfTimeouts,
-          limitTimeoutEvents),
-        countOfTimeouts < limitTimeoutEvents);
-
-    emitter.close();
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java
index 5935175d7de..2509db7a4b0 100644
--- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java
@@ -65,78 +65,80 @@ public class HttpPostEmitterStressTest
         // For this test, we don't need any batches to be dropped, i. e. 
"gaps" in data
         .setBatchQueueSizeLimit(1000)
         .build();
-    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, 
OBJECT_MAPPER);
-    int nThreads = Runtime.getRuntime().availableProcessors() * 2;
-    final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
-    final List<List<Batch>> eventBatchesPerThread = new ArrayList<>(nThreads);
-    for (int i = 0; i < nThreads; i++) {
-      eventsPerThread.add(new IntArrayList());
-      eventBatchesPerThread.add(new ArrayList<Batch>());
-    }
-    for (int i = 0; i < N; i++) {
-      
eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
-    }
-    final BitSet emittedEvents = new BitSet(N);
-    httpClient.setGoHandler(new GoHandler()
-    {
-      @Override
-      protected ListenableFuture<Response> go(Request request)
-      {
-        ByteBuffer batch = request.getByteBufferData().slice();
-        while (batch.remaining() > 0) {
-          emittedEvents.set(batch.getInt());
-        }
-        return GoHandlers.immediateFuture(EmitterTest.okResponse());
+    try (final HttpPostEmitter emitter = new HttpPostEmitter(config, 
httpClient, OBJECT_MAPPER)) {
+      int nThreads = Runtime.getRuntime().availableProcessors() * 2;
+      final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
+      final List<List<Batch>> eventBatchesPerThread = new 
ArrayList<>(nThreads);
+      for (int i = 0; i < nThreads; i++) {
+        eventsPerThread.add(new IntArrayList());
+        eventBatchesPerThread.add(new ArrayList<Batch>());
+      }
+      for (int i = 0; i < N; i++) {
+        
eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
       }
-    });
-    emitter.start();
-    final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
-    for (int i = 0; i < nThreads; i++) {
-      final int threadIndex = i;
-      new Thread() {
+      final BitSet emittedEvents = new BitSet(N);
+      httpClient.setGoHandler(new GoHandler()
+      {
         @Override
-        public void run()
+        protected ListenableFuture<Response> go(Request request)
         {
-          IntList events = eventsPerThread.get(threadIndex);
-          List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
-          IntEvent event = new IntEvent();
-          for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
-            event.index = events.getInt(i);
-            eventBatches.add(emitter.emitAndReturnBatch(event));
-            if (i % 16 == 0) {
-              try {
-                Thread.sleep(10);
-              }
-              catch (InterruptedException e) {
-                throw new RuntimeException(e);
+          ByteBuffer batch = request.getByteBufferData().slice();
+          while (batch.remaining() > 0) {
+            emittedEvents.set(batch.getInt());
+          }
+          return GoHandlers.immediateFuture(EmitterTest.okResponse());
+        }
+      });
+      emitter.start();
+      final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
+      for (int i = 0; i < nThreads; i++) {
+        final int threadIndex = i;
+        new Thread()
+        {
+          @Override
+          public void run()
+          {
+            IntList events = eventsPerThread.get(threadIndex);
+            List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
+            IntEvent event = new IntEvent();
+            for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
+              event.index = events.getInt(i);
+              eventBatches.add(emitter.emitAndReturnBatch(event));
+              if (i % 16 == 0) {
+                try {
+                  Thread.sleep(10);
+                }
+                catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
               }
             }
+            threadsCompleted.countDown();
           }
-          threadsCompleted.countDown();
-        }
-      }.start();
-    }
-    threadsCompleted.await();
-    emitter.flush();
-    System.out.println("Allocated buffers: " + 
emitter.getTotalAllocatedBuffers());
-    for (int eventIndex = 0; eventIndex < N; eventIndex++) {
-      if (!emittedEvents.get(eventIndex)) {
-        for (int threadIndex = 0; threadIndex < eventsPerThread.size(); 
threadIndex++) {
-          IntList threadEvents = eventsPerThread.get(threadIndex);
-          int indexOfEvent = threadEvents.indexOf(eventIndex);
-          if (indexOfEvent >= 0) {
-            Batch batch = 
eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
-            System.err.println(batch);
-            int bufferWatermark = batch.getSealedBufferWatermark();
-            ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
-            batchBuffer.limit(bufferWatermark);
-            while (batchBuffer.remaining() > 0) {
-              System.err.println(batchBuffer.getInt());
+        }.start();
+      }
+      threadsCompleted.await();
+      emitter.flush();
+      System.out.println("Allocated buffers: " + 
emitter.getTotalAllocatedBuffers());
+      for (int eventIndex = 0; eventIndex < N; eventIndex++) {
+        if (!emittedEvents.get(eventIndex)) {
+          for (int threadIndex = 0; threadIndex < eventsPerThread.size(); 
threadIndex++) {
+            IntList threadEvents = eventsPerThread.get(threadIndex);
+            int indexOfEvent = threadEvents.indexOf(eventIndex);
+            if (indexOfEvent >= 0) {
+              Batch batch = 
eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
+              System.err.println(batch);
+              int bufferWatermark = batch.getSealedBufferWatermark();
+              ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
+              batchBuffer.limit(bufferWatermark);
+              while (batchBuffer.remaining() > 0) {
+                System.err.println(batchBuffer.getInt());
+              }
+              break;
             }
-            break;
           }
+          throw new AssertionError("event " + eventIndex);
         }
-        throw new AssertionError("event " + eventIndex);
       }
     }
   }
@@ -151,34 +153,36 @@ public class HttpPostEmitterStressTest
         .setMaxBatchSize(1024 * 1024)
         .setBatchQueueSizeLimit(10)
         .build();
-    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, 
new ObjectMapper());
+    try (final HttpPostEmitter emitter = new HttpPostEmitter(config, 
httpClient, new ObjectMapper())) {
 
-    emitter.start();
+      emitter.start();
 
-    httpClient.setGoHandler(new GoHandler() {
-      @Override
-      protected ListenableFuture<Response> go(Request request)
+      httpClient.setGoHandler(new GoHandler()
       {
-        return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
-      }
-    });
+        @Override
+        protected ListenableFuture<Response> go(Request request)
+        {
+          return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
+        }
+      });
 
-    char[] chars = new char[600000];
-    Arrays.fill(chars, '*');
-    String bigString = new String(chars);
+      char[] chars = new char[600000];
+      Arrays.fill(chars, '*');
+      String bigString = new String(chars);
 
-    Event bigEvent = ServiceMetricEvent.builder()
-                                       .setFeed("bigEvents")
-                                       .setDimension("test", bigString)
-                                       .setMetric("metric", 10)
-                                       .build("qwerty", "asdfgh");
+      Event bigEvent = ServiceMetricEvent.builder()
+                                         .setFeed("bigEvents")
+                                         .setDimension("test", bigString)
+                                         .setMetric("metric", 10)
+                                         .build("qwerty", "asdfgh");
 
-    for (int i = 0; i < 1000; i++) {
-      emitter.emit(bigEvent);
-      Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11);
-    }
+      for (int i = 0; i < 1000; i++) {
+        emitter.emit(bigEvent);
+        Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11);
+      }
 
-    emitter.flush();
+      emitter.flush();
+    }
   }
 
   @Test
@@ -191,64 +195,67 @@ public class HttpPostEmitterStressTest
         .setMaxBatchSize(1024 * 1024)
         .setBatchQueueSizeLimit(10)
         .build();
-    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, 
new ObjectMapper());
+    try (final HttpPostEmitter emitter = new HttpPostEmitter(config, 
httpClient, new ObjectMapper())) {
+      emitter.start();
 
-    emitter.start();
-
-    httpClient.setGoHandler(new GoHandler() {
-      @Override
-      protected ListenableFuture<Response> go(Request request)
+      httpClient.setGoHandler(new GoHandler()
       {
-        return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
-      }
-    });
-
-    char[] chars = new char[600000];
-    Arrays.fill(chars, '*');
-    String bigString = new String(chars);
-
-    Event smallEvent = ServiceMetricEvent.builder()
-                                       .setFeed("smallEvents")
-                                       .setDimension("test", "hi")
-                                       .setMetric("metric", 10)
-                                       .build("qwerty", "asdfgh");
-
-    Event bigEvent = ServiceMetricEvent.builder()
-                                       .setFeed("bigEvents")
-                                       .setDimension("test", bigString)
-                                       .setMetric("metric", 10)
-                                       .build("qwerty", "asdfgh");
-
-    final CountDownLatch threadsCompleted = new CountDownLatch(2);
-    new Thread() {
-      @Override
-      public void run()
+        @Override
+        protected ListenableFuture<Response> go(Request request)
+        {
+          return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
+        }
+      });
+
+      char[] chars = new char[600000];
+      Arrays.fill(chars, '*');
+      String bigString = new String(chars);
+
+      Event smallEvent = ServiceMetricEvent.builder()
+                                           .setFeed("smallEvents")
+                                           .setDimension("test", "hi")
+                                           .setMetric("metric", 10)
+                                           .build("qwerty", "asdfgh");
+
+      Event bigEvent = ServiceMetricEvent.builder()
+                                         .setFeed("bigEvents")
+                                         .setDimension("test", bigString)
+                                         .setMetric("metric", 10)
+                                         .build("qwerty", "asdfgh");
+
+      final CountDownLatch threadsCompleted = new CountDownLatch(2);
+      new Thread()
       {
-        for (int i = 0; i < 1000; i++) {
+        @Override
+        public void run()
+        {
+          for (int i = 0; i < 1000; i++) {
 
-          emitter.emit(smallEvent);
+            emitter.emit(smallEvent);
 
-          Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
-          Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
+            Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
+            Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
+          }
+          threadsCompleted.countDown();
         }
-        threadsCompleted.countDown();
-      }
-    }.start();
-    new Thread() {
-      @Override
-      public void run()
+      }.start();
+      new Thread()
       {
-        for (int i = 0; i < 1000; i++) {
+        @Override
+        public void run()
+        {
+          for (int i = 0; i < 1000; i++) {
 
-          emitter.emit(bigEvent);
+            emitter.emit(bigEvent);
 
-          Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
-          Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
+            Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
+            Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
+          }
+          threadsCompleted.countDown();
         }
-        threadsCompleted.countDown();
-      }
-    }.start();
-    threadsCompleted.await();
-    emitter.flush();
+      }.start();
+      threadsCompleted.await();
+      emitter.flush();
+    }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
index fd3d312e91f..987de0b3c5e 100644
--- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
@@ -73,26 +73,26 @@ public class HttpPostEmitterTest
         .setMaxBatchSize(1024 * 1024)
         .setBatchQueueSizeLimit(1000)
         .build();
-    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, 
OBJECT_MAPPER);
-    emitter.start();
+    try (final HttpPostEmitter emitter = new HttpPostEmitter(config, 
httpClient, OBJECT_MAPPER)) {
+      emitter.start();
 
-    // emit first event
-    emitter.emitAndReturnBatch(new IntEvent());
-    Thread.sleep(1000L);
+      // emit first event
+      emitter.emitAndReturnBatch(new IntEvent());
+      Thread.sleep(1000L);
 
-    // get concurrentBatch reference and set value to lon as if it would fail 
while
-    // HttpPostEmitter#onSealExclusive method invocation.
-    Field concurrentBatch = 
emitter.getClass().getDeclaredField("concurrentBatch");
-    concurrentBatch.setAccessible(true);
-    ((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L);
-    // something terrible happened previously so that batch has to recover
+      // get concurrentBatch reference and set value to lon as if it would 
fail while
+      // HttpPostEmitter#onSealExclusive method invocation.
+      Field concurrentBatch = 
emitter.getClass().getDeclaredField("concurrentBatch");
+      concurrentBatch.setAccessible(true);
+      ((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L);
+      // something terrible happened previously so that batch has to recover
 
-    // emit second event
-    emitter.emitAndReturnBatch(new IntEvent());
+      // emit second event
+      emitter.emitAndReturnBatch(new IntEvent());
 
-    emitter.flush();
-    emitter.close();
+      emitter.flush();
 
-    Assert.assertEquals(2, emitter.getTotalEmittedEvents());
+      Assert.assertEquals(2, emitter.getTotalEmittedEvents());
+    }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
index ceac1e55644..6e6ca49cdd9 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.joda.time.Duration;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatchers;
@@ -50,6 +51,12 @@ public class BasicMonitorSchedulerTest
     exec = Execs.scheduledSingleThreaded("BasicMonitorSchedulerTest");
   }
 
+  @After
+  public void teardown()
+  {
+    exec.shutdownNow();
+  }
+
   @Test
   public void testStart_RepeatScheduling() throws InterruptedException
   {
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
index 6ab2f1a7bf0..4da2fbbb9a2 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -150,67 +151,72 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
         temporaryFolder.newFolder(),
         1024 * 1024
     );
+    final ListeningExecutorService service = 
MoreExecutors.listeningDecorator(exec);
+    try {
+      final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
+          bufferSupplier,
+          TEST_RESOURCE_HOLDER,
+          KEY_SERDE_FACTORY,
+          KEY_SERDE_FACTORY,
+          NULL_FACTORY,
+          new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
+          1024,
+          0.7f,
+          1,
+          temporaryStorage,
+          new DefaultObjectMapper(),
+          concurrencyHint,
+          null,
+          false,
+          service,
+          0,
+          false,
+          0,
+          4,
+          parallelCombineThreads,
+          mergeThreadLocal
+      );
+      closer.register(grouper);
+      grouper.init();
+
+      final int numRows = 1000;
+
+      Future<?>[] futures = new Future[concurrencyHint];
+
+      for (int i = 0; i < concurrencyHint; i++) {
+        futures[i] = exec.submit(() -> {
+          for (long j = 0; j < numRows; j++) {
+            if (!grouper.aggregate(new LongKey(j)).isOk()) {
+              throw new ISE("Grouper is full");
+            }
+          }
+        });
+      }
 
-    final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
-        bufferSupplier,
-        TEST_RESOURCE_HOLDER,
-        KEY_SERDE_FACTORY,
-        KEY_SERDE_FACTORY,
-        NULL_FACTORY,
-        new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
-        1024,
-        0.7f,
-        1,
-        temporaryStorage,
-        new DefaultObjectMapper(),
-        concurrencyHint,
-        null,
-        false,
-        MoreExecutors.listeningDecorator(exec),
-        0,
-        false,
-        0,
-        4,
-        parallelCombineThreads,
-        mergeThreadLocal
-    );
-    closer.register(grouper);
-    grouper.init();
+      for (Future eachFuture : futures) {
+        eachFuture.get();
+      }
 
-    final int numRows = 1000;
+      final List<Entry<LongKey>> expected = new ArrayList<>();
+      for (long i = 0; i < numRows; i++) {
+        expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long) 
concurrencyHint}));
+      }
 
-    Future<?>[] futures = new Future[concurrencyHint];
+      final CloseableIterator<Entry<LongKey>> iterator = 
closer.register(grouper.iterator(true));
 
-    for (int i = 0; i < concurrencyHint; i++) {
-      futures[i] = exec.submit(() -> {
-        for (long j = 0; j < numRows; j++) {
-          if (!grouper.aggregate(new LongKey(j)).isOk()) {
-            throw new ISE("Grouper is full");
-          }
-        }
-      });
-    }
-
-    for (Future eachFuture : futures) {
-      eachFuture.get();
-    }
+      if (parallelCombineThreads > 1 && (mergeThreadLocal || 
temporaryStorage.currentSize() > 0)) {
+        // Parallel combiner configured, and expected to actually be used due 
to thread-local merge (either explicitly
+        // configured, or due to spilling).
+        Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
+      } else {
+        Assert.assertFalse(TEST_RESOURCE_HOLDER.taken);
+      }
 
-    final List<Entry<LongKey>> expected = new ArrayList<>();
-    for (long i = 0; i < numRows; i++) {
-      expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long) 
concurrencyHint}));
+      GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator);
     }
-
-    final CloseableIterator<Entry<LongKey>> iterator = 
closer.register(grouper.iterator(true));
-
-    if (parallelCombineThreads > 1 && (mergeThreadLocal || 
temporaryStorage.currentSize() > 0)) {
-      // Parallel combiner configured, and expected to actually be used due to 
thread-local merge (either explicitly
-      // configured, or due to spilling).
-      Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
-    } else {
-      Assert.assertFalse(TEST_RESOURCE_HOLDER.taken);
+    finally {
+      service.shutdownNow();
     }
-
-    GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator);
   }
 
   @Test
@@ -221,56 +227,62 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
       return;
     }
 
-    final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
-        bufferSupplier,
-        TEST_RESOURCE_HOLDER,
-        KEY_SERDE_FACTORY,
-        KEY_SERDE_FACTORY,
-        NULL_FACTORY,
-        new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
-        1024,
-        0.7f,
-        1,
-        new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
-        new DefaultObjectMapper(),
-        concurrencyHint,
-        null,
-        false,
-        MoreExecutors.listeningDecorator(exec),
-        0,
-        true,
-        1,
-        4,
-        parallelCombineThreads,
-        mergeThreadLocal
-    );
-    closer.register(grouper);
-    grouper.init();
+    ListeningExecutorService service = MoreExecutors.listeningDecorator(exec);
+    try {
+      final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
+          bufferSupplier,
+          TEST_RESOURCE_HOLDER,
+          KEY_SERDE_FACTORY,
+          KEY_SERDE_FACTORY,
+          NULL_FACTORY,
+          new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
+          1024,
+          0.7f,
+          1,
+          new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
+          new DefaultObjectMapper(),
+          concurrencyHint,
+          null,
+          false,
+          service,
+          0,
+          true,
+          1,
+          4,
+          parallelCombineThreads,
+          mergeThreadLocal
+      );
+      closer.register(grouper);
+      grouper.init();
+
+      final int numRows = 1000;
+
+      Future<?>[] futures = new Future[concurrencyHint];
+
+      for (int i = 0; i < concurrencyHint; i++) {
+        futures[i] = exec.submit(() -> {
+          for (long j = 0; j < numRows; j++) {
+            if (!grouper.aggregate(new LongKey(j)).isOk()) {
+              throw new ISE("Grouper is full");
+            }
+          }
+        });
+      }
 
-    final int numRows = 1000;
+      for (Future eachFuture : futures) {
+        eachFuture.get();
+      }
 
-    Future<?>[] futures = new Future[concurrencyHint];
+      final QueryTimeoutException e = Assert.assertThrows(
+          QueryTimeoutException.class,
+          () -> closer.register(grouper.iterator(true))
+      );
 
-    for (int i = 0; i < concurrencyHint; i++) {
-      futures[i] = exec.submit(() -> {
-        for (long j = 0; j < numRows; j++) {
-          if (!grouper.aggregate(new LongKey(j)).isOk()) {
-            throw new ISE("Grouper is full");
-          }
-        }
-      });
+      Assert.assertEquals("Query timeout", e.getErrorCode());
     }
-
-    for (Future eachFuture : futures) {
-      eachFuture.get();
+    finally {
+      service.shutdownNow();
     }
-
-    final QueryTimeoutException e = Assert.assertThrows(
-        QueryTimeoutException.class,
-        () -> closer.register(grouper.iterator(true))
-    );
-
-    Assert.assertEquals("Query timeout", e.getErrorCode());
   }
 
   static class TestResourceHolder extends ReferenceCountingResourceHolder<ByteBuffer>
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
index 64ba2679f04..7cf94220100 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
@@ -299,30 +299,35 @@ public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
 
     final int threads = 10;
     ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
+        Execs.multiThreaded(threads, "NestedColumnSupplierTest-%d")
     );
-    Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
-    final CountDownLatch threadsStartLatch = new CountDownLatch(1);
-    for (int i = 0; i < threads; ++i) {
-      futures.add(
-          executorService.submit(() -> {
-            try {
-              threadsStartLatch.await();
-              for (int iter = 0; iter < 5000; iter++) {
-                try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
-                  smokeTest(column);
+    try {
+      Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+      final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+      for (int i = 0; i < threads; ++i) {
+        futures.add(
+            executorService.submit(() -> {
+              try {
+                threadsStartLatch.await();
+                for (int iter = 0; iter < 5000; iter++) {
+                  try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
+                    smokeTest(column);
+                  }
                 }
               }
-            }
-            catch (Throwable ex) {
-              failureReason.set(ex.getMessage());
-            }
-          })
-      );
+              catch (Throwable ex) {
+                failureReason.set(ex.getMessage());
+              }
+            })
+        );
+      }
+      threadsStartLatch.countDown();
+      Futures.allAsList(futures).get();
+      Assert.assertEquals(expectedReason, failureReason.get());
+    }
+    finally {
+      executorService.shutdownNow();
     }
-    threadsStartLatch.countDown();
-    Futures.allAsList(futures).get();
-    Assert.assertEquals(expectedReason, failureReason.get());
   }
 
   private void smokeTest(NestedDataComplexColumn column) throws IOException
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java
index aa42d58710d..315077b4160 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java
@@ -255,30 +255,35 @@ public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest
 
     final int threads = 10;
     ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d")
+        Execs.multiThreaded(threads, "NestedDataColumnSupplierV4Test-%d")
     );
-    Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
-    final CountDownLatch threadsStartLatch = new CountDownLatch(1);
-    for (int i = 0; i < threads; ++i) {
-      futures.add(
-          executorService.submit(() -> {
-            try {
-              threadsStartLatch.await();
-              for (int iter = 0; iter < 5000; iter++) {
-                try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
-                  smokeTest(column);
+    try {
+      Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+      final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+      for (int i = 0; i < threads; ++i) {
+        futures.add(
+            executorService.submit(() -> {
+              try {
+                threadsStartLatch.await();
+                for (int iter = 0; iter < 5000; iter++) {
+                  try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
+                    smokeTest(column);
+                  }
                 }
               }
-            }
-            catch (Throwable ex) {
-              failureReason.set(ex.getMessage());
-            }
-          })
-      );
+              catch (Throwable ex) {
+                failureReason.set(ex.getMessage());
+              }
+            })
+        );
+      }
+      threadsStartLatch.countDown();
+      Futures.allAsList(futures).get();
+      Assert.assertEquals(expectedReason, failureReason.get());
+    }
+    finally {
+      executorService.shutdownNow();
     }
-    threadsStartLatch.countDown();
-    Futures.allAsList(futures).get();
-    Assert.assertEquals(expectedReason, failureReason.get());
   }
 
   @Test
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
index f483e297be0..b095ad73aa6 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
@@ -219,30 +219,35 @@ public class ScalarDoubleColumnSupplierTest extends InitializedNullHandlingTest
 
     final int threads = 10;
     ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
+        Execs.multiThreaded(threads, "ScalarDoubleColumnSupplierTest-%d")
     );
-    Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
-    final CountDownLatch threadsStartLatch = new CountDownLatch(1);
-    for (int i = 0; i < threads; ++i) {
-      futures.add(
-          executorService.submit(() -> {
-            try {
-              threadsStartLatch.await();
-              for (int iter = 0; iter < 5000; iter++) {
-                try (ScalarDoubleColumn column = (ScalarDoubleColumn) supplier.get()) {
-                  smokeTest(supplier, column);
+    try {
+      Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+      final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+      for (int i = 0; i < threads; ++i) {
+        futures.add(
+            executorService.submit(() -> {
+              try {
+                threadsStartLatch.await();
+                for (int iter = 0; iter < 5000; iter++) {
+                  try (ScalarDoubleColumn column = (ScalarDoubleColumn) supplier.get()) {
+                    smokeTest(supplier, column);
+                  }
                 }
               }
-            }
-            catch (Throwable ex) {
-              failureReason.set(ex.getMessage());
-            }
-          })
-      );
+              catch (Throwable ex) {
+                failureReason.set(ex.getMessage());
+              }
+            })
+        );
+      }
+      threadsStartLatch.countDown();
+      Futures.allAsList(futures).get();
+      Assert.assertEquals(expectedReason, failureReason.get());
+    }
+    finally {
+      executorService.shutdownNow();
     }
-    threadsStartLatch.countDown();
-    Futures.allAsList(futures).get();
-    Assert.assertEquals(expectedReason, failureReason.get());
   }
 
   private void smokeTest(ScalarDoubleColumnAndIndexSupplier supplier, ScalarDoubleColumn column)
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
index c8830f3aefd..68b73a27bc2 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
@@ -219,30 +219,35 @@ public class ScalarLongColumnSupplierTest extends InitializedNullHandlingTest
 
     final int threads = 10;
     ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
+        Execs.multiThreaded(threads, "ScalarLongColumnSupplierTest-%d")
     );
-    Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
-    final CountDownLatch threadsStartLatch = new CountDownLatch(1);
-    for (int i = 0; i < threads; ++i) {
-      futures.add(
-          executorService.submit(() -> {
-            try {
-              threadsStartLatch.await();
-              for (int iter = 0; iter < 5000; iter++) {
-                try (ScalarLongColumn column = (ScalarLongColumn) supplier.get()) {
-                  smokeTest(supplier, column);
+    try {
+      Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+      final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+      for (int i = 0; i < threads; ++i) {
+        futures.add(
+            executorService.submit(() -> {
+              try {
+                threadsStartLatch.await();
+                for (int iter = 0; iter < 5000; iter++) {
+                  try (ScalarLongColumn column = (ScalarLongColumn) supplier.get()) {
+                    smokeTest(supplier, column);
+                  }
                 }
               }
-            }
-            catch (Throwable ex) {
-              failureReason.set(ex.getMessage());
-            }
-          })
-      );
+              catch (Throwable ex) {
+                failureReason.set(ex.getMessage());
+              }
+            })
+        );
+      }
+      threadsStartLatch.countDown();
+      Futures.allAsList(futures).get();
+      Assert.assertEquals(expectedReason, failureReason.get());
+    }
+    finally {
+      executorService.shutdownNow();
     }
-    threadsStartLatch.countDown();
-    Futures.allAsList(futures).get();
-    Assert.assertEquals(expectedReason, failureReason.get());
   }
 
   private void smokeTest(ScalarLongColumnAndIndexSupplier supplier, ScalarLongColumn column)
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
index d72970b3b12..b8684c9d370 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
@@ -217,30 +217,35 @@ public class ScalarStringColumnSupplierTest extends InitializedNullHandlingTest
 
     final int threads = 10;
     ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
+        Execs.multiThreaded(threads, "ScalarStringColumnSupplierTest-%d")
     );
-    Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
-    final CountDownLatch threadsStartLatch = new CountDownLatch(1);
-    for (int i = 0; i < threads; ++i) {
-      futures.add(
-          executorService.submit(() -> {
-            try {
-              threadsStartLatch.await();
-              for (int iter = 0; iter < 5000; iter++) {
-                try (StringUtf8DictionaryEncodedColumn column = (StringUtf8DictionaryEncodedColumn) supplier.get()) {
-                  smokeTest(supplier, column);
+    try {
+      Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+      final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+      for (int i = 0; i < threads; ++i) {
+        futures.add(
+            executorService.submit(() -> {
+              try {
+                threadsStartLatch.await();
+                for (int iter = 0; iter < 5000; iter++) {
+                  try (StringUtf8DictionaryEncodedColumn column = (StringUtf8DictionaryEncodedColumn) supplier.get()) {
+                    smokeTest(supplier, column);
+                  }
                 }
               }
-            }
-            catch (Throwable ex) {
-              failureReason.set(ex.getMessage());
-            }
-          })
-      );
+              catch (Throwable ex) {
+                failureReason.set(ex.getMessage());
+              }
+            })
+        );
+      }
+      threadsStartLatch.countDown();
+      Futures.allAsList(futures).get();
+      Assert.assertEquals(expectedReason, failureReason.get());
+    }
+    finally {
+      executorService.shutdownNow();
     }
-    threadsStartLatch.countDown();
-    Futures.allAsList(futures).get();
-    Assert.assertEquals(expectedReason, failureReason.get());
   }
 
   private void smokeTest(ScalarStringColumnAndIndexSupplier supplier, StringUtf8DictionaryEncodedColumn column)
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 0598552d519..14ce4652104 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -346,28 +346,33 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
     ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
     );
-    Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
-    final CountDownLatch threadsStartLatch = new CountDownLatch(1);
-    for (int i = 0; i < threads; ++i) {
-      futures.add(
-          executorService.submit(() -> {
-            try {
-              threadsStartLatch.await();
-              for (int iter = 0; iter < 5000; iter++) {
-                try (VariantColumn column = (VariantColumn) supplier.get()) {
-                  smokeTest(supplier, column, data, expectedTypes);
+    try {
+      Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+      final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+      for (int i = 0; i < threads; ++i) {
+        futures.add(
+            executorService.submit(() -> {
+              try {
+                threadsStartLatch.await();
+                for (int iter = 0; iter < 5000; iter++) {
+                  try (VariantColumn column = (VariantColumn) supplier.get()) {
+                    smokeTest(supplier, column, data, expectedTypes);
+                  }
                 }
               }
-            }
-            catch (Throwable ex) {
-              failureReason.set(ex.getMessage());
-            }
-          })
-      );
+              catch (Throwable ex) {
+                failureReason.set(ex.getMessage());
+              }
+            })
+        );
+      }
+      threadsStartLatch.countDown();
+      Futures.allAsList(futures).get();
+      Assert.assertEquals(expectedReason, failureReason.get());
+    }
+    finally {
+      executorService.shutdownNow();
     }
-    threadsStartLatch.countDown();
-    Futures.allAsList(futures).get();
-    Assert.assertEquals(expectedReason, failureReason.get());
   }
 
   private void smokeTest(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to