This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit ac96381e455a6127b7abad2cf19508f160b136cd Author: Vlad Rozov <[email protected]> AuthorDate: Sat Apr 21 08:07:42 2018 -0700 DRILL-6281: Refactor TimedRunnable --- .../org/apache/drill/exec/store/TimedCallable.java | 334 ++++++++++++--------- .../drill/exec/store/parquet/FooterGatherer.java | 14 +- .../exec/store/parquet/metadata/Metadata.java | 12 +- .../drill/exec/store/schedule/BlockMapBuilder.java | 17 +- .../apache/drill/exec/store/TestTimedCallable.java | 45 ++- 5 files changed, 239 insertions(+), 183 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java index 1afd716..ecc5579 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java @@ -19,190 +19,248 @@ package org.apache.drill.exec.store; import java.io.IOException; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; -import com.google.common.base.Stopwatch; -import org.apache.drill.common.concurrent.ExtendedLatch; import org.apache.drill.common.exceptions.UserException; + import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion. * TODO: look at switching to fork join. * @param <V> The time value that will be returned when the task is executed. */ -public abstract class TimedCallable<V> implements Runnable { +public abstract class TimedCallable<V> implements Callable<V> { + private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class); private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000; - private volatile Exception e; - private volatile long threadStart; - private volatile long timeNanos; - private volatile V value; + private volatile long startTime = 0; + private volatile long executionTime = -1; - @Override - public final void run() { - long start = System.nanoTime(); - threadStart=start; - try{ - value = runInner(); - }catch(Exception e){ - this.e = e; - }finally{ - timeNanos = System.nanoTime() - start; - } - } + private static class FutureMapper<V> implements Function<Future<V>, V> { + int count; + Throwable throwable = null; - protected abstract V runInner() throws Exception ; - protected abstract IOException convertToIOException(Exception e); + private void setThrowable(Throwable t) { + if (throwable == null) { + throwable = t; + } else { + throwable.addSuppressed(t); + } + } - public long getThreadStart(){ - return threadStart; - } - public long getTimeSpentNanos(){ - return timeNanos; + @Override + public V apply(Future<V> future) { + Preconditions.checkState(future.isDone()); + if (!future.isCancelled()) { + try { + count++; + return future.get(); + } catch (InterruptedException e) { + // there is no wait as we are getting result from the completed/done future + logger.error("Unexpected exception", e); + throw UserException.internalError(e) + .message("Unexpected exception") + .build(logger); + } catch (ExecutionException e) { + setThrowable(e.getCause()); + } + } else { + setThrowable(new CancellationException()); + } + return null; + } } - public final V getValue() throws IOException { - if(e != null){ - if(e instanceof IOException){ - throw (IOException) e; - }else{ - throw convertToIOException(e); + private static class Statistics<V> implements Consumer<TimedCallable<V>> { + final long start = System.nanoTime(); + final Stopwatch watch = Stopwatch.createStarted(); + long totalExecution; + long maxExecution; + int count; + int startedCount; + private int doneCount; + // measure thread creation times + long earliestStart; + long latestStart; + long totalStart; + + @Override + public void accept(TimedCallable<V> task) { + count++; + long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start; + if (threadStart >= 0) { + startedCount++; + earliestStart = Math.min(earliestStart, threadStart); + latestStart = Math.max(latestStart, threadStart); + totalStart += threadStart; + long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS); + if (executionTime != -1) { + doneCount++; + totalExecution += executionTime; + maxExecution = Math.max(maxExecution, executionTime); + } else { + logger.info("Task {} started at {} did not finish", task, threadStart); + } + } else { + logger.info("Task {} never commenced execution", task); } } - return value; - } + Statistics<V> collect(final List<TimedCallable<V>> tasks) { + totalExecution = maxExecution = 0; + count = startedCount = doneCount = 0; + earliestStart = Long.MAX_VALUE; + latestStart = totalStart = 0; + tasks.forEach(this); + return this; + } - private static class LatchedRunnable implements Runnable { - final CountDownLatch latch; - final Runnable runnable; + void log(final String activity, final Logger logger, int parallelism) { + if (startedCount > 0) { + logger.debug("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).", + activity, startedCount, count, parallelism, + TimeUnit.NANOSECONDS.toMillis(earliestStart), + TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount, + TimeUnit.NANOSECONDS.toMillis(latestStart)); + } else { + logger.debug("{}: started {} out of {} using {} threads.", activity, startedCount, count, parallelism); + } - public LatchedRunnable(CountDownLatch latch, Runnable runnable){ - this.latch = latch; - this.runnable = runnable; + if (doneCount > 0) { + logger.debug("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).", + activity, doneCount, count, parallelism, watch.elapsed(TimeUnit.MILLISECONDS), + TimeUnit.NANOSECONDS.toMillis(totalExecution) / doneCount, TimeUnit.NANOSECONDS.toMillis(maxExecution)); + } else { + logger.debug("{}: completed {} out of {} using {} threads", activity, doneCount, count, parallelism); + } } + } - @Override - public void run() { - try{ - runnable.run(); - }finally{ - latch.countDown(); + @Override + public final V call() throws Exception { + long start = System.nanoTime(); + startTime = start; + try { + logger.debug("Started execution of '{}' task at {} ms", this, TimeUnit.MILLISECONDS.convert(start, TimeUnit.NANOSECONDS)); + return runInner(); + } catch (InterruptedException e) { + logger.warn("Task '{}' interrupted", this, e); + throw e; + } finally { + long time = System.nanoTime() - start; + if (logger.isWarnEnabled()) { + long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS); + if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) { + logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS); + } else { + logger.debug("Task '{}' execution time is {} ms", this, timeMillis); + } } + executionTime = time; } } + protected abstract V runInner() throws Exception; + + private long getStartTime(TimeUnit unit) { + return unit.convert(startTime, TimeUnit.NANOSECONDS); + } + + private long getExecutionTime(TimeUnit unit) { + return unit.convert(executionTime, TimeUnit.NANOSECONDS); + } + + /** * Execute the list of runnables with the given parallelization. At end, return values and report completion time * stats to provided logger. Each runnable is allowed a certain timeout. If the timeout exceeds, existing/pending * tasks will be cancelled and a {@link UserException} is thrown. * @param activity Name of activity for reporting in logger. * @param logger The logger to use to report results. - * @param runnables List of runnables that should be executed and timed. If this list has one item, task will be - * completed in-thread. Runnable must handle {@link InterruptedException}s. + * @param tasks List of callable that should be executed and timed. If this list has one item, task will be + * completed in-thread. Each callable must handle {@link InterruptedException}s. * @param parallelism The number of threads that should be run to complete this task. * @return The list of outcome objects. * @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially. */ - public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> runnables, int parallelism) throws IOException { - Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; - long timedRunnableStart=System.nanoTime(); - if(runnables.size() == 1){ - parallelism = 1; - runnables.get(0).run(); - }else{ - parallelism = Math.min(parallelism, runnables.size()); - final ExtendedLatch latch = new ExtendedLatch(runnables.size()); - final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism); - try{ - for(TimedCallable<V> runnable : runnables){ - threadPool.submit(new LatchedRunnable(latch, runnable)); - } - - final long timeout = (long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS * runnables.size())/parallelism); - if (!latch.awaitUninterruptibly(timeout)) { - // Issue a shutdown request. This will cause existing threads to interrupt and pending threads to cancel. - // It is highly important that the task Runnables are handling interrupts correctly. - threadPool.shutdownNow(); - - try { - // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts - // any running threads. If the runnables are handling the interrupts properly they should be able to - // wrap up and terminate. If not waiting for 5s here gives a chance to identify and log any potential - // thread leaks. - threadPool.awaitTermination(5, TimeUnit.SECONDS); - } catch (final InterruptedException e) { - logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity); - } - - final String errMsg = String.format("Waited for %dms, but tasks for '%s' are not complete. " + - "Total runnable size %d, parallelism %d.", timeout, activity, runnables.size(), parallelism); - logger.error(errMsg); - throw UserException.resourceError() - .message(errMsg) - .build(logger); - } - } finally { - if (!threadPool.isShutdown()) { - threadPool.shutdown(); - } + public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> tasks, int parallelism) throws IOException { + Preconditions.checkArgument(!Preconditions.checkNotNull(tasks).isEmpty(), "list of tasks is empty"); + Preconditions.checkArgument(parallelism > 0); + parallelism = Math.min(parallelism, tasks.size()); + final ExecutorService threadPool = parallelism == 1 ? MoreExecutors.newDirectExecutorService() + : Executors.newFixedThreadPool(parallelism, new ThreadFactoryBuilder().setNameFormat(activity + "-%d").build()); + final long timeout = TIMEOUT_PER_RUNNABLE_IN_MSECS * ((tasks.size() - 1)/parallelism + 1); + final FutureMapper<V> futureMapper = new FutureMapper<>(); + final Statistics<V> statistics = logger.isDebugEnabled() ? new Statistics<>() : null; + try { + return threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS) + .stream() + .map(futureMapper) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } catch (InterruptedException e) { + final String errMsg = String.format("Interrupted while waiting for activity '%s' tasks to be done.", activity); + logger.error(errMsg, e); + throw UserException.resourceError(e) + .message(errMsg) + .build(logger); + } catch (RejectedExecutionException e) { + final String errMsg = String.format("Failure while submitting activity '%s' tasks for execution.", activity); + logger.error(errMsg, e); + throw UserException.internalError(e) + .message(errMsg) + .build(logger); + } finally { + List<Runnable> notStartedTasks = threadPool.shutdownNow(); + if (!notStartedTasks.isEmpty()) { + logger.error("{} activity '{}' tasks never commenced execution.", notStartedTasks.size(), activity); } - } - - List<V> values = Lists.newArrayList(); - long sum = 0; - long max = 0; - long count = 0; - // measure thread creation times - long earliestStart=Long.MAX_VALUE; - long latestStart=0; - long totalStart=0; - IOException excep = null; - for(final TimedCallable<V> reader : runnables){ - try{ - values.add(reader.getValue()); - sum += reader.getTimeSpentNanos(); - count++; - max = Math.max(max, reader.getTimeSpentNanos()); - earliestStart=Math.min(earliestStart, reader.getThreadStart() - timedRunnableStart); - latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart); - totalStart+=latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart); - }catch(IOException e){ - if(excep == null){ - excep = e; - }else{ - excep.addSuppressed(e); + try { + // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts + // any running threads. If the tasks are handling the interrupts properly they should be able to + // wrap up and terminate. If not waiting for 5s here gives a chance to identify and log any potential + // thread leaks. + if (!threadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + logger.error("Detected run away tasks in activity '{}'.", activity); } + } catch (final InterruptedException e) { + logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity); } - } - - if (watch != null) { - double avg = (sum/1000.0/1000.0)/(count*1.0d); - double avgStart = (totalStart/1000.0)/(count*1.0d); - - logger.debug( - String.format("%s: Executed %d out of %d using %d threads. " - + "Time: %dms total, %fms avg, %dms max.", - activity, count, runnables.size(), parallelism, watch.elapsed(TimeUnit.MILLISECONDS), avg, max/1000/1000)); - logger.debug( - String.format("%s: Executed %d out of %d using %d threads. " - + "Earliest start: %f \u03BCs, Latest start: %f \u03BCs, Average start: %f \u03BCs .", - activity, count, runnables.size(), parallelism, earliestStart/1000.0, latestStart/1000.0, avgStart)); - watch.stop(); - } - if (excep != null) { - throw excep; + if (statistics != null) { + statistics.collect(tasks).log(activity, logger, parallelism); + } + if (futureMapper.count != tasks.size()) { + final String errMsg = String.format("Waited for %d ms, but only %d tasks for '%s' are complete." + + " Total number of tasks %d, parallelism %d.", timeout, futureMapper.count, activity, tasks.size(), parallelism); + logger.error(errMsg, futureMapper.throwable); + throw UserException.resourceError(futureMapper.throwable) + .message(errMsg) + .build(logger); + } + if (futureMapper.throwable != null) { + throw (futureMapper.throwable instanceof IOException) ? + (IOException)futureMapper.throwable : new IOException(futureMapper.throwable); + } } - - return values; - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java index 91219c7..ea34c7d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java @@ -19,10 +19,12 @@ package org.apache.drill.exec.store.parquet; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.drill.exec.store.TimedCallable; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.hadoop.conf.Configuration; @@ -38,7 +40,8 @@ import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; + +import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE; public class FooterGatherer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class); @@ -66,8 +69,8 @@ public class FooterGatherer { } public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException { - final List<TimedCallable<Footer>> readers = Lists.newArrayList(); - List<Footer> foundFooters = Lists.newArrayList(); + final List<TimedCallable<Footer>> readers = new ArrayList<>(); + final List<Footer> foundFooters = new ArrayList<>(); for (FileStatus status : statuses) { @@ -116,10 +119,9 @@ public class FooterGatherer { } @Override - protected IOException convertToIOException(Exception e) { - return new IOException("Failure while trying to get footer for file " + status.getPath(), e); + public String toString() { + return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", status.getPath()).toString(); } - } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index 24bde1d..49a6b52 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -27,6 +27,8 @@ import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.util.DrillVersionInfo; @@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE; import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata; import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase; import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata; @@ -362,13 +365,8 @@ public class Metadata { return getParquetFileMetadata_v3(parquetTableMetadata, fileStatus, fs); } - @Override - protected IOException convertToIOException(Exception e) { - if (e instanceof IOException) { - return (IOException) e; - } else { - return new IOException(e); - } + public String toString() { + return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", fileStatus.getPath()).toString(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java index 6bd2ede..fdc8ba3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java @@ -19,14 +19,17 @@ package org.apache.drill.exec.store.schedule; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.TimedCallable; @@ -48,6 +51,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.Sets; +import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE; + public class BlockMapBuilder { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BlockMapBuilder.class); static final MetricRegistry metrics = DrillMetrics.getRegistry(); @@ -70,7 +75,7 @@ public class BlockMapBuilder { public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException { - List<TimedCallable<List<CompleteFileWork>>> readers = Lists.newArrayList(); + List<TimedCallable<List<CompleteFileWork>>> readers = new ArrayList<>(files.size()); for(FileStatus status : files){ readers.add(new BlockMapReader(status, blockify)); } @@ -103,9 +108,9 @@ public class BlockMapBuilder { @Override protected List<CompleteFileWork> runInner() throws Exception { - final List<CompleteFileWork> work = Lists.newArrayList(); + final List<CompleteFileWork> work = new ArrayList<>(); - final Set<String> noDrillbitHosts = logger.isDebugEnabled() ? Sets.<String>newHashSet() : null; + final Set<String> noDrillbitHosts = logger.isDebugEnabled() ? new HashSet<>() : null; boolean error = false; if (blockify && !compressed(status)) { @@ -143,12 +148,10 @@ public class BlockMapBuilder { return work; } - @Override - protected IOException convertToIOException(Exception e) { - return new IOException("Failure while trying to get block map for " + status.getPath(), e); + public String toString() { + return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", status.getPath()).toString(); } - } private class FileStatusWork implements FileWork{ diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java index a34383d..ea34230 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.store; -import com.google.common.collect.Lists; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.test.TestTools; import org.apache.drill.test.DrillTest; @@ -27,12 +27,13 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; -import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE; import static org.hamcrest.core.StringContains.containsString; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** * Unit testing for {@link TimedCallable}. @@ -53,25 +54,22 @@ public class TestTimedCallable extends DrillTest { @Override protected Void runInner() throws Exception { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw e; - } + Thread.sleep(sleepTime); return null; } @Override - protected IOException convertToIOException(Exception e) { - return new IOException("Failure while trying to sleep for sometime", e); + public String toString() { + return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("sleepTime", sleepTime).toString(); } } @Test public void withoutAnyTasksTriggeringTimeout() throws Exception { - List<TimedCallable<TestTask>> tasks = Lists.newArrayList(); + int count = 100; + List<TimedCallable<TestTask>> tasks = new ArrayList<>(count); - for(int i=0; i<100; i++){ + for (int i = 0; i < count; i++) { tasks.add(new TestTask(2000)); } @@ -80,12 +78,11 @@ public class TestTimedCallable extends DrillTest { @Test public void withTasksExceedingTimeout() throws Exception { - UserException ex = null; - try { - List<TimedCallable<TestTask>> tasks = Lists.newArrayList(); + int count = 100; + List<TimedCallable<TestTask>> tasks = new ArrayList<>(count); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < count; i++) { if ((i & (i + 1)) == 0) { tasks.add(new TestTask(2000)); } else { @@ -94,22 +91,20 @@ public class TestTimedCallable extends DrillTest { } TimedCallable.run("Execution with some tasks triggering timeout", logger, tasks, 16); + fail("Expected a UserException"); } catch (UserException e) { - ex = e; + assertThat(e.getMessage(), + containsString("Waited for 105000 ms, but only 87 tasks for 'Execution with some tasks triggering timeout' are " + + "complete. Total number of tasks 100, parallelism 16.")); } - - assertNotNull("Expected a UserException", ex); - assertThat(ex.getMessage(), - containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " + - "complete. Total runnable size 100, parallelism 16.")); } @Test public void withManyTasks() throws Exception { + int count = 150000; + List<TimedCallable<TestTask>> tasks = new ArrayList<>(count); - List<TimedCallable<TestTask>> tasks = Lists.newArrayList(); - - for (int i = 0; i < 150000; i++) { + for (int i = 0; i < count; i++) { tasks.add(new TestTask(0)); } -- To stop receiving notification emails like this one, please contact [email protected].
