This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit b913b3f47249957b2292864a970847f137090f7a Author: Vlad Rozov <[email protected]> AuthorDate: Fri Apr 20 18:00:20 2018 -0700 DRILL-6281: Refactor TimedRunnable (rename TimedRunnable to TimedCallable) --- .../store/{TimedRunnable.java => TimedCallable.java} | 8 ++++---- .../drill/exec/store/parquet/FooterGatherer.java | 8 ++++---- .../drill/exec/store/parquet/metadata/Metadata.java | 8 ++++---- .../drill/exec/store/schedule/BlockMapBuilder.java | 8 ++++---- ...TestTimedRunnable.java => TestTimedCallable.java} | 20 ++++++++++---------- 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java similarity index 97% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java index 7cce2ad..1afd716 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java @@ -36,7 +36,7 @@ import com.google.common.collect.Lists; * TODO: look at switching to fork join. * @param <V> The time value that will be returned when the task is executed. */ -public abstract class TimedRunnable<V> implements Runnable { +public abstract class TimedCallable<V> implements Runnable { private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000; @@ -111,7 +111,7 @@ public abstract class TimedRunnable<V> implements Runnable { * @return The list of outcome objects. * @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially. */ - public static <V> List<V> run(final String activity, final Logger logger, final List<TimedRunnable<V>> runnables, int parallelism) throws IOException { + public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> runnables, int parallelism) throws IOException { Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; long timedRunnableStart=System.nanoTime(); if(runnables.size() == 1){ @@ -122,7 +122,7 @@ public abstract class TimedRunnable<V> implements Runnable { final ExtendedLatch latch = new ExtendedLatch(runnables.size()); final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism); try{ - for(TimedRunnable<V> runnable : runnables){ + for(TimedCallable<V> runnable : runnables){ threadPool.submit(new LatchedRunnable(latch, runnable)); } @@ -165,7 +165,7 @@ public abstract class TimedRunnable<V> implements Runnable { long latestStart=0; long totalStart=0; IOException excep = null; - for(final TimedRunnable<V> reader : runnables){ + for(final TimedCallable<V> reader : runnables){ try{ values.add(reader.getValue()); sum += reader.getTimeSpentNanos(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java index 3ba6ff0..91219c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.ArrayUtils; -import org.apache.drill.exec.store.TimedRunnable; +import org.apache.drill.exec.store.TimedCallable; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -66,7 +66,7 @@ public class FooterGatherer { } public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException { - final List<TimedRunnable<Footer>> readers = Lists.newArrayList(); + final List<TimedCallable<Footer>> readers = Lists.newArrayList(); List<Footer> foundFooters = Lists.newArrayList(); for (FileStatus status : statuses) { @@ -92,14 +92,14 @@ public class FooterGatherer { } if(!readers.isEmpty()){ - foundFooters.addAll(TimedRunnable.run("Fetch Parquet Footers", logger, readers, parallelism)); + foundFooters.addAll(TimedCallable.run("Fetch Parquet Footers", logger, readers, parallelism)); } return foundFooters; } - private static class FooterReader extends TimedRunnable<Footer>{ + private static class FooterReader extends TimedCallable<Footer> { final Configuration conf; final FileStatus status; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index cdb28c2..24bde1d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -30,7 +30,7 @@ import com.google.common.collect.Maps; import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.util.DrillVersionInfo; -import org.apache.drill.exec.store.TimedRunnable; +import org.apache.drill.exec.store.TimedCallable; import org.apache.drill.exec.store.dfs.MetadataContext; import org.apache.drill.exec.store.parquet.ParquetFormatConfig; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; @@ -333,19 +333,19 @@ public class Metadata { private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3( ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException { - List<TimedRunnable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream() + List<TimedCallable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream() .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(), e.getValue())) .collect(Collectors.toList()); List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>(); - metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16)); + metaDataList.addAll(TimedCallable.run("Fetch parquet metadata", logger, gatherers, 16)); return metaDataList; } /** * TimedRunnable that reads the footer from parquet and collects file metadata */ - private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata_v3> { + private class MetadataGatherer extends TimedCallable<ParquetFileMetadata_v3> { private final ParquetTableMetadata_v3 parquetTableMetadata; private final FileStatus fileStatus; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java index 942afa1..6bd2ede 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java @@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.store.TimedRunnable; +import org.apache.drill.exec.store.TimedCallable; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -70,11 +70,11 @@ public class BlockMapBuilder { public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException { - List<TimedRunnable<List<CompleteFileWork>>> readers = Lists.newArrayList(); + List<TimedCallable<List<CompleteFileWork>>> readers = Lists.newArrayList(); for(FileStatus status : files){ readers.add(new BlockMapReader(status, blockify)); } - List<List<CompleteFileWork>> work = TimedRunnable.run("Get block maps", logger, readers, 16); + List<List<CompleteFileWork>> work = TimedCallable.run("Get block maps", logger, readers, 16); List<CompleteFileWork> singleList = Lists.newArrayList(); for(List<CompleteFileWork> innerWorkList : work){ singleList.addAll(innerWorkList); @@ -84,7 +84,7 @@ public class BlockMapBuilder { } - private class BlockMapReader extends TimedRunnable<List<CompleteFileWork>> { + private class BlockMapReader extends TimedCallable<List<CompleteFileWork>> { final FileStatus status; // This variable blockify indicates if a single file can be read by multiple threads diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java similarity index 84% rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java index 27b1ed2..a34383d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java @@ -35,16 +35,16 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; /** - * Unit testing for {@link TimedRunnable}. + * Unit testing for {@link TimedCallable}. */ @Category({SlowTest.class}) -public class TestTimedRunnable extends DrillTest { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTimedRunnable.class); +public class TestTimedCallable extends DrillTest { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTimedCallable.class); @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // 3mins - private static class TestTask extends TimedRunnable { + private static class TestTask extends TimedCallable { final long sleepTime; // sleep time in ms public TestTask(final long sleepTime) { @@ -69,13 +69,13 @@ public class TestTimedRunnable extends DrillTest { @Test public void withoutAnyTasksTriggeringTimeout() throws Exception { - List<TimedRunnable<TestTask>> tasks = Lists.newArrayList(); + List<TimedCallable<TestTask>> tasks = Lists.newArrayList(); for(int i=0; i<100; i++){ tasks.add(new TestTask(2000)); } - TimedRunnable.run("Execution without triggering timeout", logger, tasks, 16); + TimedCallable.run("Execution without triggering timeout", logger, tasks, 16); } @Test @@ -83,7 +83,7 @@ public class TestTimedRunnable extends DrillTest { UserException ex = null; try { - List<TimedRunnable<TestTask>> tasks = Lists.newArrayList(); + List<TimedCallable<TestTask>> tasks = Lists.newArrayList(); for (int i = 0; i < 100; i++) { if ((i & (i + 1)) == 0) { @@ -93,7 +93,7 @@ public class TestTimedRunnable extends DrillTest { } } - TimedRunnable.run("Execution with some tasks triggering timeout", logger, tasks, 16); + TimedCallable.run("Execution with some tasks triggering timeout", logger, tasks, 16); } catch (UserException e) { ex = e; } @@ -107,12 +107,12 @@ public class TestTimedRunnable extends DrillTest { @Test public void withManyTasks() throws Exception { - List<TimedRunnable<TestTask>> tasks = Lists.newArrayList(); + List<TimedCallable<TestTask>> tasks = Lists.newArrayList(); for (int i = 0; i < 150000; i++) { tasks.add(new TestTask(0)); } - TimedRunnable.run("Execution with lots of tasks", logger, tasks, 16); + TimedCallable.run("Execution with lots of tasks", logger, tasks, 16); } } -- To stop receiving notification emails like this one, please contact [email protected].
