Repository: hadoop Updated Branches: refs/heads/trunk 2a7f48599 -> c946f1b12
Revert "HDDS-692. Use the ProgressBar class in the RandomKeyGenerator freon test. Contributed by Zsolt Horvath." This reverts commit 2a7f4859912e83910f9a418f69ce6d4bd4a37815. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c946f1b1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c946f1b1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c946f1b1 Branch: refs/heads/trunk Commit: c946f1b1211ca29a16828ee6824a9ccefa0b1150 Parents: 2a7f485 Author: Márton Elek <[email protected]> Authored: Tue Nov 20 13:35:40 2018 +0100 Committer: Márton Elek <[email protected]> Committed: Tue Nov 20 13:35:40 2018 +0100 ---------------------------------------------------------------------- .../apache/hadoop/ozone/freon/ProgressBar.java | 227 ++++++++++++------- .../hadoop/ozone/freon/RandomKeyGenerator.java | 87 +++++-- .../hadoop/ozone/freon/TestProgressBar.java | 71 ++++-- 3 files changed, 270 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c946f1b1/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java index a987eea..a8d7e73 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java @@ -14,134 +14,197 @@ * License for the specific language governing permissions and limitations under * the License. 
*/ - package org.apache.hadoop.ozone.freon; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.PrintStream; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; /** - * Creates and runs a ProgressBar in new Thread which gets printed on - * the provided PrintStream. + * Run an arbitrary code and print progress on the provided stream. The + * progressbar stops when: - the provided currentvalue is less the the maxvalue + * - exception thrown */ public class ProgressBar { - private static final Logger LOG = LoggerFactory.getLogger(ProgressBar.class); private static final long REFRESH_INTERVAL = 1000L; - private final long maxValue; - private final Supplier<Long> currentValue; - private final Thread progressBar; - - private volatile boolean running; - - private volatile long startTime; + private PrintStream stream; + private AtomicLong currentValue; + private long maxValue; + private Thread progressBar; + private volatile boolean exception = false; + private long startTime; /** - * Creates a new ProgressBar instance which prints the progress on the given - * PrintStream when started. - * - * @param stream to display the progress + * @param stream Used to display the progress * @param maxValue Maximum value of the progress - * @param currentValue Supplier that provides the current value */ - public ProgressBar(final PrintStream stream, final Long maxValue, - final Supplier<Long> currentValue) { + ProgressBar(PrintStream stream, long maxValue) { + this.stream = stream; this.maxValue = maxValue; - this.currentValue = currentValue; - this.progressBar = new Thread(getProgressBar(stream)); - this.running = false; + this.currentValue = new AtomicLong(0); + this.progressBar = new Thread(new ProgressBarThread()); } /** - * Starts the ProgressBar in a new Thread. - * This is a non blocking call. 
+ * Start a task with a progessbar without any in/out parameters Runnable used + * just a task wrapper. + * + * @param task Runnable */ - public synchronized void start() { - if (!running) { - running = true; - startTime = System.nanoTime(); + public void start(Runnable task) { + + startTime = System.nanoTime(); + + try { + progressBar.start(); + task.run(); + + } catch (Exception e) { + exception = true; + } finally { + + try { + progressBar.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } /** - * Graceful shutdown, waits for the progress bar to complete. - * This is a blocking call. + * Start a task with only out parameters. + * + * @param task Supplier that represents the task + * @param <T> Generic return type + * @return Whatever the supllier produces */ - public synchronized void shutdown() { - if (running) { + public <T> T start(Supplier<T> task) { + + startTime = System.nanoTime(); + T result = null; + + try { + + progressBar.start(); + result = task.get(); + + } catch (Exception e) { + exception = true; + } finally { + try { progressBar.join(); - running = false; } catch (InterruptedException e) { - LOG.warn("Got interrupted while waiting for the progress bar to " + - "complete."); + e.printStackTrace(); } + + return result; } } /** - * Terminates the progress bar. This doesn't wait for the progress bar - * to complete. + * Start a task with in/out parameters. 
+ * + * @param input Input of the function + * @param task A Function that does the task + * @param <T> type of the input + * @param <R> return type + * @return Whatever the Function returns */ - public synchronized void terminate() { - if (running) { + public <T, R> R start(T input, Function<T, R> task) { + + startTime = System.nanoTime(); + R result = null; + + try { + + progressBar.start(); + result = task.apply(input); + + } catch (Exception e) { + exception = true; + throw e; + } finally { + try { - running = false; progressBar.join(); } catch (InterruptedException e) { - LOG.warn("Got interrupted while waiting for the progress bar to " + - "complete."); + e.printStackTrace(); } + + return result; } } - private Runnable getProgressBar(final PrintStream stream) { - return () -> { - stream.println(); - while (running && currentValue.get() < maxValue) { - print(stream, currentValue.get()); - try { + /** + * Increment the progress with one step. + */ + public void incrementProgress() { + currentValue.incrementAndGet(); + } + + private class ProgressBarThread implements Runnable { + + @Override + public void run() { + try { + + stream.println(); + long value; + + while ((value = currentValue.get()) < maxValue) { + print(value); + + if (exception) { + break; + } Thread.sleep(REFRESH_INTERVAL); - } catch (InterruptedException e) { - LOG.warn("ProgressBar was interrupted."); } - } - print(stream, maxValue); - stream.println(); - running = false; - }; - } - /** - * Given current value prints the progress bar. 
- * - * @param value current progress position - */ - private void print(final PrintStream stream, final long value) { - stream.print('\r'); - double percent = 100.0 * value / maxValue; - StringBuilder sb = new StringBuilder(); - sb.append(" " + String.format("%.2f", percent) + "% |"); - - for (int i = 0; i <= percent; i++) { - sb.append('█'); + if (exception) { + stream.println(); + stream.println("Incomplete termination, " + "check log for " + + "exception."); + } else { + print(maxValue); + } + stream.println(); + } catch (InterruptedException e) { + stream.println(e); + } } - for (int j = 0; j < 100 - percent; j++) { - sb.append(' '); + + /** + * Given current value prints the progress bar. + * + * @param value current progress position + */ + private void print(long value) { + stream.print('\r'); + double percent = 100.0 * value / maxValue; + StringBuilder sb = new StringBuilder(); + sb.append(" " + String.format("%.2f", percent) + "% |"); + + for (int i = 0; i <= percent; i++) { + sb.append('█'); + } + for (int j = 0; j < 100 - percent; j++) { + sb.append(' '); + } + sb.append("| "); + sb.append(value + "/" + maxValue); + long timeInSec = TimeUnit.SECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, + (timeInSec % 3600) / 60, timeInSec % 60); + sb.append(" Time: " + timeToPrint); + stream.print(sb.toString()); } - sb.append("| "); - sb.append(value + "/" + maxValue); - long timeInSec = TimeUnit.SECONDS.convert( - System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, - (timeInSec % 3600) / 60, timeInSec % 60); - sb.append(" Time: " + timeToPrint); - stream.print(sb.toString()); } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c946f1b1/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java 
---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index ebaece28..184b075 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -185,7 +185,6 @@ public final class RandomKeyGenerator implements Callable<Void> { private ArrayList<Histogram> histograms = new ArrayList<>(); private OzoneConfiguration ozoneConfiguration; - private ProgressBar progressbar; RandomKeyGenerator() { } @@ -252,26 +251,13 @@ public final class RandomKeyGenerator implements Callable<Void> { validator.start(); LOG.info("Data validation is enabled."); } - - Supplier<Long> currentValue; - long maxValue; - - currentValue = () -> numberOfKeysAdded.get(); - maxValue = numOfVolumes * - numOfBuckets * - numOfKeys; - - progressbar = new ProgressBar(System.out, maxValue, currentValue); - + Thread progressbar = getProgressBarThread(); LOG.info("Starting progress bar Thread."); - progressbar.start(); - processor.shutdown(); processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - progressbar.shutdown(); - + completed = true; + progressbar.join(); if (validateWrites) { validator.join(); } @@ -910,6 +896,73 @@ public final class RandomKeyGenerator implements Callable<Void> { } } + private class ProgressBar implements Runnable { + + private static final long REFRESH_INTERVAL = 1000L; + + private PrintStream stream; + private Supplier<Long> currentValue; + private long maxValue; + + ProgressBar(PrintStream stream, Supplier<Long> currentValue, + long maxValue) { + this.stream = stream; + this.currentValue = currentValue; + this.maxValue = maxValue; + } + + @Override + public void run() { + try { + stream.println(); + long value; + while ((value = 
currentValue.get()) < maxValue) { + print(value); + if (completed) { + break; + } + Thread.sleep(REFRESH_INTERVAL); + } + if (exception) { + stream.println(); + stream.println("Incomplete termination, " + + "check log for exception."); + } else { + print(maxValue); + } + stream.println(); + } catch (InterruptedException e) { + } + } + + /** + * Given current value prints the progress bar. + * + * @param value + */ + private void print(long value) { + stream.print('\r'); + double percent = 100.0 * value / maxValue; + StringBuilder sb = new StringBuilder(); + sb.append(" " + String.format("%.2f", percent) + "% |"); + + for (int i = 0; i <= percent; i++) { + sb.append('█'); + } + for (int j = 0; j < 100 - percent; j++) { + sb.append(' '); + } + sb.append("| "); + sb.append(value + "/" + maxValue); + long timeInSec = TimeUnit.SECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, + (timeInSec % 3600) / 60, timeInSec % 60); + sb.append(" Time: " + timeToPrint); + stream.print(sb); + } + } + /** * Validates the write done in ozone cluster. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/c946f1b1/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java index 90366da..cea7be8 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java @@ -22,15 +22,12 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import java.io.PrintStream; -import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.LongStream; +import java.util.stream.IntStream; import static org.mockito.Mockito.*; -/** - * Using Mockito runner. - */ @RunWith(MockitoJUnitRunner.class) /** * Tests for the Progressbar class for Freon. 
@@ -38,36 +35,78 @@ import static org.mockito.Mockito.*; public class TestProgressBar { private PrintStream stream; - private AtomicLong numberOfKeysAdded; - private Supplier<Long> currentValue; @Before public void setupMock() { - numberOfKeysAdded = new AtomicLong(0L); - currentValue = () -> numberOfKeysAdded.get(); stream = mock(PrintStream.class); } @Test public void testWithRunnable() { - Long maxValue = 10L; + int maxValue = 10; - ProgressBar progressbar = new ProgressBar(stream, maxValue, currentValue); + ProgressBar progressbar = new ProgressBar(stream, maxValue); Runnable task = () -> { - LongStream.range(0, maxValue).forEach( + IntStream.range(0, maxValue).forEach( counter -> { - numberOfKeysAdded.getAndIncrement(); + progressbar.incrementProgress(); } ); }; - progressbar.start(); - task.run(); - progressbar.shutdown(); + progressbar.start(task); + + verify(stream, atLeastOnce()).print(anyChar()); + verify(stream, atLeastOnce()).print(anyString()); + } + + @Test + public void testWithSupplier() { + + int maxValue = 10; + + ProgressBar progressbar = new ProgressBar(stream, maxValue); + + Supplier<Long> tasks = () -> { + IntStream.range(0, maxValue).forEach( + counter -> { + progressbar.incrementProgress(); + } + ); + + return 1L; //return the result of the dummy task + }; + + progressbar.start(tasks); verify(stream, atLeastOnce()).print(anyChar()); verify(stream, atLeastOnce()).print(anyString()); } + + @Test + public void testWithFunction() { + + int maxValue = 10; + Long result; + + ProgressBar progressbar = new ProgressBar(stream, maxValue); + + Function<Long, String> task = (Long l) -> { + IntStream.range(0, maxValue).forEach( + counter -> { + progressbar.incrementProgress(); + } + ); + + return "dummy result"; //return the result of the dummy task + }; + + progressbar.start(1L, task); + + verify(stream, atLeastOnce()).print(anyChar()); + verify(stream, atLeastOnce()).print(anyString()); + } + } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
