This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 73675d06712 clean up some thread pools in tests (#17421)
73675d06712 is described below
commit 73675d067128412b54877a9c26de71d467599b7b
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Oct 28 09:05:15 2024 -0700
clean up some thread pools in tests (#17421)
---
.../apache/druid/common/guava/GuavaUtilsTest.java | 77 +++---
.../java/util/emitter/core/HttpEmitterTest.java | 36 +--
.../core/HttpPostEmitterLoggerStressTest.java | 96 ++++----
.../emitter/core/HttpPostEmitterStressTest.java | 273 +++++++++++----------
.../util/emitter/core/HttpPostEmitterTest.java | 32 +--
.../util/metrics/BasicMonitorSchedulerTest.java | 7 +
.../epinephelinae/ConcurrentGrouperTest.java | 210 ++++++++--------
.../nested/NestedDataColumnSupplierTest.java | 45 ++--
.../nested/NestedDataColumnSupplierV4Test.java | 45 ++--
.../nested/ScalarDoubleColumnSupplierTest.java | 45 ++--
.../nested/ScalarLongColumnSupplierTest.java | 45 ++--
.../nested/ScalarStringColumnSupplierTest.java | 45 ++--
.../segment/nested/VariantColumnSupplierTest.java | 43 ++--
13 files changed, 535 insertions(+), 464 deletions(-)
diff --git a/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java b/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java
index 317cb350f97..8ec9584dcac 100644
--- a/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java
@@ -75,46 +75,53 @@ public class GuavaUtilsTest
int tasks = 3;
ExecutorService service = Execs.multiThreaded(tasks, "GuavaUtilsTest-%d");
ListeningExecutorService exc = MoreExecutors.listeningDecorator(service);
- //a flag what time to throw exception.
- AtomicBoolean someoneFailed = new AtomicBoolean(false);
- List<CountDownLatch> latches = new ArrayList<>(tasks);
- Function<Integer, List<ListenableFuture<Object>>> function = (taskCount) -> {
- List<ListenableFuture<Object>> futures = new ArrayList<>();
- for (int i = 0; i < taskCount; i++) {
- final CountDownLatch latch = new CountDownLatch(1);
- latches.add(latch);
- ListenableFuture<Object> future = exc.submit(new Callable<Object>() {
- @Override
- public Object call() throws RuntimeException, InterruptedException
+ try {
+ //a flag what time to throw exception.
+ AtomicBoolean someoneFailed = new AtomicBoolean(false);
+ List<CountDownLatch> latches = new ArrayList<>(tasks);
+ Function<Integer, List<ListenableFuture<Object>>> function = (taskCount) -> {
+ List<ListenableFuture<Object>> futures = new ArrayList<>();
+ for (int i = 0; i < taskCount; i++) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ latches.add(latch);
+ ListenableFuture<Object> future = exc.submit(new Callable<Object>()
{
- latch.await(60, TimeUnit.SECONDS);
- if (someoneFailed.compareAndSet(false, true)) {
- throw new RuntimeException("This exception simulates an error");
+ @Override
+ public Object call() throws RuntimeException, InterruptedException
+ {
+ latch.await(60, TimeUnit.SECONDS);
+ if (someoneFailed.compareAndSet(false, true)) {
+ throw new RuntimeException("This exception simulates an error");
+ }
+ return null;
}
- return null;
- }
- });
- futures.add(future);
- }
- return futures;
- };
+ });
+ futures.add(future);
+ }
+ return futures;
+ };
- List<ListenableFuture<Object>> futures = function.apply(tasks);
- Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count());
- // "release" the last tasks, which will cause it to fail as someoneFailed will still be false
- latches.get(tasks - 1).countDown();
+ List<ListenableFuture<Object>> futures = function.apply(tasks);
+ Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count());
+ // "release" the last tasks, which will cause it to fail as someoneFailed will still be false
+ latches.get(tasks - 1).countDown();
- ListenableFuture<List<Object>> future = Futures.allAsList(futures);
+ ListenableFuture<List<Object>> future = Futures.allAsList(futures);
- ExecutionException thrown = Assert.assertThrows(
- ExecutionException.class,
- future::get
- );
- Assert.assertEquals("This exception simulates an error", thrown.getCause().getMessage());
- GuavaUtils.cancelAll(true, future, futures);
- Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count());
- for (CountDownLatch latch : latches) {
- latch.countDown();
+ ExecutionException thrown = Assert.assertThrows(
+ ExecutionException.class,
+ future::get
+ );
+ Assert.assertEquals("This exception simulates an error", thrown.getCause().getMessage());
+ GuavaUtils.cancelAll(true, future, futures);
+ Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count());
+ for (CountDownLatch latch : latches) {
+ latch.countDown();
+ }
+ }
+ finally {
+ exc.shutdownNow();
+ service.shutdownNow();
}
}
}
diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
index b589455b4d8..384b3a0a8ea 100644
--- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
@@ -72,21 +72,27 @@ public class HttpEmitterTest
.setHttpTimeoutAllowanceFactor(timeoutAllowanceFactor)
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
.build();
- final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient,
OBJECT_MAPPER);
+ try (final HttpPostEmitter emitter = new HttpPostEmitter(config,
httpClient, OBJECT_MAPPER)) {
+ long startMs = System.currentTimeMillis();
+ emitter.start();
+ emitter.emitAndReturnBatch(new IntEvent());
+ emitter.flush();
+ long fillTimeMs = System.currentTimeMillis() - startMs;
+ MatcherAssert.assertThat(
+ (double) timeoutUsed.get(),
+ Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5))
+ );
- long startMs = System.currentTimeMillis();
- emitter.start();
- emitter.emitAndReturnBatch(new IntEvent());
- emitter.flush();
- long fillTimeMs = System.currentTimeMillis() - startMs;
- MatcherAssert.assertThat((double) timeoutUsed.get(),
Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
-
- startMs = System.currentTimeMillis();
- final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
- Thread.sleep(1000);
- batch.seal();
- emitter.flush();
- fillTimeMs = System.currentTimeMillis() - startMs;
- MatcherAssert.assertThat((double) timeoutUsed.get(),
Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
+ startMs = System.currentTimeMillis();
+ final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
+ Thread.sleep(1000);
+ batch.seal();
+ emitter.flush();
+ fillTimeMs = System.currentTimeMillis() - startMs;
+ MatcherAssert.assertThat(
+ (double) timeoutUsed.get(),
+ Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5))
+ );
+ }
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java
index ad409467929..b4a6f5a25e3 100644
---
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java
@@ -52,61 +52,63 @@ public class HttpPostEmitterLoggerStressTest
.setBatchQueueSizeLimit(10)
.setMinHttpTimeoutMillis(100)
.build();
- final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient,
new ObjectMapper());
+ try (HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new
ObjectMapper())) {
- emitter.start();
+ emitter.start();
- httpClient.setGoHandler(new GoHandler() {
- @Override
- protected ListenableFuture<Response> go(Request request)
+ httpClient.setGoHandler(new GoHandler()
{
- return GoHandlers.immediateFuture(EmitterTest.okResponse());
+ @Override
+ protected ListenableFuture<Response> go(Request request)
+ {
+ return GoHandlers.immediateFuture(EmitterTest.okResponse());
+ }
+ });
+
+ Event smallEvent = ServiceMetricEvent.builder()
+ .setFeed("smallEvents")
+ .setDimension("test", "hi")
+ .setMetric("metric", 10)
+ .build("qwerty", "asdfgh");
+
+ for (int i = 0; i < 1000; i++) {
+ emitter.emit(smallEvent);
+
+ Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
+ Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
- });
- Event smallEvent = ServiceMetricEvent.builder()
- .setFeed("smallEvents")
- .setDimension("test", "hi")
- .setMetric("metric", 10)
- .build("qwerty", "asdfgh");
+ // by the end of this test, there should be no outstanding failed buffers
- for (int i = 0; i < 1000; i++) {
- emitter.emit(smallEvent);
+ // with a flush time of 5s, min timeout of 100ms, 20s should be
+ // easily enough to get through all of the events
- Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
- Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
- }
-
- // by the end of this test, there should be no outstanding failed buffers
-
- // with a flush time of 5s, min timeout of 100ms, 20s should be
- // easily enough to get through all of the events
+ while (emitter.getTotalFailedBuffers() > 0) {
+ Thread.sleep(500);
+ }
- while (emitter.getTotalFailedBuffers() > 0) {
- Thread.sleep(500);
+ // there is also no reason to have too many log events
+ // refer to: https://github.com/apache/druid/issues/11279;
+
+ long countOfTimeouts = logCapture.getLogEvents().stream()
+ .filter(ev -> ev.getLevel() ==
Level.DEBUG)
+ .filter(ev -> ev.getThrown() instanceof
TimeoutException)
+ .count();
+
+ // 1000 events limit, implies we should have no more than
+ // 1000 rejected send events within the expected 20sec
+ // duration of the test
+ long limitTimeoutEvents = 1000;
+
+ Assert.assertTrue(
+ String.format(
+ Locale.getDefault(),
+ "too many timeouts (%d), expect less than (%d)",
+ countOfTimeouts,
+ limitTimeoutEvents
+ ),
+ countOfTimeouts < limitTimeoutEvents
+ );
}
-
- // there is also no reason to have too many log events
- // refer to: https://github.com/apache/druid/issues/11279;
-
- long countOfTimeouts = logCapture.getLogEvents().stream()
- .filter(ev -> ev.getLevel() == Level.DEBUG)
- .filter(ev -> ev.getThrown() instanceof TimeoutException)
- .count();
-
- // 1000 events limit, implies we should have no more than
- // 1000 rejected send events within the expected 20sec
- // duration of the test
- long limitTimeoutEvents = 1000;
-
- Assert.assertTrue(
- String.format(
- Locale.getDefault(),
- "too many timeouts (%d), expect less than (%d)",
- countOfTimeouts,
- limitTimeoutEvents),
- countOfTimeouts < limitTimeoutEvents);
-
- emitter.close();
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java
index 5935175d7de..2509db7a4b0 100644
---
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java
@@ -65,78 +65,80 @@ public class HttpPostEmitterStressTest
// For this test, we don't need any batches to be dropped, i. e. "gaps" in data
.setBatchQueueSizeLimit(1000)
.build();
- final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient,
OBJECT_MAPPER);
- int nThreads = Runtime.getRuntime().availableProcessors() * 2;
- final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
- final List<List<Batch>> eventBatchesPerThread = new ArrayList<>(nThreads);
- for (int i = 0; i < nThreads; i++) {
- eventsPerThread.add(new IntArrayList());
- eventBatchesPerThread.add(new ArrayList<Batch>());
- }
- for (int i = 0; i < N; i++) {
- eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
- }
- final BitSet emittedEvents = new BitSet(N);
- httpClient.setGoHandler(new GoHandler()
- {
- @Override
- protected ListenableFuture<Response> go(Request request)
- {
- ByteBuffer batch = request.getByteBufferData().slice();
- while (batch.remaining() > 0) {
- emittedEvents.set(batch.getInt());
- }
- return GoHandlers.immediateFuture(EmitterTest.okResponse());
+ try (final HttpPostEmitter emitter = new HttpPostEmitter(config,
httpClient, OBJECT_MAPPER)) {
+ int nThreads = Runtime.getRuntime().availableProcessors() * 2;
+ final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
+ final List<List<Batch>> eventBatchesPerThread = new
ArrayList<>(nThreads);
+ for (int i = 0; i < nThreads; i++) {
+ eventsPerThread.add(new IntArrayList());
+ eventBatchesPerThread.add(new ArrayList<Batch>());
+ }
+ for (int i = 0; i < N; i++) {
+ eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
}
- });
- emitter.start();
- final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
- for (int i = 0; i < nThreads; i++) {
- final int threadIndex = i;
- new Thread() {
+ final BitSet emittedEvents = new BitSet(N);
+ httpClient.setGoHandler(new GoHandler()
+ {
@Override
- public void run()
+ protected ListenableFuture<Response> go(Request request)
{
- IntList events = eventsPerThread.get(threadIndex);
- List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
- IntEvent event = new IntEvent();
- for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
- event.index = events.getInt(i);
- eventBatches.add(emitter.emitAndReturnBatch(event));
- if (i % 16 == 0) {
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
+ ByteBuffer batch = request.getByteBufferData().slice();
+ while (batch.remaining() > 0) {
+ emittedEvents.set(batch.getInt());
+ }
+ return GoHandlers.immediateFuture(EmitterTest.okResponse());
+ }
+ });
+ emitter.start();
+ final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
+ for (int i = 0; i < nThreads; i++) {
+ final int threadIndex = i;
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+ IntList events = eventsPerThread.get(threadIndex);
+ List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
+ IntEvent event = new IntEvent();
+ for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
+ event.index = events.getInt(i);
+ eventBatches.add(emitter.emitAndReturnBatch(event));
+ if (i % 16 == 0) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
}
+ threadsCompleted.countDown();
}
- threadsCompleted.countDown();
- }
- }.start();
- }
- threadsCompleted.await();
- emitter.flush();
- System.out.println("Allocated buffers: " +
emitter.getTotalAllocatedBuffers());
- for (int eventIndex = 0; eventIndex < N; eventIndex++) {
- if (!emittedEvents.get(eventIndex)) {
- for (int threadIndex = 0; threadIndex < eventsPerThread.size();
threadIndex++) {
- IntList threadEvents = eventsPerThread.get(threadIndex);
- int indexOfEvent = threadEvents.indexOf(eventIndex);
- if (indexOfEvent >= 0) {
- Batch batch =
eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
- System.err.println(batch);
- int bufferWatermark = batch.getSealedBufferWatermark();
- ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
- batchBuffer.limit(bufferWatermark);
- while (batchBuffer.remaining() > 0) {
- System.err.println(batchBuffer.getInt());
+ }.start();
+ }
+ threadsCompleted.await();
+ emitter.flush();
+ System.out.println("Allocated buffers: " +
emitter.getTotalAllocatedBuffers());
+ for (int eventIndex = 0; eventIndex < N; eventIndex++) {
+ if (!emittedEvents.get(eventIndex)) {
+ for (int threadIndex = 0; threadIndex < eventsPerThread.size();
threadIndex++) {
+ IntList threadEvents = eventsPerThread.get(threadIndex);
+ int indexOfEvent = threadEvents.indexOf(eventIndex);
+ if (indexOfEvent >= 0) {
+ Batch batch =
eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
+ System.err.println(batch);
+ int bufferWatermark = batch.getSealedBufferWatermark();
+ ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
+ batchBuffer.limit(bufferWatermark);
+ while (batchBuffer.remaining() > 0) {
+ System.err.println(batchBuffer.getInt());
+ }
+ break;
}
- break;
}
+ throw new AssertionError("event " + eventIndex);
}
- throw new AssertionError("event " + eventIndex);
}
}
}
@@ -151,34 +153,36 @@ public class HttpPostEmitterStressTest
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
- final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient,
new ObjectMapper());
+ try (final HttpPostEmitter emitter = new HttpPostEmitter(config,
httpClient, new ObjectMapper())) {
- emitter.start();
+ emitter.start();
- httpClient.setGoHandler(new GoHandler() {
- @Override
- protected ListenableFuture<Response> go(Request request)
+ httpClient.setGoHandler(new GoHandler()
{
- return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
- }
- });
+ @Override
+ protected ListenableFuture<Response> go(Request request)
+ {
+ return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
+ }
+ });
- char[] chars = new char[600000];
- Arrays.fill(chars, '*');
- String bigString = new String(chars);
+ char[] chars = new char[600000];
+ Arrays.fill(chars, '*');
+ String bigString = new String(chars);
- Event bigEvent = ServiceMetricEvent.builder()
- .setFeed("bigEvents")
- .setDimension("test", bigString)
- .setMetric("metric", 10)
- .build("qwerty", "asdfgh");
+ Event bigEvent = ServiceMetricEvent.builder()
+ .setFeed("bigEvents")
+ .setDimension("test", bigString)
+ .setMetric("metric", 10)
+ .build("qwerty", "asdfgh");
- for (int i = 0; i < 1000; i++) {
- emitter.emit(bigEvent);
- Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11);
- }
+ for (int i = 0; i < 1000; i++) {
+ emitter.emit(bigEvent);
+ Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11);
+ }
- emitter.flush();
+ emitter.flush();
+ }
}
@Test
@@ -191,64 +195,67 @@ public class HttpPostEmitterStressTest
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
- final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient,
new ObjectMapper());
+ try (final HttpPostEmitter emitter = new HttpPostEmitter(config,
httpClient, new ObjectMapper())) {
+ emitter.start();
- emitter.start();
-
- httpClient.setGoHandler(new GoHandler() {
- @Override
- protected ListenableFuture<Response> go(Request request)
+ httpClient.setGoHandler(new GoHandler()
{
- return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
- }
- });
-
- char[] chars = new char[600000];
- Arrays.fill(chars, '*');
- String bigString = new String(chars);
-
- Event smallEvent = ServiceMetricEvent.builder()
- .setFeed("smallEvents")
- .setDimension("test", "hi")
- .setMetric("metric", 10)
- .build("qwerty", "asdfgh");
-
- Event bigEvent = ServiceMetricEvent.builder()
- .setFeed("bigEvents")
- .setDimension("test", bigString)
- .setMetric("metric", 10)
- .build("qwerty", "asdfgh");
-
- final CountDownLatch threadsCompleted = new CountDownLatch(2);
- new Thread() {
- @Override
- public void run()
+ @Override
+ protected ListenableFuture<Response> go(Request request)
+ {
+ return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
+ }
+ });
+
+ char[] chars = new char[600000];
+ Arrays.fill(chars, '*');
+ String bigString = new String(chars);
+
+ Event smallEvent = ServiceMetricEvent.builder()
+ .setFeed("smallEvents")
+ .setDimension("test", "hi")
+ .setMetric("metric", 10)
+ .build("qwerty", "asdfgh");
+
+ Event bigEvent = ServiceMetricEvent.builder()
+ .setFeed("bigEvents")
+ .setDimension("test", bigString)
+ .setMetric("metric", 10)
+ .build("qwerty", "asdfgh");
+
+ final CountDownLatch threadsCompleted = new CountDownLatch(2);
+ new Thread()
{
- for (int i = 0; i < 1000; i++) {
+ @Override
+ public void run()
+ {
+ for (int i = 0; i < 1000; i++) {
- emitter.emit(smallEvent);
+ emitter.emit(smallEvent);
- Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
- Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
+ Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
+ Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
+ }
+ threadsCompleted.countDown();
}
- threadsCompleted.countDown();
- }
- }.start();
- new Thread() {
- @Override
- public void run()
+ }.start();
+ new Thread()
{
- for (int i = 0; i < 1000; i++) {
+ @Override
+ public void run()
+ {
+ for (int i = 0; i < 1000; i++) {
- emitter.emit(bigEvent);
+ emitter.emit(bigEvent);
- Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
- Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
+ Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
+ Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
+ }
+ threadsCompleted.countDown();
}
- threadsCompleted.countDown();
- }
- }.start();
- threadsCompleted.await();
- emitter.flush();
+ }.start();
+ threadsCompleted.await();
+ emitter.flush();
+ }
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
index fd3d312e91f..987de0b3c5e 100644
---
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
@@ -73,26 +73,26 @@ public class HttpPostEmitterTest
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(1000)
.build();
- final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient,
OBJECT_MAPPER);
- emitter.start();
+ try (final HttpPostEmitter emitter = new HttpPostEmitter(config,
httpClient, OBJECT_MAPPER)) {
+ emitter.start();
- // emit first event
- emitter.emitAndReturnBatch(new IntEvent());
- Thread.sleep(1000L);
+ // emit first event
+ emitter.emitAndReturnBatch(new IntEvent());
+ Thread.sleep(1000L);
- // get concurrentBatch reference and set value to lon as if it would fail while
- // HttpPostEmitter#onSealExclusive method invocation.
- Field concurrentBatch =
emitter.getClass().getDeclaredField("concurrentBatch");
- concurrentBatch.setAccessible(true);
- ((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L);
- // something terrible happened previously so that batch has to recover
+ // get concurrentBatch reference and set value to lon as if it would fail while
+ // HttpPostEmitter#onSealExclusive method invocation.
+ Field concurrentBatch =
emitter.getClass().getDeclaredField("concurrentBatch");
+ concurrentBatch.setAccessible(true);
+ ((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L);
+ // something terrible happened previously so that batch has to recover
- // emit second event
- emitter.emitAndReturnBatch(new IntEvent());
+ // emit second event
+ emitter.emitAndReturnBatch(new IntEvent());
- emitter.flush();
- emitter.close();
+ emitter.flush();
- Assert.assertEquals(2, emitter.getTotalEmittedEvents());
+ Assert.assertEquals(2, emitter.getTotalEmittedEvents());
+ }
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
index ceac1e55644..6e6ca49cdd9 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.joda.time.Duration;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
@@ -50,6 +51,12 @@ public class BasicMonitorSchedulerTest
exec = Execs.scheduledSingleThreaded("BasicMonitorSchedulerTest");
}
+ @After
+ public void teardown()
+ {
+ exec.shutdownNow();
+ }
+
@Test
public void testStart_RepeatScheduling() throws InterruptedException
{
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
index 6ab2f1a7bf0..4da2fbbb9a2 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -150,67 +151,72 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
temporaryFolder.newFolder(),
1024 * 1024
);
+ final ListeningExecutorService service =
MoreExecutors.listeningDecorator(exec);
+ try {
+ final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
+ bufferSupplier,
+ TEST_RESOURCE_HOLDER,
+ KEY_SERDE_FACTORY,
+ KEY_SERDE_FACTORY,
+ NULL_FACTORY,
+ new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
+ 1024,
+ 0.7f,
+ 1,
+ temporaryStorage,
+ new DefaultObjectMapper(),
+ concurrencyHint,
+ null,
+ false,
+ service,
+ 0,
+ false,
+ 0,
+ 4,
+ parallelCombineThreads,
+ mergeThreadLocal
+ );
+ closer.register(grouper);
+ grouper.init();
+
+ final int numRows = 1000;
+
+ Future<?>[] futures = new Future[concurrencyHint];
+
+ for (int i = 0; i < concurrencyHint; i++) {
+ futures[i] = exec.submit(() -> {
+ for (long j = 0; j < numRows; j++) {
+ if (!grouper.aggregate(new LongKey(j)).isOk()) {
+ throw new ISE("Grouper is full");
+ }
+ }
+ });
+ }
- final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
- bufferSupplier,
- TEST_RESOURCE_HOLDER,
- KEY_SERDE_FACTORY,
- KEY_SERDE_FACTORY,
- NULL_FACTORY,
- new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
- 1024,
- 0.7f,
- 1,
- temporaryStorage,
- new DefaultObjectMapper(),
- concurrencyHint,
- null,
- false,
- MoreExecutors.listeningDecorator(exec),
- 0,
- false,
- 0,
- 4,
- parallelCombineThreads,
- mergeThreadLocal
- );
- closer.register(grouper);
- grouper.init();
+ for (Future eachFuture : futures) {
+ eachFuture.get();
+ }
- final int numRows = 1000;
+ final List<Entry<LongKey>> expected = new ArrayList<>();
+ for (long i = 0; i < numRows; i++) {
+ expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long)
concurrencyHint}));
+ }
- Future<?>[] futures = new Future[concurrencyHint];
+ final CloseableIterator<Entry<LongKey>> iterator =
closer.register(grouper.iterator(true));
- for (int i = 0; i < concurrencyHint; i++) {
- futures[i] = exec.submit(() -> {
- for (long j = 0; j < numRows; j++) {
- if (!grouper.aggregate(new LongKey(j)).isOk()) {
- throw new ISE("Grouper is full");
- }
- }
- });
- }
-
- for (Future eachFuture : futures) {
- eachFuture.get();
- }
+ if (parallelCombineThreads > 1 && (mergeThreadLocal ||
temporaryStorage.currentSize() > 0)) {
+ // Parallel combiner configured, and expected to actually be used due
to thread-local merge (either explicitly
+ // configured, or due to spilling).
+ Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
+ } else {
+ Assert.assertFalse(TEST_RESOURCE_HOLDER.taken);
+ }
- final List<Entry<LongKey>> expected = new ArrayList<>();
- for (long i = 0; i < numRows; i++) {
- expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long)
concurrencyHint}));
+ GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator);
}
-
- final CloseableIterator<Entry<LongKey>> iterator =
closer.register(grouper.iterator(true));
-
- if (parallelCombineThreads > 1 && (mergeThreadLocal ||
temporaryStorage.currentSize() > 0)) {
- // Parallel combiner configured, and expected to actually be used due to
thread-local merge (either explicitly
- // configured, or due to spilling).
- Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
- } else {
- Assert.assertFalse(TEST_RESOURCE_HOLDER.taken);
+ finally {
+ service.shutdownNow();
}
-
- GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator);
}
@Test
@@ -221,56 +227,62 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
return;
}
- final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
- bufferSupplier,
- TEST_RESOURCE_HOLDER,
- KEY_SERDE_FACTORY,
- KEY_SERDE_FACTORY,
- NULL_FACTORY,
- new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
- 1024,
- 0.7f,
- 1,
- new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
- new DefaultObjectMapper(),
- concurrencyHint,
- null,
- false,
- MoreExecutors.listeningDecorator(exec),
- 0,
- true,
- 1,
- 4,
- parallelCombineThreads,
- mergeThreadLocal
- );
- closer.register(grouper);
- grouper.init();
+ ListeningExecutorService service = MoreExecutors.listeningDecorator(exec);
+ try {
+ final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
+ bufferSupplier,
+ TEST_RESOURCE_HOLDER,
+ KEY_SERDE_FACTORY,
+ KEY_SERDE_FACTORY,
+ NULL_FACTORY,
+ new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
+ 1024,
+ 0.7f,
+ 1,
+ new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 *
1024),
+ new DefaultObjectMapper(),
+ concurrencyHint,
+ null,
+ false,
+ service,
+ 0,
+ true,
+ 1,
+ 4,
+ parallelCombineThreads,
+ mergeThreadLocal
+ );
+ closer.register(grouper);
+ grouper.init();
+
+ final int numRows = 1000;
+
+ Future<?>[] futures = new Future[concurrencyHint];
+
+ for (int i = 0; i < concurrencyHint; i++) {
+ futures[i] = exec.submit(() -> {
+ for (long j = 0; j < numRows; j++) {
+ if (!grouper.aggregate(new LongKey(j)).isOk()) {
+ throw new ISE("Grouper is full");
+ }
+ }
+ });
+ }
- final int numRows = 1000;
+ for (Future eachFuture : futures) {
+ eachFuture.get();
+ }
- Future<?>[] futures = new Future[concurrencyHint];
+ final QueryTimeoutException e = Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> closer.register(grouper.iterator(true))
+ );
- for (int i = 0; i < concurrencyHint; i++) {
- futures[i] = exec.submit(() -> {
- for (long j = 0; j < numRows; j++) {
- if (!grouper.aggregate(new LongKey(j)).isOk()) {
- throw new ISE("Grouper is full");
- }
- }
- });
+ Assert.assertEquals("Query timeout", e.getErrorCode());
}
-
- for (Future eachFuture : futures) {
- eachFuture.get();
+ finally {
+ service.shutdownNow();
}
-
- final QueryTimeoutException e = Assert.assertThrows(
- QueryTimeoutException.class,
- () -> closer.register(grouper.iterator(true))
- );
-
- Assert.assertEquals("Query timeout", e.getErrorCode());
}
static class TestResourceHolder extends
ReferenceCountingResourceHolder<ByteBuffer>
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
index 64ba2679f04..7cf94220100 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
@@ -299,30 +299,35 @@ public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
- Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
+ Execs.multiThreaded(threads, "NestedColumnSupplierTest-%d")
);
- Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
- final CountDownLatch threadsStartLatch = new CountDownLatch(1);
- for (int i = 0; i < threads; ++i) {
- futures.add(
- executorService.submit(() -> {
- try {
- threadsStartLatch.await();
- for (int iter = 0; iter < 5000; iter++) {
- try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
- smokeTest(column);
+ try {
+ Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+ final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+ for (int i = 0; i < threads; ++i) {
+ futures.add(
+ executorService.submit(() -> {
+ try {
+ threadsStartLatch.await();
+ for (int iter = 0; iter < 5000; iter++) {
+ try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
+ smokeTest(column);
+ }
}
}
- }
- catch (Throwable ex) {
- failureReason.set(ex.getMessage());
- }
- })
- );
+ catch (Throwable ex) {
+ failureReason.set(ex.getMessage());
+ }
+ })
+ );
+ }
+ threadsStartLatch.countDown();
+ Futures.allAsList(futures).get();
+ Assert.assertEquals(expectedReason, failureReason.get());
+ }
+ finally {
+ executorService.shutdownNow();
}
- threadsStartLatch.countDown();
- Futures.allAsList(futures).get();
- Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(NestedDataComplexColumn column) throws IOException
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java
index aa42d58710d..315077b4160 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java
@@ -255,30 +255,35 @@ public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
- Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d")
+ Execs.multiThreaded(threads, "NestedDataColumnSupplierV4Test-%d")
);
- Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
- final CountDownLatch threadsStartLatch = new CountDownLatch(1);
- for (int i = 0; i < threads; ++i) {
- futures.add(
- executorService.submit(() -> {
- try {
- threadsStartLatch.await();
- for (int iter = 0; iter < 5000; iter++) {
- try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
- smokeTest(column);
+ try {
+ Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+ final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+ for (int i = 0; i < threads; ++i) {
+ futures.add(
+ executorService.submit(() -> {
+ try {
+ threadsStartLatch.await();
+ for (int iter = 0; iter < 5000; iter++) {
+ try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
+ smokeTest(column);
+ }
}
}
- }
- catch (Throwable ex) {
- failureReason.set(ex.getMessage());
- }
- })
- );
+ catch (Throwable ex) {
+ failureReason.set(ex.getMessage());
+ }
+ })
+ );
+ }
+ threadsStartLatch.countDown();
+ Futures.allAsList(futures).get();
+ Assert.assertEquals(expectedReason, failureReason.get());
+ }
+ finally {
+ executorService.shutdownNow();
}
- threadsStartLatch.countDown();
- Futures.allAsList(futures).get();
- Assert.assertEquals(expectedReason, failureReason.get());
}
@Test
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
index f483e297be0..b095ad73aa6 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
@@ -219,30 +219,35 @@ public class ScalarDoubleColumnSupplierTest extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
- Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
+ Execs.multiThreaded(threads, "ScalarDoubleColumnSupplierTest-%d")
);
- Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
- final CountDownLatch threadsStartLatch = new CountDownLatch(1);
- for (int i = 0; i < threads; ++i) {
- futures.add(
- executorService.submit(() -> {
- try {
- threadsStartLatch.await();
- for (int iter = 0; iter < 5000; iter++) {
- try (ScalarDoubleColumn column = (ScalarDoubleColumn) supplier.get()) {
- smokeTest(supplier, column);
+ try {
+ Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+ final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+ for (int i = 0; i < threads; ++i) {
+ futures.add(
+ executorService.submit(() -> {
+ try {
+ threadsStartLatch.await();
+ for (int iter = 0; iter < 5000; iter++) {
+ try (ScalarDoubleColumn column = (ScalarDoubleColumn) supplier.get()) {
+ smokeTest(supplier, column);
+ }
}
}
- }
- catch (Throwable ex) {
- failureReason.set(ex.getMessage());
- }
- })
- );
+ catch (Throwable ex) {
+ failureReason.set(ex.getMessage());
+ }
+ })
+ );
+ }
+ threadsStartLatch.countDown();
+ Futures.allAsList(futures).get();
+ Assert.assertEquals(expectedReason, failureReason.get());
+ }
+ finally {
+ executorService.shutdownNow();
}
- threadsStartLatch.countDown();
- Futures.allAsList(futures).get();
- Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(ScalarDoubleColumnAndIndexSupplier supplier,
ScalarDoubleColumn column)
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
index c8830f3aefd..68b73a27bc2 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
@@ -219,30 +219,35 @@ public class ScalarLongColumnSupplierTest extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
- Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
+ Execs.multiThreaded(threads, "ScalarLongColumnSupplierTest-%d")
);
- Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
- final CountDownLatch threadsStartLatch = new CountDownLatch(1);
- for (int i = 0; i < threads; ++i) {
- futures.add(
- executorService.submit(() -> {
- try {
- threadsStartLatch.await();
- for (int iter = 0; iter < 5000; iter++) {
- try (ScalarLongColumn column = (ScalarLongColumn) supplier.get()) {
- smokeTest(supplier, column);
+ try {
+ Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+ final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+ for (int i = 0; i < threads; ++i) {
+ futures.add(
+ executorService.submit(() -> {
+ try {
+ threadsStartLatch.await();
+ for (int iter = 0; iter < 5000; iter++) {
+ try (ScalarLongColumn column = (ScalarLongColumn) supplier.get()) {
+ smokeTest(supplier, column);
+ }
}
}
- }
- catch (Throwable ex) {
- failureReason.set(ex.getMessage());
- }
- })
- );
+ catch (Throwable ex) {
+ failureReason.set(ex.getMessage());
+ }
+ })
+ );
+ }
+ threadsStartLatch.countDown();
+ Futures.allAsList(futures).get();
+ Assert.assertEquals(expectedReason, failureReason.get());
+ }
+ finally {
+ executorService.shutdownNow();
}
- threadsStartLatch.countDown();
- Futures.allAsList(futures).get();
- Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(ScalarLongColumnAndIndexSupplier supplier,
ScalarLongColumn column)
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
index d72970b3b12..b8684c9d370 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
@@ -217,30 +217,35 @@ public class ScalarStringColumnSupplierTest extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
- Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
+ Execs.multiThreaded(threads, "ScalarStringColumnSupplierTest-%d")
);
- Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
- final CountDownLatch threadsStartLatch = new CountDownLatch(1);
- for (int i = 0; i < threads; ++i) {
- futures.add(
- executorService.submit(() -> {
- try {
- threadsStartLatch.await();
- for (int iter = 0; iter < 5000; iter++) {
- try (StringUtf8DictionaryEncodedColumn column = (StringUtf8DictionaryEncodedColumn) supplier.get()) {
- smokeTest(supplier, column);
+ try {
+ Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+ final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+ for (int i = 0; i < threads; ++i) {
+ futures.add(
+ executorService.submit(() -> {
+ try {
+ threadsStartLatch.await();
+ for (int iter = 0; iter < 5000; iter++) {
+ try (StringUtf8DictionaryEncodedColumn column = (StringUtf8DictionaryEncodedColumn) supplier.get()) {
+ smokeTest(supplier, column);
+ }
}
}
- }
- catch (Throwable ex) {
- failureReason.set(ex.getMessage());
- }
- })
- );
+ catch (Throwable ex) {
+ failureReason.set(ex.getMessage());
+ }
+ })
+ );
+ }
+ threadsStartLatch.countDown();
+ Futures.allAsList(futures).get();
+ Assert.assertEquals(expectedReason, failureReason.get());
+ }
+ finally {
+ executorService.shutdownNow();
}
- threadsStartLatch.countDown();
- Futures.allAsList(futures).get();
- Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(ScalarStringColumnAndIndexSupplier supplier,
StringUtf8DictionaryEncodedColumn column)
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 0598552d519..14ce4652104 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -346,28 +346,33 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
);
- Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
- final CountDownLatch threadsStartLatch = new CountDownLatch(1);
- for (int i = 0; i < threads; ++i) {
- futures.add(
- executorService.submit(() -> {
- try {
- threadsStartLatch.await();
- for (int iter = 0; iter < 5000; iter++) {
- try (VariantColumn column = (VariantColumn) supplier.get()) {
- smokeTest(supplier, column, data, expectedTypes);
+ try {
+ Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
+ final CountDownLatch threadsStartLatch = new CountDownLatch(1);
+ for (int i = 0; i < threads; ++i) {
+ futures.add(
+ executorService.submit(() -> {
+ try {
+ threadsStartLatch.await();
+ for (int iter = 0; iter < 5000; iter++) {
+ try (VariantColumn column = (VariantColumn) supplier.get()) {
+ smokeTest(supplier, column, data, expectedTypes);
+ }
}
}
- }
- catch (Throwable ex) {
- failureReason.set(ex.getMessage());
- }
- })
- );
+ catch (Throwable ex) {
+ failureReason.set(ex.getMessage());
+ }
+ })
+ );
+ }
+ threadsStartLatch.countDown();
+ Futures.allAsList(futures).get();
+ Assert.assertEquals(expectedReason, failureReason.get());
+ }
+ finally {
+ executorService.shutdownNow();
}
- threadsStartLatch.countDown();
- Futures.allAsList(futures).get();
- Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]