[
https://issues.apache.org/jira/browse/DRILL-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464866#comment-16464866
]
ASF GitHub Bot commented on DRILL-6281:
---------------------------------------
asfgit closed pull request #1238: DRILL-6281: Refactor TimedRunnable
URL: https://github.com/apache/drill/pull/1238
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/common/src/main/java/org/apache/drill/common/collections/Collectors.java
b/common/src/main/java/org/apache/drill/common/collections/Collectors.java
new file mode 100644
index 0000000000..3e80b2fe70
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/collections/Collectors.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.collections;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+public class Collectors {
+ private Collectors() {
+ }
+
+ /**
+ *
+ * @param map {@code Map<K, V>} to collect elements from
+ * @param mapper {@code BiFunction} that maps from (key, value) pair to type
<T>
+ * @param <T> elements type in {@code List}
+ * @param <K> key type in {@code Map}
+ * @param <V> value type in {@code Map}
+ * @return new {@code List} that contains elements after applying mapper
{@code BiFunction} to the input {@code Map}
+ */
+ public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T>
mapper) {
+ return collect(new ArrayList<>(map.size()), map, mapper);
+ }
+
+ /**
+ *
+ * @param map {@code Map<K, V>} to collect elements from
+ * @param mapper {@code BiFunction} that maps from (key, value) pair to type
<T>
+ * @param predicate {@code Predicate} filter to apply
+ * @param <T> elements type in {@code List}
+ * @param <K> keys type in {@code Map}
+ * @param <V> value type in {@code Map}
+ * @return new {@code List} that contains elements that satisfy {@code
Predicate} after applying mapper {@code BiFunction}
+ * to the input {@code Map}
+ */
+ public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T>
mapper, Predicate<T> predicate) {
+ return collect(new ArrayList<>(map.size()), map, mapper, predicate);
+ }
+
+ public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map,
BiFunction<K, V, T> mapper) {
+ Preconditions.checkNotNull(list);
+ Preconditions.checkNotNull(map);
+ Preconditions.checkNotNull(mapper);
+ map.forEach((k, v) -> list.add(mapper.apply(k, v)));
+ return list;
+ }
+
+ public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map,
BiFunction<K, V, T> mapper, Predicate<T> predicate) {
+ Preconditions.checkNotNull(list);
+ Preconditions.checkNotNull(map);
+ Preconditions.checkNotNull(mapper);
+ Preconditions.checkNotNull(predicate);
+ map.forEach((k, v) -> {
+ T t = mapper.apply(k, v);
+ if (predicate.test(t)) {
+ list.add(t);
+ }
+ });
+ return list;
+ }
+
+ /**
+ *
+ * @param collection {@code Collection<E>} of elements of type <E>
+ * @param mapper {@code Function<E, T>} mapper function to apply
+ * @param <T> elements type in {@code List}
+ * @param <E> elements type in {@code Collection}
+ * @return new {@code List} that contains elements after applying mapper
{@code Function} to the input {@code Collection}
+ */
+ public static <T, E> List<T> toList(Collection<E> collection, Function<E, T>
mapper) {
+ Preconditions.checkNotNull(collection);
+ Preconditions.checkNotNull(mapper);
+ ArrayList<T> list = new ArrayList<>(collection.size());
+ collection.forEach(e -> list.add(mapper.apply(e)));
+ return list;
+ }
+
+ /**
+ *
+ * @param collection {@code Collection<E>} of elements of type <E>
+ * @param mapper {@code Function<E, T>} mapper function to apply
+ * @param predicate {@code Predicate} filter to apply
+ * @param <T> elements type in {@code List}
+ * @param <E> elements type in {@code Collection}
+ * @return new {@code List} that contains elements that satisfy {@code
Predicate} after applying mapper {@code Function} to the input {@code Collection}
+ */
+ public static <T, E> List<T> toList(Collection<E> collection, Function<E, T>
mapper, Predicate<T> predicate) {
+ Preconditions.checkNotNull(collection);
+ Preconditions.checkNotNull(mapper);
+ Preconditions.checkNotNull(predicate);
+ ArrayList<T> list = new ArrayList<>(collection.size());
+ collection.forEach(e -> {
+ T t = mapper.apply(e);
+ if (predicate.test(t)) {
+ list.add(t);
+ }
+ });
+ return list;
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
new file mode 100644
index 0000000000..3c2bbfe50c
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.drill.common.collections.Collectors;
+import org.apache.drill.common.exceptions.UserException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Class used to allow parallel executions of tasks in a simplified way. Also
maintains and reports timings of task completion.
+ * TODO: look at switching to fork join.
+ * @param <V> The type of the value returned when the task is executed.
+ */
+public abstract class TimedCallable<V> implements Callable<V> {
+ private static final Logger logger =
LoggerFactory.getLogger(TimedCallable.class);
+
+ private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
+
+ private volatile long startTime = 0;
+ private volatile long executionTime = -1;
+
+ private static class FutureMapper<V> implements Function<Future<V>, V> {
+ int count;
+ Throwable throwable = null;
+
+ private void setThrowable(Throwable t) {
+ if (throwable == null) {
+ throwable = t;
+ } else {
+ throwable.addSuppressed(t);
+ }
+ }
+
+ @Override
+ public V apply(Future<V> future) {
+ Preconditions.checkState(future.isDone());
+ if (!future.isCancelled()) {
+ try {
+ count++;
+ return future.get();
+ } catch (InterruptedException e) {
+ // there is no wait as we are getting result from the completed/done
future
+ logger.error("Unexpected exception", e);
+ throw UserException.internalError(e)
+ .message("Unexpected exception")
+ .build(logger);
+ } catch (ExecutionException e) {
+ setThrowable(e.getCause());
+ }
+ } else {
+ setThrowable(new CancellationException());
+ }
+ return null;
+ }
+ }
+
+ private static class Statistics<V> implements Consumer<TimedCallable<V>> {
+ final long start = System.nanoTime();
+ final Stopwatch watch = Stopwatch.createStarted();
+ long totalExecution;
+ long maxExecution;
+ int count;
+ int startedCount;
+ private int doneCount;
+ // measure thread creation times
+ long earliestStart;
+ long latestStart;
+ long totalStart;
+
+ @Override
+ public void accept(TimedCallable<V> task) {
+ count++;
+ long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
+ if (threadStart >= 0) {
+ startedCount++;
+ earliestStart = Math.min(earliestStart, threadStart);
+ latestStart = Math.max(latestStart, threadStart);
+ totalStart += threadStart;
+ long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
+ if (executionTime != -1) {
+ doneCount++;
+ totalExecution += executionTime;
+ maxExecution = Math.max(maxExecution, executionTime);
+ } else {
+ logger.info("Task {} started at {} did not finish", task,
threadStart);
+ }
+ } else {
+ logger.info("Task {} never commenced execution", task);
+ }
+ }
+
+ Statistics<V> collect(final List<TimedCallable<V>> tasks) {
+ totalExecution = maxExecution = 0;
+ count = startedCount = doneCount = 0;
+ earliestStart = Long.MAX_VALUE;
+ latestStart = totalStart = 0;
+ tasks.forEach(this);
+ return this;
+ }
+
+ void log(final String activity, final Logger logger, int parallelism) {
+ if (startedCount > 0) {
+ logger.debug("{}: started {} out of {} using {} threads. (start time:
min {} ms, avg {} ms, max {} ms).",
+ activity, startedCount, count, parallelism,
+ TimeUnit.NANOSECONDS.toMillis(earliestStart),
+ TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount,
+ TimeUnit.NANOSECONDS.toMillis(latestStart));
+ } else {
+ logger.debug("{}: started {} out of {} using {} threads.", activity,
startedCount, count, parallelism);
+ }
+
+ if (doneCount > 0) {
+ logger.debug("{}: completed {} out of {} using {} threads (execution
time: total {} ms, avg {} ms, max {} ms).",
+ activity, doneCount, count, parallelism,
watch.elapsed(TimeUnit.MILLISECONDS),
+ TimeUnit.NANOSECONDS.toMillis(totalExecution) / doneCount,
TimeUnit.NANOSECONDS.toMillis(maxExecution));
+ } else {
+ logger.debug("{}: completed {} out of {} using {} threads", activity,
doneCount, count, parallelism);
+ }
+ }
+ }
+
+ @Override
+ public final V call() throws Exception {
+ long start = System.nanoTime();
+ startTime = start;
+ try {
+ logger.debug("Started execution of '{}' task at {} ms", this,
TimeUnit.MILLISECONDS.convert(start, TimeUnit.NANOSECONDS));
+ return runInner();
+ } catch (InterruptedException e) {
+ logger.warn("Task '{}' interrupted", this, e);
+ throw e;
+ } finally {
+ long time = System.nanoTime() - start;
+ if (logger.isWarnEnabled()) {
+ long timeMillis = TimeUnit.MILLISECONDS.convert(time,
TimeUnit.NANOSECONDS);
+ if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
+ logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.",
this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS);
+ } else {
+ logger.debug("Task '{}' execution time is {} ms", this, timeMillis);
+ }
+ }
+ executionTime = time;
+ }
+ }
+
+ protected abstract V runInner() throws Exception;
+
+ private long getStartTime(TimeUnit unit) {
+ return unit.convert(startTime, TimeUnit.NANOSECONDS);
+ }
+
+ private long getExecutionTime(TimeUnit unit) {
+ return unit.convert(executionTime, TimeUnit.NANOSECONDS);
+ }
+
+
+ /**
+ * Execute the list of runnables with the given parallelization. At end,
return values and report completion time
+ * stats to provided logger. Each runnable is allowed a certain timeout. If
the timeout exceeds, existing/pending
+ * tasks will be cancelled and a {@link UserException} is thrown.
+ * @param activity Name of activity for reporting in logger.
+ * @param logger The logger to use to report results.
+ * @param tasks List of callable that should be executed and timed. If this
list has one item, task will be
+ * completed in-thread. Each callable must handle {@link
InterruptedException}s.
+ * @param parallelism The number of threads that should be run to complete
this task.
+ * @return The list of outcome objects.
+ * @throws IOException All exceptions are coerced to IOException since this
was built for storage system tasks initially.
+ */
+ public static <V> List<V> run(final String activity, final Logger logger,
final List<TimedCallable<V>> tasks, int parallelism) throws IOException {
+ Preconditions.checkArgument(!Preconditions.checkNotNull(tasks).isEmpty(),
"list of tasks is empty");
+ Preconditions.checkArgument(parallelism > 0);
+ parallelism = Math.min(parallelism, tasks.size());
+ final ExecutorService threadPool = parallelism == 1 ?
MoreExecutors.newDirectExecutorService()
+ : Executors.newFixedThreadPool(parallelism, new
ThreadFactoryBuilder().setNameFormat(activity + "-%d").build());
+ final long timeout = TIMEOUT_PER_RUNNABLE_IN_MSECS * ((tasks.size() -
1)/parallelism + 1);
+ final FutureMapper<V> futureMapper = new FutureMapper<>();
+ final Statistics<V> statistics = logger.isDebugEnabled() ? new
Statistics<>() : null;
+ try {
+ return Collectors.toList(threadPool.invokeAll(tasks, timeout,
TimeUnit.MILLISECONDS), futureMapper);
+ } catch (InterruptedException e) {
+ final String errMsg = String.format("Interrupted while waiting for
activity '%s' tasks to be done.", activity);
+ logger.error(errMsg, e);
+ throw UserException.resourceError(e)
+ .message(errMsg)
+ .build(logger);
+ } catch (RejectedExecutionException e) {
+ final String errMsg = String.format("Failure while submitting activity
'%s' tasks for execution.", activity);
+ logger.error(errMsg, e);
+ throw UserException.internalError(e)
+ .message(errMsg)
+ .build(logger);
+ } finally {
+ List<Runnable> notStartedTasks = threadPool.shutdownNow();
+ if (!notStartedTasks.isEmpty()) {
+ logger.error("{} activity '{}' tasks never commenced execution.",
notStartedTasks.size(), activity);
+ }
+ try {
+ // Wait for 5s for currently running threads to terminate. Above call
(threadPool.shutdownNow()) interrupts
+ // any running threads. If the tasks are handling the interrupts
properly they should be able to
+ // wrap up and terminate. If not waiting for 5s here gives a chance to
identify and log any potential
+ // thread leaks.
+ if (!threadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+ logger.error("Detected run away tasks in activity '{}'.", activity);
+ }
+ } catch (final InterruptedException e) {
+ logger.warn("Interrupted while waiting for pending threads in activity
'{}' to terminate.", activity);
+ }
+
+ if (statistics != null) {
+ statistics.collect(tasks).log(activity, logger, parallelism);
+ }
+ if (futureMapper.count != tasks.size()) {
+ final String errMsg = String.format("Waited for %d ms, but only %d
tasks for '%s' are complete." +
+ " Total number of tasks %d, parallelism %d.", timeout,
futureMapper.count, activity, tasks.size(), parallelism);
+ logger.error(errMsg, futureMapper.throwable);
+ throw UserException.resourceError(futureMapper.throwable)
+ .message(errMsg)
+ .build(logger);
+ }
+ if (futureMapper.throwable != null) {
+ throw (futureMapper.throwable instanceof IOException) ?
+ (IOException)futureMapper.throwable : new
IOException(futureMapper.throwable);
+ }
+ }
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
deleted file mode 100644
index 7cce2ad5cc..0000000000
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
-import org.apache.drill.common.concurrent.ExtendedLatch;
-import org.apache.drill.common.exceptions.UserException;
-import org.slf4j.Logger;
-
-import com.google.common.collect.Lists;
-
-/**
- * Class used to allow parallel executions of tasks in a simplified way. Also
maintains and reports timings of task completion.
- * TODO: look at switching to fork join.
- * @param <V> The time value that will be returned when the task is executed.
- */
-public abstract class TimedRunnable<V> implements Runnable {
-
- private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
-
- private volatile Exception e;
- private volatile long threadStart;
- private volatile long timeNanos;
- private volatile V value;
-
- @Override
- public final void run() {
- long start = System.nanoTime();
- threadStart=start;
- try{
- value = runInner();
- }catch(Exception e){
- this.e = e;
- }finally{
- timeNanos = System.nanoTime() - start;
- }
- }
-
- protected abstract V runInner() throws Exception ;
- protected abstract IOException convertToIOException(Exception e);
-
- public long getThreadStart(){
- return threadStart;
- }
- public long getTimeSpentNanos(){
- return timeNanos;
- }
-
- public final V getValue() throws IOException {
- if(e != null){
- if(e instanceof IOException){
- throw (IOException) e;
- }else{
- throw convertToIOException(e);
- }
- }
-
- return value;
- }
-
- private static class LatchedRunnable implements Runnable {
- final CountDownLatch latch;
- final Runnable runnable;
-
- public LatchedRunnable(CountDownLatch latch, Runnable runnable){
- this.latch = latch;
- this.runnable = runnable;
- }
-
- @Override
- public void run() {
- try{
- runnable.run();
- }finally{
- latch.countDown();
- }
- }
- }
-
- /**
- * Execute the list of runnables with the given parallelization. At end,
return values and report completion time
- * stats to provided logger. Each runnable is allowed a certain timeout. If
the timeout exceeds, existing/pending
- * tasks will be cancelled and a {@link UserException} is thrown.
- * @param activity Name of activity for reporting in logger.
- * @param logger The logger to use to report results.
- * @param runnables List of runnables that should be executed and timed. If
this list has one item, task will be
- * completed in-thread. Runnable must handle {@link
InterruptedException}s.
- * @param parallelism The number of threads that should be run to complete
this task.
- * @return The list of outcome objects.
- * @throws IOException All exceptions are coerced to IOException since this
was build for storage system tasks initially.
- */
- public static <V> List<V> run(final String activity, final Logger logger,
final List<TimedRunnable<V>> runnables, int parallelism) throws IOException {
- Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() :
null;
- long timedRunnableStart=System.nanoTime();
- if(runnables.size() == 1){
- parallelism = 1;
- runnables.get(0).run();
- }else{
- parallelism = Math.min(parallelism, runnables.size());
- final ExtendedLatch latch = new ExtendedLatch(runnables.size());
- final ExecutorService threadPool =
Executors.newFixedThreadPool(parallelism);
- try{
- for(TimedRunnable<V> runnable : runnables){
- threadPool.submit(new LatchedRunnable(latch, runnable));
- }
-
- final long timeout = (long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS *
runnables.size())/parallelism);
- if (!latch.awaitUninterruptibly(timeout)) {
- // Issue a shutdown request. This will cause existing threads to
interrupt and pending threads to cancel.
- // It is highly important that the task Runnables are handling
interrupts correctly.
- threadPool.shutdownNow();
-
- try {
- // Wait for 5s for currently running threads to terminate. Above
call (threadPool.shutdownNow()) interrupts
- // any running threads. If the runnables are handling the
interrupts properly they should be able to
- // wrap up and terminate. If not waiting for 5s here gives a
chance to identify and log any potential
- // thread leaks.
- threadPool.awaitTermination(5, TimeUnit.SECONDS);
- } catch (final InterruptedException e) {
- logger.warn("Interrupted while waiting for pending threads in
activity '{}' to terminate.", activity);
- }
-
- final String errMsg = String.format("Waited for %dms, but tasks for
'%s' are not complete. " +
- "Total runnable size %d, parallelism %d.", timeout, activity,
runnables.size(), parallelism);
- logger.error(errMsg);
- throw UserException.resourceError()
- .message(errMsg)
- .build(logger);
- }
- } finally {
- if (!threadPool.isShutdown()) {
- threadPool.shutdown();
- }
- }
- }
-
- List<V> values = Lists.newArrayList();
- long sum = 0;
- long max = 0;
- long count = 0;
- // measure thread creation times
- long earliestStart=Long.MAX_VALUE;
- long latestStart=0;
- long totalStart=0;
- IOException excep = null;
- for(final TimedRunnable<V> reader : runnables){
- try{
- values.add(reader.getValue());
- sum += reader.getTimeSpentNanos();
- count++;
- max = Math.max(max, reader.getTimeSpentNanos());
- earliestStart=Math.min(earliestStart, reader.getThreadStart() -
timedRunnableStart);
- latestStart=Math.max(latestStart,
reader.getThreadStart()-timedRunnableStart);
- totalStart+=latestStart=Math.max(latestStart,
reader.getThreadStart()-timedRunnableStart);
- }catch(IOException e){
- if(excep == null){
- excep = e;
- }else{
- excep.addSuppressed(e);
- }
- }
- }
-
- if (watch != null) {
- double avg = (sum/1000.0/1000.0)/(count*1.0d);
- double avgStart = (totalStart/1000.0)/(count*1.0d);
-
- logger.debug(
- String.format("%s: Executed %d out of %d using %d threads. "
- + "Time: %dms total, %fms avg, %dms max.",
- activity, count, runnables.size(), parallelism,
watch.elapsed(TimeUnit.MILLISECONDS), avg, max/1000/1000));
- logger.debug(
- String.format("%s: Executed %d out of %d using %d threads. "
- + "Earliest start: %f \u03BCs, Latest start: %f
\u03BCs, Average start: %f \u03BCs .",
- activity, count, runnables.size(), parallelism,
earliestStart/1000.0, latestStart/1000.0, avgStart));
- watch.stop();
- }
-
- if (excep != null) {
- throw excep;
- }
-
- return values;
-
- }
-}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index 3ba6ff0501..ea34c7d8b8 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -19,11 +19,13 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.drill.exec.store.TimedCallable;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -38,7 +40,8 @@
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+
+import static
org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
public class FooterGatherer {
static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -66,8 +69,8 @@ private static void checkMagicBytes(FileStatus status, byte[]
data, int offset)
}
public static List<Footer> getFooters(final Configuration conf,
List<FileStatus> statuses, int parallelism) throws IOException {
- final List<TimedRunnable<Footer>> readers = Lists.newArrayList();
- List<Footer> foundFooters = Lists.newArrayList();
+ final List<TimedCallable<Footer>> readers = new ArrayList<>();
+ final List<Footer> foundFooters = new ArrayList<>();
for (FileStatus status : statuses) {
@@ -92,14 +95,14 @@ private static void checkMagicBytes(FileStatus status,
byte[] data, int offset)
}
if(!readers.isEmpty()){
- foundFooters.addAll(TimedRunnable.run("Fetch Parquet Footers", logger,
readers, parallelism));
+ foundFooters.addAll(TimedCallable.run("Fetch Parquet Footers", logger,
readers, parallelism));
}
return foundFooters;
}
- private static class FooterReader extends TimedRunnable<Footer>{
+ private static class FooterReader extends TimedCallable<Footer> {
final Configuration conf;
final FileStatus status;
@@ -116,10 +119,9 @@ protected Footer runInner() throws Exception {
}
@Override
- protected IOException convertToIOException(Exception e) {
- return new IOException("Failure while trying to get footer for file " +
status.getPath(), e);
+ public String toString() {
+ return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path",
status.getPath()).toString();
}
-
}
/**
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index cdb28c2d4d..cdf98e605b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -27,10 +27,13 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.collections.Collectors;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.DrillVersionInfo;
-import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.TimedCallable;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
@@ -66,8 +69,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.stream.Collectors;
+import static
org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
import static
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
import static
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
import static
org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
@@ -291,7 +294,7 @@ private ParquetTableMetadata_v3
getParquetTableMetadata(String path, FileSystem
Map<FileStatus, FileSystem> fileStatusMap = fileStatuses.stream()
.collect(
- Collectors.toMap(
+ java.util.stream.Collectors.toMap(
Function.identity(),
s -> fs,
(oldFs, newFs) -> newFs,
@@ -332,20 +335,17 @@ private ParquetTableMetadata_v3
getParquetTableMetadata(Map<FileStatus, FileSyst
*/
private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(
ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus,
FileSystem> fileStatusMap) throws IOException {
-
- List<TimedRunnable<ParquetFileMetadata_v3>> gatherers =
fileStatusMap.entrySet().stream()
- .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(),
e.getValue()))
- .collect(Collectors.toList());
-
- List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>();
- metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger,
gatherers, 16));
- return metaDataList;
+ return TimedCallable.run("Fetch parquet metadata", logger,
+ Collectors.toList(fileStatusMap,
+ (fileStatus, fileSystem) -> new
MetadataGatherer(parquetTableMetadata_v3, fileStatus, fileSystem)),
+ 16
+ );
}
/**
* TimedRunnable that reads the footer from parquet and collects file
metadata
*/
- private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata_v3>
{
+ private class MetadataGatherer extends TimedCallable<ParquetFileMetadata_v3>
{
private final ParquetTableMetadata_v3 parquetTableMetadata;
private final FileStatus fileStatus;
@@ -362,13 +362,8 @@ protected ParquetFileMetadata_v3 runInner() throws
Exception {
return getParquetFileMetadata_v3(parquetTableMetadata, fileStatus, fs);
}
- @Override
- protected IOException convertToIOException(Exception e) {
- if (e instanceof IOException) {
- return (IOException) e;
- } else {
- return new IOException(e);
- }
+ public String toString() {
+ return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path",
fileStatus.getPath()).toString();
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index 942afa1a43..fdc8ba3f15 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -19,17 +19,20 @@
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.TimedCallable;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -48,6 +51,8 @@
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
+import static
org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+
public class BlockMapBuilder {
static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(BlockMapBuilder.class);
static final MetricRegistry metrics = DrillMetrics.getRegistry();
@@ -70,11 +75,11 @@ private boolean compressed(FileStatus fileStatus) {
public List<CompleteFileWork> generateFileWork(List<FileStatus> files,
boolean blockify) throws IOException {
- List<TimedRunnable<List<CompleteFileWork>>> readers = Lists.newArrayList();
+ List<TimedCallable<List<CompleteFileWork>>> readers = new
ArrayList<>(files.size());
for(FileStatus status : files){
readers.add(new BlockMapReader(status, blockify));
}
- List<List<CompleteFileWork>> work = TimedRunnable.run("Get block maps",
logger, readers, 16);
+ List<List<CompleteFileWork>> work = TimedCallable.run("Get block maps",
logger, readers, 16);
List<CompleteFileWork> singleList = Lists.newArrayList();
for(List<CompleteFileWork> innerWorkList : work){
singleList.addAll(innerWorkList);
@@ -84,7 +89,7 @@ private boolean compressed(FileStatus fileStatus) {
}
- private class BlockMapReader extends TimedRunnable<List<CompleteFileWork>> {
+ private class BlockMapReader extends TimedCallable<List<CompleteFileWork>> {
final FileStatus status;
// This variable blockify indicates if a single file can be read by
multiple threads
@@ -103,9 +108,9 @@ public BlockMapReader(FileStatus status, boolean blockify) {
@Override
protected List<CompleteFileWork> runInner() throws Exception {
- final List<CompleteFileWork> work = Lists.newArrayList();
+ final List<CompleteFileWork> work = new ArrayList<>();
- final Set<String> noDrillbitHosts = logger.isDebugEnabled() ?
Sets.<String>newHashSet() : null;
+ final Set<String> noDrillbitHosts = logger.isDebugEnabled() ? new
HashSet<>() : null;
boolean error = false;
if (blockify && !compressed(status)) {
@@ -143,12 +148,10 @@ public BlockMapReader(FileStatus status, boolean
blockify) {
return work;
}
-
@Override
- protected IOException convertToIOException(Exception e) {
- return new IOException("Failure while trying to get block map for " +
status.getPath(), e);
+ public String toString() {
+ return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path",
status.getPath()).toString();
}
-
}
private class FileStatusWork implements FileWork{
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
similarity index 61%
rename from
exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
rename to
exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
index 27b1ed2d93..ea34230d8d 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.store;
-import com.google.common.collect.Lists;
+import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.test.TestTools;
import org.apache.drill.test.DrillTest;
@@ -27,24 +27,25 @@
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
-import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import static
org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
import static org.hamcrest.core.StringContains.containsString;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/**
- * Unit testing for {@link TimedRunnable}.
+ * Unit testing for {@link TimedCallable}.
*/
@Category({SlowTest.class})
-public class TestTimedRunnable extends DrillTest {
- private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(TestTimedRunnable.class);
+public class TestTimedCallable extends DrillTest {
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(TestTimedCallable.class);
@Rule
public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // 3mins
- private static class TestTask extends TimedRunnable {
+ private static class TestTask extends TimedCallable {
final long sleepTime; // sleep time in ms
public TestTask(final long sleepTime) {
@@ -53,39 +54,35 @@ public TestTask(final long sleepTime) {
@Override
protected Void runInner() throws Exception {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- throw e;
- }
+ Thread.sleep(sleepTime);
return null;
}
@Override
- protected IOException convertToIOException(Exception e) {
- return new IOException("Failure while trying to sleep for sometime", e);
+ public String toString() {
+ return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("sleepTime",
sleepTime).toString();
}
}
@Test
public void withoutAnyTasksTriggeringTimeout() throws Exception {
- List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+ int count = 100;
+ List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
- for(int i=0; i<100; i++){
+ for (int i = 0; i < count; i++) {
tasks.add(new TestTask(2000));
}
- TimedRunnable.run("Execution without triggering timeout", logger, tasks,
16);
+ TimedCallable.run("Execution without triggering timeout", logger, tasks,
16);
}
@Test
public void withTasksExceedingTimeout() throws Exception {
- UserException ex = null;
-
try {
- List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+ int count = 100;
+ List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < count; i++) {
if ((i & (i + 1)) == 0) {
tasks.add(new TestTask(2000));
} else {
@@ -93,26 +90,24 @@ public void withTasksExceedingTimeout() throws Exception {
}
}
- TimedRunnable.run("Execution with some tasks triggering timeout",
logger, tasks, 16);
+ TimedCallable.run("Execution with some tasks triggering timeout",
logger, tasks, 16);
+ fail("Expected a UserException");
} catch (UserException e) {
- ex = e;
+ assertThat(e.getMessage(),
+ containsString("Waited for 105000 ms, but only 87 tasks for
'Execution with some tasks triggering timeout' are " +
+ "complete. Total number of tasks 100, parallelism 16."));
}
-
- assertNotNull("Expected a UserException", ex);
- assertThat(ex.getMessage(),
- containsString("Waited for 93750ms, but tasks for 'Execution with some
tasks triggering timeout' are not " +
- "complete. Total runnable size 100, parallelism 16."));
}
@Test
public void withManyTasks() throws Exception {
+ int count = 150000;
+ List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
- List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
-
- for (int i = 0; i < 150000; i++) {
+ for (int i = 0; i < count; i++) {
tasks.add(new TestTask(0));
}
- TimedRunnable.run("Execution with lots of tasks", logger, tasks, 16);
+ TimedCallable.run("Execution with lots of tasks", logger, tasks, 16);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Refactor TimedRunnable
> ----------------------
>
> Key: DRILL-6281
> URL: https://issues.apache.org/jira/browse/DRILL-6281
> Project: Apache Drill
> Issue Type: Sub-task
> Reporter: Vlad Rozov
> Assignee: Vlad Rozov
> Priority: Major
> Labels: ready-to-commit
> Fix For: 1.14.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)