Repository: samza
Updated Branches:
  refs/heads/master 4660d4ddd -> 715b67d08

SAMZA-1043: Samza performance improvements

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/715b67d0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/715b67d0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/715b67d0

Branch: refs/heads/master
Commit: 715b67d08165418acdfa4ade3cd5f97a0b62e098
Parents: 4660d4d
Author: Xinyu Liu <[email protected]>
Authored: Wed Nov 9 11:09:32 2016 -0800
Committer: Xinyu Liu <[email protected]>
Committed: Wed Nov 9 11:09:32 2016 -0800

----------------------------------------------------------------------
 .../apache/samza/container/RunLoopFactory.java | 28 ++++--
 .../org/apache/samza/task/AsyncRunLoop.java | 89 +++++++++++---------
 .../apache/samza/task/CoordinatorRequests.java | 16 ++--
 .../org/apache/samza/task/TaskCallbackImpl.java | 7 +-
 .../apache/samza/task/TaskCallbackManager.java | 41 +++++----
 .../apache/samza/util/HighResolutionClock.java | 12 +--
 .../samza/util/SystemHighResolutionClock.java | 16 +---
 .../apache/samza/util/ThrottlingExecutor.java | 21 ++++-
 .../main/java/org/apache/samza/util/Utils.java | 59 -------------
 .../org/apache/samza/config/MetricsConfig.scala | 14 ++-
 .../apache/samza/container/SamzaContainer.scala | 17 +++-
 .../apache/samza/system/SystemConsumers.scala | 28 ++----
 .../main/scala/org/apache/samza/util/Util.scala | 7 ++
 .../org/apache/samza/task/TestAsyncRunLoop.java | 8 +-
 .../samza/task/TestAsyncStreamAdapter.java | 6 +-
 .../apache/samza/task/TestTaskCallbackImpl.java | 2 +-
 .../samza/task/TestTaskCallbackManager.java | 35 ++++----
 .../samza/util/TestThrottlingExecutor.java | 32 +++----
 .../kv/BaseKeyValueStorageEngineFactory.scala | 15 +++-
 19 files changed, 223 insertions(+), 230 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java index 609a956..1c66c82 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java @@ -22,6 +22,7 @@ package org.apache.samza.container; import java.util.concurrent.ExecutorService; import org.apache.samza.SamzaException; import org.apache.samza.config.TaskConfig; +import org.apache.samza.util.HighResolutionClock; import org.apache.samza.system.SystemConsumers; import org.apache.samza.task.AsyncRunLoop; import org.apache.samza.task.AsyncStreamTask; @@ -29,10 +30,10 @@ import org.apache.samza.task.StreamTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; +import scala.runtime.AbstractFunction0; import scala.runtime.AbstractFunction1; -import static org.apache.samza.util.Utils.defaultClock; -import static org.apache.samza.util.Utils.defaultValue; +import static org.apache.samza.util.Util.asScalaClock; /** * Factory class to create runloop for a Samza task, based on the type @@ -50,7 +51,8 @@ public class RunLoopFactory { ExecutorService threadPool, long maxThrottlingDelayMs, SamzaContainerMetrics containerMetrics, - TaskConfig config) { + TaskConfig config, + HighResolutionClock clock) { long taskWindowMs = config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS)); @@ -83,7 +85,7 @@ public class RunLoopFactory { maxThrottlingDelayMs, taskWindowMs, taskCommitMs, - defaultClock()); + asScalaClock(() -> System.nanoTime())); } else { Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1)); @@ -106,7 +108,23 @@ public class RunLoopFactory { taskCommitMs, callbackTimeout, maxThrottlingDelayMs, - containerMetrics); + containerMetrics, + clock); } } + + /** + * 
Returns a default value object for scala option.getOrDefault() to use + * @param value default value + * @param <T> value type + * @return object containing default value + */ + public static <T> AbstractFunction0<T> defaultValue(final T value) { + return new AbstractFunction0<T>() { + @Override + public T apply() { + return value; + } + }; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 8fac815..ba1e1d9 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -39,6 +39,7 @@ import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.container.TaskInstance; import org.apache.samza.container.TaskInstanceMetrics; import org.apache.samza.container.TaskName; +import org.apache.samza.util.HighResolutionClock; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; @@ -55,7 +56,7 @@ import scala.collection.JavaConversions; public class AsyncRunLoop implements Runnable, Throttleable { private static final Logger log = LoggerFactory.getLogger(AsyncRunLoop.class); - private final Map<TaskName, AsyncTaskWorker> taskWorkers; + private final List<AsyncTaskWorker> taskWorkers; private final SystemConsumers consumerMultiplexer; private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping; @@ -72,6 +73,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { private final ThrottlingScheduler callbackExecutor; private volatile boolean shutdownNow = false; private volatile Throwable throwable = null; + private final 
HighResolutionClock clock; public AsyncRunLoop(Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances, ExecutorService threadPool, @@ -81,7 +83,8 @@ public class AsyncRunLoop implements Runnable, Throttleable { long commitMs, long callbackTimeoutMs, long maxThrottlingDelayMs, - SamzaContainerMetrics containerMetrics) { + SamzaContainerMetrics containerMetrics, + HighResolutionClock clock) { this.threadPool = threadPool; this.consumerMultiplexer = consumerMultiplexer; @@ -95,13 +98,14 @@ public class AsyncRunLoop implements Runnable, Throttleable { this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet()); this.latch = new Object(); this.workerTimer = Executors.newSingleThreadScheduledExecutor(); + this.clock = clock; Map<TaskName, AsyncTaskWorker> workers = new HashMap<>(); for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) { workers.put(task.taskName(), new AsyncTaskWorker(task)); } // Partions and tasks assigned to the container will not change during the run loop life time - this.taskWorkers = Collections.unmodifiableMap(workers); - this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, taskWorkers)); + this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, workers)); + this.taskWorkers = Collections.unmodifiableList(new ArrayList<>(workers.values())); } /** @@ -130,11 +134,11 @@ public class AsyncRunLoop implements Runnable, Throttleable { @Override public void run() { try { - for (AsyncTaskWorker taskWorker : taskWorkers.values()) { + for (AsyncTaskWorker taskWorker : taskWorkers) { taskWorker.init(); } - long prevNs = System.nanoTime(); + long prevNs = clock.nanoTime(); while (!shutdownNow) { if (throwable != null) { @@ -142,25 +146,29 @@ public class AsyncRunLoop implements Runnable, Throttleable { throw new SamzaException(throwable); } - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); IncomingMessageEnvelope 
envelope = chooseEnvelope(); - long chooseNs = System.nanoTime(); + long chooseNs = clock.nanoTime(); containerMetrics.chooseNs().update(chooseNs - startNs); runTasks(envelope); - long blockNs = System.nanoTime(); + long blockNs = clock.nanoTime(); blockIfBusy(envelope); - long currentNs = System.nanoTime(); + long currentNs = clock.nanoTime(); long activeNs = blockNs - chooseNs; long totalNs = currentNs - prevNs; prevNs = currentNs; containerMetrics.blockNs().update(currentNs - blockNs); - containerMetrics.utilization().set(((double) activeNs) / totalNs); + + if (totalNs != 0) { + // totalNs is not 0 if timer metrics are enabled + containerMetrics.utilization().set(((double) activeNs) / totalNs); + } } } finally { workerTimer.shutdown(); @@ -214,7 +222,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { } } - for (AsyncTaskWorker worker: taskWorkers.values()) { + for (AsyncTaskWorker worker: taskWorkers) { worker.run(); } } @@ -227,7 +235,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { private void blockIfBusy(IncomingMessageEnvelope envelope) { synchronized (latch) { while (!shutdownNow && throwable == null) { - for (AsyncTaskWorker worker : taskWorkers.values()) { + for (AsyncTaskWorker worker : taskWorkers) { if (worker.state.isReady()) { // should continue running if any worker state is ready // consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop @@ -310,7 +318,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) { this.task = task; - this.callbackManager = new TaskCallbackManager(this, task.metrics(), callbackTimer, callbackTimeoutMs); + this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock); Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task); this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet); } @@ -430,9 +438,9 @@ public class 
AsyncRunLoop implements Runnable, Throttleable { containerMetrics.windows().inc(); ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName()); - long startTime = System.nanoTime(); + long startTime = clock.nanoTime(); task.window(coordinator); - containerMetrics.windowNs().update(System.nanoTime() - startTime); + containerMetrics.windowNs().update(clock.nanoTime() - startTime); coordinatorRequests.update(coordinator); state.doneWindowOrCommit(); @@ -466,9 +474,9 @@ public class AsyncRunLoop implements Runnable, Throttleable { try { containerMetrics.commits().inc(); - long startTime = System.nanoTime(); + long startTime = clock.nanoTime(); task.commit(); - containerMetrics.commitNs().update(System.nanoTime() - startTime); + containerMetrics.commitNs().update(clock.nanoTime() - startTime); state.doneWindowOrCommit(); } catch (Throwable t) { @@ -497,17 +505,17 @@ public class AsyncRunLoop implements Runnable, Throttleable { */ @Override public void onComplete(final TaskCallback callback) { - long workNanos = System.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs; + long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs; callbackExecutor.schedule(new Runnable() { @Override public void run() { try { state.doneProcess(); TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; - containerMetrics.processNs().update(System.nanoTime() - callbackImpl.timeCreatedNs); + containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs); log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition()); - TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl, true); + TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl); if (callbackToUpdate != null) { IncomingMessageEnvelope envelope = callbackToUpdate.envelope; log.trace("Update offset for ssp {}, offset {}", 
envelope.getSystemStreamPartition(), envelope.getOffset()); @@ -540,7 +548,6 @@ public class AsyncRunLoop implements Runnable, Throttleable { abort(t); // update pending count, but not offset TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; - callbackManager.updateCallback(callbackImpl, false); log.error("Got callback failure for task {}", callbackImpl.taskName); } catch (Throwable e) { log.error(e.getMessage(), e); @@ -564,7 +571,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { private volatile boolean endOfStream = false; private volatile boolean windowOrCommitInFlight = false; private final AtomicInteger messagesInFlight = new AtomicInteger(0); - private final ArrayDeque<PendingEnvelope> pendingEnvelopQueue; + private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue; //Set of SSPs that we are currently processing for this task instance private final Set<SystemStreamPartition> processingSspSet; private final TaskName taskName; @@ -573,21 +580,20 @@ public class AsyncRunLoop implements Runnable, Throttleable { AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics, Set<SystemStreamPartition> sspSet) { this.taskName = taskName; this.taskMetrics = taskMetrics; - this.pendingEnvelopQueue = new ArrayDeque<>(); + this.pendingEnvelopeQueue = new ArrayDeque<>(); this.processingSspSet = sspSet; } private boolean checkEndOfStream() { - PendingEnvelope pendingEnvelope = pendingEnvelopQueue.peek(); - - if (pendingEnvelope != null) { + if (pendingEnvelopeQueue.size() == 1) { + PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek(); IncomingMessageEnvelope envelope = pendingEnvelope.envelope; if (envelope.isEndOfStream()) { SystemStreamPartition ssp = envelope.getSystemStreamPartition(); processingSspSet.remove(ssp); - pendingEnvelopQueue.remove(); + pendingEnvelopeQueue.remove(); } } return processingSspSet.isEmpty(); @@ -597,8 +603,13 @@ public class AsyncRunLoop implements Runnable, Throttleable { * Returns whether the 
task is ready to do process/window/commit. */ private boolean isReady() { - endOfStream |= checkEndOfStream(); - needCommit |= coordinatorRequests.commitRequests().remove(taskName); + if (checkEndOfStream()) { + endOfStream = true; + } + if (coordinatorRequests.commitRequests().remove(taskName)) { + needCommit = true; + } + if (needWindow || needCommit || endOfStream) { // ready for window or commit only when no messages are in progress and // no window/commit in flight @@ -621,7 +632,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { if (needCommit) return WorkerOp.COMMIT; else if (needWindow) return WorkerOp.WINDOW; else if (endOfStream) return WorkerOp.END_OF_STREAM; - else if (!pendingEnvelopQueue.isEmpty()) return WorkerOp.PROCESS; + else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS; } return WorkerOp.NO_OP; } @@ -645,7 +656,8 @@ public class AsyncRunLoop implements Runnable, Throttleable { } private void startProcess() { - messagesInFlight.incrementAndGet(); + int count = messagesInFlight.incrementAndGet(); + taskMetrics.messagesInFlight().set(count); } private void doneWindowOrCommit() { @@ -653,7 +665,8 @@ public class AsyncRunLoop implements Runnable, Throttleable { } private void doneProcess() { - messagesInFlight.decrementAndGet(); + int count = messagesInFlight.decrementAndGet(); + taskMetrics.messagesInFlight().set(count); } /** @@ -662,8 +675,8 @@ public class AsyncRunLoop implements Runnable, Throttleable { * @param pendingEnvelope */ private void insertEnvelope(PendingEnvelope pendingEnvelope) { - pendingEnvelopQueue.add(pendingEnvelope); - int queueSize = pendingEnvelopQueue.size(); + pendingEnvelopeQueue.add(pendingEnvelope); + int queueSize = pendingEnvelopeQueue.size(); taskMetrics.pendingMessages().set(queueSize); log.trace("Insert envelope to task {} queue.", taskName); log.debug("Task {} pending envelope count is {} after insertion.", taskName, queueSize); @@ -682,8 +695,8 @@ public class AsyncRunLoop 
implements Runnable, Throttleable { * @return */ private IncomingMessageEnvelope fetchEnvelope() { - PendingEnvelope pendingEnvelope = pendingEnvelopQueue.remove(); - int queueSize = pendingEnvelopQueue.size(); + PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.remove(); + int queueSize = pendingEnvelopeQueue.size(); taskMetrics.pendingMessages().set(queueSize); log.trace("fetch envelope ssp {} offset {} to process.", pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset()); log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize); @@ -691,7 +704,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { if (pendingEnvelope.markProcessed()) { SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition(); consumerMultiplexer.tryUpdate(partition); - log.debug("Update chooser for " + partition); + log.debug("Update chooser for {}", partition); } return pendingEnvelope.envelope; } http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java index 052b3b9..0283d67 100644 --- a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java +++ b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java @@ -19,10 +19,8 @@ package org.apache.samza.task; -import java.util.Collections; -import java.util.HashSet; import java.util.Set; - +import java.util.concurrent.CopyOnWriteArraySet; import org.apache.samza.container.TaskName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,8 +35,8 @@ public class CoordinatorRequests { private static final Logger log = LoggerFactory.getLogger(CoordinatorRequests.class); private final Set<TaskName> 
taskNames; - private final Set<TaskName> taskShutdownRequests = Collections.synchronizedSet(new HashSet<TaskName>()); - private final Set<TaskName> taskCommitRequests = Collections.synchronizedSet(new HashSet<TaskName>()); + private final Set<TaskName> taskShutdownRequests = new CopyOnWriteArraySet<>(); + private final Set<TaskName> taskCommitRequests = new CopyOnWriteArraySet<>(); volatile private boolean shutdownNow = false; public CoordinatorRequests(Set<TaskName> taskNames) { @@ -67,18 +65,18 @@ public class CoordinatorRequests { */ private void checkCoordinator(ReadableCoordinator coordinator) { if (coordinator.requestedCommitTask()) { - log.info("Task " + coordinator.taskName() + " requested commit for current task only"); + log.debug("Task {} requested commit for current task only", coordinator.taskName()); taskCommitRequests.add(coordinator.taskName()); } if (coordinator.requestedCommitAll()) { - log.info("Task " + coordinator.taskName() + " requested commit for all tasks in the container"); + log.debug("Task {} requested commit for all tasks in the container", coordinator.taskName()); taskCommitRequests.addAll(taskNames); } if (coordinator.requestedShutdownOnConsensus()) { taskShutdownRequests.add(coordinator.taskName()); - log.info("Shutdown has now been requested by tasks " + taskShutdownRequests); + log.info("Shutdown has now been requested by tasks {}", taskShutdownRequests); } if (coordinator.requestedShutdownNow() || taskShutdownRequests.size() == taskNames.size()) { @@ -86,4 +84,4 @@ public class CoordinatorRequests { shutdownNow = true; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java index 9b70099..19b9f1c 100644 
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java @@ -48,13 +48,14 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> { TaskName taskName, IncomingMessageEnvelope envelope, ReadableCoordinator coordinator, - long seqNum) { + long seqNum, + long timeCreatedNs) { this.listener = listener; this.taskName = taskName; this.envelope = envelope; this.coordinator = coordinator; this.seqNum = seqNum; - this.timeCreatedNs = System.nanoTime(); + this.timeCreatedNs = timeCreatedNs; } @Override @@ -101,4 +102,4 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> { boolean matchSeqNum(long seqNum) { return this.seqNum == seqNum; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java index 132cf59..5bce778 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java @@ -24,10 +24,9 @@ import java.util.Queue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.samza.container.TaskInstanceMetrics; import org.apache.samza.container.TaskName; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.util.HighResolutionClock; /** @@ -86,27 +85,29 @@ class TaskCallbackManager { } private long seqNum = 0L; - private final AtomicInteger pendingCount = new AtomicInteger(0); - private final TaskCallbacks completeCallbacks = 
new TaskCallbacks(); - private final TaskInstanceMetrics metrics; + private final TaskCallbacks completedCallbacks = new TaskCallbacks(); private final ScheduledExecutorService timer; private final TaskCallbackListener listener; - private long timeout; - - public TaskCallbackManager(TaskCallbackListener listener, TaskInstanceMetrics metrics, ScheduledExecutorService timer, long timeout) { + private final long timeout; + private final int maxConcurrency; + private final HighResolutionClock clock; + + public TaskCallbackManager(TaskCallbackListener listener, + ScheduledExecutorService timer, + long timeout, + int maxConcurrency, + HighResolutionClock clock) { this.listener = listener; - this.metrics = metrics; this.timer = timer; this.timeout = timeout; + this.maxConcurrency = maxConcurrency; + this.clock = clock; } public TaskCallbackImpl createCallback(TaskName taskName, IncomingMessageEnvelope envelope, ReadableCoordinator coordinator) { - final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++); - int count = pendingCount.incrementAndGet(); - metrics.messagesInFlight().set(count); - + final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++, clock.nanoTime()); if (timer != null) { Runnable timerTask = new Runnable() { @Override @@ -126,16 +127,14 @@ class TaskCallbackManager { * Update the task callbacks with the new callback completed. * It uses a high-watermark model to roll the callbacks for checkpointing. 
* @param callback new completed callback - * @param success callback result status * @return the callback for checkpointing */ - public TaskCallbackImpl updateCallback(TaskCallbackImpl callback, boolean success) { - TaskCallbackImpl callbackToCommit = null; - if (success) { - callbackToCommit = completeCallbacks.update(callback); + public TaskCallbackImpl updateCallback(TaskCallbackImpl callback) { + if (maxConcurrency > 1) { + // Use the completedCallbacks queue to handle the out-of-order case when max concurrency is larger than 1 + return completedCallbacks.update(callback); + } else { + return callback; } - int count = pendingCount.decrementAndGet(); - metrics.messagesInFlight().set(count); - return callbackToCommit; } } http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java index 69ba441..6d40149 100644 --- a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java +++ b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java @@ -25,7 +25,7 @@ package org.apache.samza.util; * <p> * Instances of this interface must be thread-safe. */ -interface HighResolutionClock { +public interface HighResolutionClock { /** * Returns a time point that can be used to calculate the difference in nanoseconds with another * time point. Resolution of the timer is platform dependent and not guaranteed to actually @@ -34,14 +34,4 @@ interface HighResolutionClock { * @return current time point in nanoseconds */ long nanoTime(); - - /** - * Sleeps for a period of time that approximates the requested number of nanoseconds. Actual sleep - * time can vary significantly based on the JVM implementation and platform. 
This function returns - * the measured error between expected and actual sleep time. - * - * @param nanos the number of nanoseconds to sleep. - * @throws InterruptedException if the current thread is interrupted while blocked in this method. - */ - long sleep(long nanos) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java index 2e65b60..6bfe7c6 100644 --- a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java +++ b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java @@ -19,23 +19,9 @@ package org.apache.samza.util; -import java.util.concurrent.TimeUnit; - class SystemHighResolutionClock implements HighResolutionClock { @Override public long nanoTime() { return System.nanoTime(); } - - @Override - public long sleep(long nanos) throws InterruptedException { - if (nanos <= 0) { - return nanos; - } - - final long start = System.nanoTime(); - TimeUnit.NANOSECONDS.sleep(nanos); - - return Util.clampAdd(nanos, -(System.nanoTime() - start)); - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java index d1298fc..eb956f2 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java +++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java @@ -74,7 +74,7 @@ public class ThrottlingExecutor implements 
Throttleable, Executor { Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor))); if (pendingNanos > 0) { try { - pendingNanos = clock.sleep(pendingNanos); + pendingNanos = sleep(pendingNanos); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -122,4 +122,23 @@ public class ThrottlingExecutor implements Throttleable, Executor { void setPendingNanos(long pendingNanos) { this.pendingNanos = pendingNanos; } + + /** + * Sleeps for a period of time that approximates the requested number of nanoseconds. Actual sleep + * time can vary significantly based on the JVM implementation and platform. This function returns + * the measured error between expected and actual sleep time. + * + * @param nanos the number of nanoseconds to sleep. + * @throws InterruptedException if the current thread is interrupted while blocked in this method. + */ + long sleep(long nanos) throws InterruptedException { + if (nanos <= 0) { + return nanos; + } + + final long start = System.nanoTime(); + TimeUnit.NANOSECONDS.sleep(nanos); + + return Util.clampAdd(nanos, -(System.nanoTime() - start)); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/Utils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/Utils.java b/samza-core/src/main/java/org/apache/samza/util/Utils.java deleted file mode 100644 index 472e0a5..0000000 --- a/samza-core/src/main/java/org/apache/samza/util/Utils.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; - - -public class Utils { - private static final Logger log = LoggerFactory.getLogger(Utils.class); - - private Utils() {} - - /** - * Returns a default value object for scala option.getOrDefault() to use - * @param value default value - * @param <T> value type - * @return object containing default value - */ - public static <T> AbstractFunction0<T> defaultValue(final T value) { - return new AbstractFunction0<T>() { - @Override - public T apply() { - return value; - } - }; - } - - /** - * Creates a nanosecond clock using default system nanotime - * @return object invokes the system clock - */ - public static AbstractFunction0<Object> defaultClock() { - return new AbstractFunction0<Object>() { - @Override - public Object apply() { - return System.nanoTime(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala index c3fd8bf..e9b6b76 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala @@ -18,7 +18,10 @@ */ package org.apache.samza.config -import scala.collection.JavaConversions._ + + +import 
org.apache.samza.util.HighResolutionClock + object MetricsConfig { // metrics config constants @@ -26,6 +29,7 @@ object MetricsConfig { val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class" val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream" val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval" + val METRICS_TIMER_ENABLED= "metrics.timer.enabled" implicit def Config2Metrics(config: Config) = new MetricsConfig(config) } @@ -53,4 +57,10 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) { case _ => List[String]() } } -} + + /** + * Returns the flag to turn on/off the timer metrics. + * @return Boolean flag to enable the timer metrics + */ + def getMetricsTimerEnabled: Boolean = getBoolean(MetricsConfig.METRICS_TIMER_ENABLED, true) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index e0468ee..c35da92 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -74,7 +74,9 @@ import org.apache.samza.task.AsyncStreamTask import org.apache.samza.task.AsyncStreamTaskAdapter import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.util.HighResolutionClock import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Throttleable, Util} +import org.apache.samza.util.Util.asScalaClock import scala.collection.JavaConversions._ @@ -150,6 +152,15 @@ object SamzaContainer extends Logging { val systemProducersMetrics = new SystemProducersMetrics(registry) val systemConsumersMetrics = new 
SystemConsumersMetrics(registry) val offsetManagerMetrics = new OffsetManagerMetrics(registry) + val clock = if (config.getMetricsTimerEnabled) { + new HighResolutionClock { + override def nanoTime(): Long = System.nanoTime() + } + } else { + new HighResolutionClock { + override def nanoTime(): Long = 0L + } + } val inputSystemStreamPartitions = containerModel .getTasks @@ -383,7 +394,8 @@ object SamzaContainer extends Logging { serdeManager = serdeManager, metrics = systemConsumersMetrics, dropDeserializationError = dropDeserializationError, - pollIntervalMs = pollIntervalMs) + pollIntervalMs = pollIntervalMs, + clock = clock) val producerMultiplexer = new SystemProducers( producers = producers, @@ -559,7 +571,8 @@ object SamzaContainer extends Logging { taskThreadPool, maxThrottlingDelayMs, samzaContainerMetrics, - config) + config, + clock) val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl() memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener { http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index e2aed5b..17d163d 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -62,12 +62,12 @@ class SystemConsumers ( /** * The class that handles deserialization of incoming messages. */ - serdeManager: SerdeManager, + serdeManager: SerdeManager = new SerdeManager, /** * A helper class to hold all of SystemConsumers' metrics. 
*/ - metrics: SystemConsumersMetrics, + metrics: SystemConsumersMetrics = new SystemConsumersMetrics, /** * If MessageChooser returns null when it's polled, SystemConsumers will @@ -76,14 +76,14 @@ class SystemConsumers ( * thread will sit in a tight loop polling every SystemConsumer over and * over again if no new messages are available. */ - noNewMessagesTimeout: Int, + noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, /** * This parameter is to define how to deal with deserialization failure. If * set to true, the task will skip the messages when deserialization fails. * If set to false, the task will throw SamzaException and fail the container. */ - dropDeserializationError: Boolean, + dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, /** * <p>Defines an upper bound for how long the SystemConsumers will wait @@ -99,29 +99,13 @@ class SystemConsumers ( * with no remaining unprocessed messages, the SystemConsumers will poll for * it within 50ms of its availability in the stream system.</p> */ - val pollIntervalMs: Int, + val pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS, /** * Clock can be used to inject a custom clock when mocking this class in * tests. The default implementation returns the current system clock time. 
*/ - val clock: () => Long) extends Logging with TimerUtils { - - def this(chooser: MessageChooser, - consumers: Map[String, SystemConsumer], - serdeManager: SerdeManager = new SerdeManager, - metrics: SystemConsumersMetrics = new SystemConsumersMetrics, - noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, - dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, - pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS) = - this(chooser, - consumers, - serdeManager, - metrics, - noNewMessagesTimeout, - dropDeserializationError, - pollIntervalMs, - () => System.nanoTime()) + val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtils { /** * A buffer of incoming messages grouped by SystemStreamPartition. These http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index c4836f2..9019d02 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -388,4 +388,11 @@ object Util extends Logging { sum } + + /** + * Implicitly convert the Java TimerClock to Scala clock function which returns long timestamp. 
+ * @param c Java TimeClock + * @return Scala clock function + */ + implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime() } http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 6000ffa..cc3e1b7 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.samza.Partition; import org.apache.samza.checkpoint.OffsetManager; import org.apache.samza.config.Config; @@ -47,7 +46,6 @@ import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.TestSystemConsumers; - import org.junit.Before; import org.junit.Test; import scala.Option; @@ -101,7 +99,8 @@ public class TestAsyncRunLoop { commitMs, callbackTimeoutMs, maxThrottlingDelayMs, - containerMetrics); + containerMetrics, + () -> 0L); } TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) { @@ -494,7 +493,8 @@ public class TestAsyncRunLoop { commitMs, callbackTimeoutMs, maxThrottlingDelayMs, - containerMetrics); + containerMetrics, + () -> 0L); runLoop.run(); callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java index 99e1e18..d0b820a 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java @@ -93,7 +93,7 @@ public class TestAsyncStreamAdapter { public void testAdapterWithoutThreadPool() throws Exception { taskAdaptor = new AsyncStreamTaskAdapter(task, null); TestCallbackListener listener = new TestCallbackListener(); - TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L); + TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L, 0L); taskAdaptor.init(null, null); assertTrue(task.inited); @@ -116,10 +116,10 @@ public class TestAsyncStreamAdapter { @Test public void testAdapterWithThreadPool() throws Exception { TestCallbackListener listener1 = new TestCallbackListener(); - TaskCallback callback1 = new TaskCallbackImpl(listener1, null, envelope, null, 0L); + TaskCallback callback1 = new TaskCallbackImpl(listener1, null, envelope, null, 0L, 0L); TestCallbackListener listener2 = new TestCallbackListener(); - TaskCallback callback2 = new TaskCallbackImpl(listener2, null, envelope, null, 1L); + TaskCallback callback2 = new TaskCallbackImpl(listener2, null, envelope, null, 1L, 0L); ExecutorService executor = Executors.newFixedThreadPool(2); taskAdaptor = new AsyncStreamTaskAdapter(task, executor); http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java index f1dbf35..732405b 100644 
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java @@ -61,7 +61,7 @@ public class TestTaskCallbackImpl { } }; - callback = new TaskCallbackImpl(listener, null, mock(IncomingMessageEnvelope.class), null, 0); + callback = new TaskCallbackImpl(listener, null, mock(IncomingMessageEnvelope.class), null, 0L, 0L); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java index d7110f3..b2ed316 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java @@ -48,8 +48,7 @@ public class TestTaskCallbackManager { public void onFailure(TaskCallback callback, Throwable t) { } }; - callbackManager = new TaskCallbackManager(listener, metrics, null, -1); - + callbackManager = new TaskCallbackManager(listener, null, -1, 2, () -> System.nanoTime()); } @Test @@ -68,15 +67,15 @@ public class TestTaskCallbackManager { ReadableCoordinator coordinator = new ReadableCoordinator(taskName); IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null); - TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0); - TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback0, true); + TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0); + TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback0); assertTrue(callbackToCommit.matchSeqNum(0)); assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition()); 
assertEquals("0", callbackToCommit.envelope.getOffset()); IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null); - TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1); - callbackToCommit = callbackManager.updateCallback(callback1, true); + TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1, 0); + callbackToCommit = callbackManager.updateCallback(callback1); assertTrue(callbackToCommit.matchSeqNum(1)); assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition()); assertEquals("1", callbackToCommit.envelope.getOffset()); @@ -90,18 +89,18 @@ public class TestTaskCallbackManager { // simulate out of order IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null); - TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator, 2); - TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2, true); + TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator, 2, 0); + TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2); assertNull(callbackToCommit); IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null); - TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1); - callbackToCommit = callbackManager.updateCallback(callback1, true); + TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1, 0); + callbackToCommit = callbackManager.updateCallback(callback1); assertNull(callbackToCommit); IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null); - TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0); - callbackToCommit = callbackManager.updateCallback(callback0, true); + TaskCallbackImpl callback0 = new 
TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0); + callbackToCommit = callbackManager.updateCallback(callback0); assertTrue(callbackToCommit.matchSeqNum(2)); assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition()); assertEquals("2", callbackToCommit.envelope.getOffset()); @@ -117,21 +116,21 @@ public class TestTaskCallbackManager { IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null); ReadableCoordinator coordinator2 = new ReadableCoordinator(taskName); coordinator2.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER); - TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2); - TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2, true); + TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2, 0); + TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2); assertNull(callbackToCommit); IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null); ReadableCoordinator coordinator1 = new ReadableCoordinator(taskName); coordinator1.commit(TaskCoordinator.RequestScope.CURRENT_TASK); - TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1); - callbackToCommit = callbackManager.updateCallback(callback1, true); + TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1, 0); + callbackToCommit = callbackManager.updateCallback(callback1); assertNull(callbackToCommit); IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null); ReadableCoordinator coordinator = new ReadableCoordinator(taskName); - TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0); - callbackToCommit = callbackManager.updateCallback(callback0, true); + TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, 
envelope0, coordinator, 0, 0); + callbackToCommit = callbackManager.updateCallback(callback0); assertTrue(callbackToCommit.matchSeqNum(1)); assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition()); assertEquals("1", callbackToCommit.envelope.getOffset()); http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java index 0276e6b..ca500fb 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java @@ -28,6 +28,7 @@ import org.junit.Test; import java.util.concurrent.TimeUnit; + public class TestThrottlingExecutor { private static final long MAX_NANOS = Long.MAX_VALUE; @@ -44,7 +45,7 @@ public class TestThrottlingExecutor { @Before public void setUp() { clock = Mockito.mock(HighResolutionClock.class); - executor = new ThrottlingExecutor(MAX_NANOS, clock); + executor = Mockito.spy(new ThrottlingExecutor(MAX_NANOS, clock)); } @Test @@ -85,7 +86,7 @@ public class TestThrottlingExecutor { assertEquals(0L, executor.getPendingNanos()); // At 100% work rate sleep should not be called - Mockito.verify(clock, Mockito.never()).sleep(Mockito.anyLong()); + Mockito.verify(executor, Mockito.never()).sleep(Mockito.anyLong()); } @Test @@ -95,7 +96,7 @@ public class TestThrottlingExecutor { final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5); setWorkTime(workTimeNanos); // Sleep time is same as work time at 50% work rate - setExpectedAndActualSleepTime(workTimeNanos, workTimeNanos); + setActualSleepTime(workTimeNanos); executor.execute(NO_OP); verifySleepTime(workTimeNanos); @@ -114,7 +115,7 @@ public class TestThrottlingExecutor { final long 
delayTimeNanos = (long) (workToDelayFactor * workTimeNanos); setWorkTime(workTimeNanos); - setExpectedAndActualSleepTime(delayTimeNanos, delayTimeNanos); + setActualSleepTime(delayTimeNanos); executor.execute(NO_OP); @@ -131,7 +132,7 @@ public class TestThrottlingExecutor { final long actualDelayTimeNanos = TimeUnit.MILLISECONDS.toNanos(6); setWorkTime(workTimeNanos); - setExpectedAndActualSleepTime(expectedDelayNanos, actualDelayTimeNanos); + setActualSleepTime(actualDelayTimeNanos); executor.execute(NO_OP); @@ -148,7 +149,7 @@ public class TestThrottlingExecutor { final long actualDelayNanos = TimeUnit.MILLISECONDS.toNanos(4); setWorkTime(workTimeNanos); - setExpectedAndActualSleepTime(expectedDelayNanos, actualDelayNanos); + setActualSleepTime(actualDelayNanos); executor.execute(NO_OP); @@ -167,7 +168,7 @@ public class TestThrottlingExecutor { // First execution setWorkTime(workTimeNanos); - setExpectedAndActualSleepTime(workTimeNanos, actualDelayNanos1); + setActualSleepTime(actualDelayNanos1); executor.execute(NO_OP); @@ -177,7 +178,7 @@ public class TestThrottlingExecutor { // Second execution setWorkTime(workTimeNanos); - setExpectedAndActualSleepTime(workTimeNanos, actualDelayNanos2); + setActualSleepTime(actualDelayNanos2); executor.execute(NO_OP); @@ -190,12 +191,12 @@ public class TestThrottlingExecutor { final long maxDelayMillis = 10; final long maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis); - executor = new ThrottlingExecutor(maxDelayMillis, clock); + executor = Mockito.spy(new ThrottlingExecutor(maxDelayMillis, clock)); executor.setWorkFactor(0.5); // Note work time exceeds maxDelayMillis setWorkTime(TimeUnit.MILLISECONDS.toNanos(100)); - setExpectedAndActualSleepTime(maxDelayNanos, maxDelayNanos); + setActualSleepTime(maxDelayNanos); executor.execute(NO_OP); @@ -221,6 +222,7 @@ public class TestThrottlingExecutor { // At a 50% work factor we'd expect work and sleep to match. 
As they don't, the function will // try to increment the pending sleep nanos, which could (but should not) result in overflow. setWorkTime(5000); + setActualSleepTime(Long.MAX_VALUE); executor.execute(NO_OP); @@ -241,7 +243,7 @@ public class TestThrottlingExecutor { executor.execute(NO_OP); // Sleep should not be called with negative pending nanos - Mockito.verify(clock, Mockito.never()).sleep(Mockito.anyLong()); + Mockito.verify(executor, Mockito.never()).sleep(Mockito.anyLong()); assertEquals(-1000 + 500, executor.getPendingNanos()); } @@ -253,6 +255,7 @@ public class TestThrottlingExecutor { assertEquals(-1000, executor.getPendingNanos()); setWorkTime(1250); + setActualSleepTime(1250 + startPendingNanos); executor.execute(NO_OP); @@ -264,12 +267,11 @@ public class TestThrottlingExecutor { Mockito.when(clock.nanoTime()).thenReturn(0L).thenReturn(workTimeNanos); } - private void setExpectedAndActualSleepTime(long expectedDelayTimeNanos, long actualDelayTimeNanos) throws InterruptedException { - Mockito.when(clock.sleep(expectedDelayTimeNanos)) - .thenReturn(expectedDelayTimeNanos - actualDelayTimeNanos); + private void setActualSleepTime(long actualDelayTimeNanos) throws InterruptedException { + Mockito.when(executor.sleep(Mockito.anyLong())).thenAnswer(invocation -> (long) invocation.getArguments()[0] - actualDelayTimeNanos); } private void verifySleepTime(long expectedDelayTimeNanos) throws InterruptedException { - Mockito.verify(clock).sleep(expectedDelayTimeNanos); + Mockito.verify(executor).sleep(expectedDelayTimeNanos); } } http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala index c975893..8ffc817 
100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala @@ -28,6 +28,9 @@ import org.apache.samza.serializers.Serde import org.apache.samza.storage.{StoreProperties, StorageEngine, StorageEngineFactory} import org.apache.samza.system.SystemStreamPartition import org.apache.samza.task.MessageCollector +import org.apache.samza.config.MetricsConfig.Config2Metrics +import org.apache.samza.util.HighResolutionClock +import org.apache.samza.util.Util.asScalaClock /** * A key value storage engine factory implementation @@ -132,7 +135,17 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] // create the storage engine and return // TODO: Decide if we should use raw bytes when restoring val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry) - new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize) + val clock = if (containerContext.config.getMetricsTimerEnabled) { + new HighResolutionClock { + override def nanoTime(): Long = System.nanoTime() + } + } else { + new HighResolutionClock { + override def nanoTime(): Long = 0L + } + } + + new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize, clock) } }
