Github user neykov commented on a diff in the pull request: https://github.com/apache/brooklyn-server/pull/493#discussion_r98035197 --- Diff: utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java --- @@ -301,89 +332,96 @@ public Repeater limitTimeTo(Duration duration) { public boolean run() { return runKeepingError().getWithoutError(); } - + public void runRequiringTrue() { Stopwatch timer = Stopwatch.createStarted(); ReferenceWithError<Boolean> result = runKeepingError(); result.checkNoError(); - if (!result.get()) + if (!result.get()) { throw new IllegalStateException(description+" unsatisfied after "+Duration.of(timer)); + } } - + public ReferenceWithError<Boolean> runKeepingError() { - Preconditions.checkState(body != null, "repeat() method has not been called to set the body"); - Preconditions.checkState(exitCondition != null, "until() method has not been called to set the exit condition"); - Preconditions.checkState(delayOnIteration != null, "every() method (or other delaySupplier() / backoff() method) has not been called to set the loop delay"); + Preconditions.checkNotNull(exitCondition, "until() method has not been called to set the exit condition"); + Preconditions.checkNotNull(delayOnIteration, "every() method (or other delaySupplier() / backoff() method) has not been called to set the loop delay"); boolean hasLoggedTransientException = false; Throwable lastError = null; int iterations = 0; CountdownTimer timer = timeLimit!=null ? CountdownTimer.newInstanceStarted(timeLimit) : CountdownTimer.newInstancePaused(Duration.PRACTICALLY_FOREVER); - while (true) { - Duration delayThisIteration = delayOnIteration.apply(iterations); - iterations++; - - try { - body.call(); - } catch (Throwable e) { - log.warn(description, e); - if (rethrowImmediatelyCondition.apply(e)) throw Exceptions.propagate(e); - } - - boolean done = false; - try { - lastError = null; - done = exitCondition.call(); - hasLoggedTransientException = false; - } catch (Throwable e) { - if (hasLoggedTransientException) { - if (log.isDebugEnabled()) log.debug(description + " (repeated failure; excluding stacktrace): " + e); - } else { - if (log.isDebugEnabled()) log.debug(description, e); - hasLoggedTransientException = true; + try { + while (true) { + Duration delayThisIteration = delayOnIteration.apply(iterations); + iterations++; + + Future<?> call = executor.submit(body); + try { + call.get(delayThisIteration.toMilliseconds(), TimeUnit.MILLISECONDS); + } catch (Throwable e) { + log.warn(description, e); + if (rethrowImmediatelyCondition.apply(e)) throw Exceptions.propagate(e); + } finally { + call.cancel(true); } - lastError = e; - if (rethrowImmediatelyCondition.apply(e)) throw Exceptions.propagate(e); - } - if (done) { - if (log.isDebugEnabled()) log.debug("{}: condition satisfied", description); - return ReferenceWithError.newInstanceWithoutError(true); - } else { - if (log.isDebugEnabled()) { - String msg = String.format("%s: unsatisfied during iteration %s %s", description, iterations, - (iterationLimit > 0 ? "(max "+iterationLimit+" attempts)" : "") + - (timer.isNotPaused() ? "("+Time.makeTimeStringRounded(timer.getDurationRemaining())+" remaining)" : "")); - if (iterations == 1) { - log.debug(msg); + + boolean done = false; + try { + lastError = null; + done = exitCondition.call(); + hasLoggedTransientException = false; + } catch (Throwable e) { + if (hasLoggedTransientException) { + log.debug("{}: repeated failure; excluding stacktrace: {}", description, e); } else { - log.trace(msg); + log.debug(description, e); + hasLoggedTransientException = true; } + lastError = e; + if (rethrowImmediatelyCondition.apply(e)) throw Exceptions.propagate(e); } - } - - if (iterationLimit > 0 && iterations >= iterationLimit) { - if (log.isDebugEnabled()) log.debug("{}: condition not satisfied and exceeded iteration limit", description); - if (rethrowException && lastError != null) { - log.warn("{}: error caught checking condition (rethrowing): {}", description, lastError.getMessage()); - throw Exceptions.propagate(lastError); + if (done) { + log.debug("{}: condition satisfied", description); + return ReferenceWithError.newInstanceWithoutError(true); + } else { + if (log.isDebugEnabled()) { + String msg = String.format("%s: unsatisfied during iteration %s %s", description, iterations, + (iterationLimit > 0 ? "(max "+iterationLimit+" attempts)" : "") + + (timer.isNotPaused() ? "("+Time.makeTimeStringRounded(timer.getDurationRemaining())+" remaining)" : "")); + if (iterations == 1) { + log.debug(msg); + } else { + log.trace(msg); + } + } } - if (warnOnUnRethrownException && lastError != null) - log.warn("{}: error caught checking condition: {}", description, lastError.getMessage()); - return ReferenceWithError.newInstanceMaskingError(false, lastError); - } - if (timer.isExpired()) { - if (log.isDebugEnabled()) log.debug("{}: condition not satisfied, with {} elapsed (limit {})", - new Object[] { description, Time.makeTimeStringRounded(timer.getDurationElapsed()), Time.makeTimeStringRounded(timeLimit) }); - if (rethrowException && lastError != null) { - log.error("{}: error caught checking condition: {}", description, lastError.getMessage()); - throw Exceptions.propagate(lastError); + if (iterationLimit > 0 && iterations >= iterationLimit) { + log.debug("{}: condition not satisfied and exceeded iteration limit", description); + if (rethrowException && lastError != null) { + log.warn("{}: error caught checking condition (rethrowing): {}", description, lastError.getMessage()); + throw Exceptions.propagate(lastError); + } + if (warnOnUnRethrownException && lastError != null) + log.warn("{}: error caught checking condition: {}", description, lastError.getMessage()); + return ReferenceWithError.newInstanceMaskingError(false, lastError); } - return ReferenceWithError.newInstanceMaskingError(false, lastError); + + if (timer.isExpired()) { + log.debug("{}: condition not satisfied, with {} elapsed (limit {})", + new Object[] { description, Time.makeTimeStringRounded(timer.getDurationElapsed()), Time.makeTimeStringRounded(timeLimit) }); + if (rethrowException && lastError != null) { + log.error("{}: error caught checking condition: {}", description, lastError.getMessage()); + throw Exceptions.propagate(lastError); + } + return ReferenceWithError.newInstanceMaskingError(false, lastError); + } + + Time.sleep(delayThisIteration); } - - Time.sleep(delayThisIteration); + } finally { + executor.shutdownNow(); --- End diff -- Don't shut down if passed from caller.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---