SAMZA-863: Multithreading support in Samza
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e5f31c57 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e5f31c57 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e5f31c57 Branch: refs/heads/master Commit: e5f31c57c957e6a38f566d864d0e5acffba0327d Parents: 9396ee5 Author: Xinyu Liu <[email protected]> Authored: Tue Jul 19 11:53:44 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Jul 19 11:53:44 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 10 +- .../org/apache/samza/task/AsyncStreamTask.java | 60 ++ .../org/apache/samza/task/TaskCallback.java | 38 ++ .../apache/samza/container/RunLoopFactory.java | 112 ++++ .../org/apache/samza/task/AsyncRunLoop.java | 619 +++++++++++++++++++ .../samza/task/AsyncStreamTaskAdapter.java | 92 +++ .../apache/samza/task/CoordinatorRequests.java | 89 +++ .../apache/samza/task/TaskCallbackFactory.java | 28 + .../org/apache/samza/task/TaskCallbackImpl.java | 104 ++++ .../apache/samza/task/TaskCallbackListener.java | 30 + .../apache/samza/task/TaskCallbackManager.java | 141 +++++ .../task/TaskCallbackTimeoutException.java | 42 ++ .../main/java/org/apache/samza/util/Utils.java | 59 ++ .../apache/samza/checkpoint/OffsetManager.scala | 38 +- .../org/apache/samza/config/JobConfig.scala | 11 + .../org/apache/samza/config/TaskConfig.scala | 11 + .../org/apache/samza/container/RunLoop.scala | 92 +-- .../apache/samza/container/SamzaContainer.scala | 195 ++++-- .../samza/container/SamzaContainerMetrics.scala | 2 + .../apache/samza/container/TaskInstance.scala | 44 +- .../samza/container/TaskInstanceMetrics.scala | 2 + .../samza/coordinator/JobCoordinator.scala | 5 +- .../apache/samza/system/SystemConsumers.scala | 31 +- .../org/apache/samza/task/TestAsyncRunLoop.java | 333 ++++++++++ .../samza/task/TestAsyncStreamAdapter.java | 141 +++++ 
.../samza/task/TestCoordinatorRequests.java | 93 +++ .../apache/samza/task/TestTaskCallbackImpl.java | 125 ++++ .../samza/task/TestTaskCallbackManager.java | 141 +++++ .../apache/samza/container/TestRunLoop.scala | 64 +- .../samza/container/TestSamzaContainer.scala | 6 +- .../samza/container/TestTaskInstance.scala | 6 +- .../samza/system/TestSystemConsumers.scala | 36 +- .../samza/system/hdfs/HdfsSystemProducer.scala | 70 ++- .../migration/KafkaCheckpointMigration.scala | 1 + .../system/kafka/KafkaSystemProducer.scala | 215 ++++--- .../kafka/TestKafkaSystemProducerJava.java | 4 +- .../system/kafka/TestKafkaSystemProducer.scala | 17 +- .../kv/inmemory/InMemoryKeyValueStore.scala | 6 +- .../samza/storage/kv/RocksDbKeyValueStore.scala | 3 - .../storage/kv/TestRocksDbKeyValueStore.scala | 59 +- .../apache/samza/storage/kv/CachedStore.scala | 194 +++--- .../samza/storage/kv/TestCachedStore.scala | 15 - .../samza/storage/kv/TestKeyValueStores.scala | 191 +++++- 43 files changed, 3098 insertions(+), 477 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 325c381..c85dc94 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -33,6 +33,7 @@ <allow pkg="org.apache.commons" /> <allow class="scala.collection.JavaConversions" /> <allow class="scala.collection.JavaConverters" /> + <allow pkg="scala.runtime" /> <subpackage name="config"> <allow class="org.apache.samza.SamzaException" /> @@ -133,6 +134,10 @@ <allow pkg="org.apache.samza.container" /> <allow pkg="org.apache.samza.metrics" /> <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.util" /> + <allow pkg="org.apache.samza.checkpoint" /> + <allow class="org.apache.samza.SamzaException" /> + <allow 
class="org.apache.samza.Partition" /> </subpackage> <subpackage name="container"> @@ -142,7 +147,10 @@ <allow pkg="org.apache.samza.util" /> <allow pkg="junit.framework" /> <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" /> - + <allow class="org.apache.samza.SamzaException" /> + <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.task" /> + <allow pkg="org.apache.samza.util" /> <subpackage name="grouper"> <subpackage name="stream"> <allow pkg="org.apache.samza.system" /> http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java new file mode 100644 index 0000000..684ba0b --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.task; + +import org.apache.samza.system.IncomingMessageEnvelope; + +/** + * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. Itâs provided for better + * parallelism and resource utilization. This class allows task to make asynchronous calls and fire callbacks upon completion. + * Similar to {@link StreamTask}, an AsyncStreamTask may be augmented by implementing other interfaces, such as + * {@link InitableTask}, {@link WindowableTask}, or {@link ClosableTask}. The following invariants hold with these mix-ins: + * + * InitableTask.init - always the first method invoked on an AsyncStreamTask. It happens-before every subsequent + * invocation on AsyncStreamTask (for happens-before semantics, see https://docs.oracle.com/javase/tutorial/essential/concurrency/memconsist.html). + * + * CloseableTask.close - always the last method invoked on an AsyncStreamTask and all other AsyncStreamTask are guaranteed + * to happen-before it. + * + * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.process.max.inflight.messages=1), + * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.process.max.inflight.messages>1), + * there is no such happens-before constraint and the AsyncStreamTask is required to coordinate any shared state. + * + * WindowableTask.window - in either above mode, it is called when no invocations to processAsync are pending and no new + * processAsync invocations can be scheduled until it completes. Therefore, a guarantee that all previous processAsync invocations + * happen before an invocation of WindowableTask.window. An invocation to WindowableTask.window is guaranteed to happen-before + * any subsequent processAsync invocations. The Samza engine is responsible for ensuring that window is invoked in a timely manner. 
+ * + * Similar to WindowableTask.window, commits are guaranteed to happen only when there are no pending processAsync or WindowableTask.window + * invocations. All preceding invocations happen-before commit and commit happens-before all subsequent invocations. + */ +public interface AsyncStreamTask { + /** + * Called once for each message that this AsyncStreamTask receives. + * @param envelope Contains the received deserialized message and key, and also information regarding the stream and + * partition of which the message was received from. + * @param collector Contains the means of sending message envelopes to the output stream. The collector must only + * be used during the current call to the process method; you should not reuse the collector between invocations + * of this method. + * @param coordinator Manages execution of tasks. + * @param callback Triggers the completion of the process. + */ + void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java b/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java new file mode 100644 index 0000000..8ba7a36 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +/** + * A TaskCallback is fired by a {@link AsyncStreamTask} to notify when an asynchronous + * process has completed. If the callback is fired multiple times, it will throw IllegalStateException. + */ +public interface TaskCallback { + + /** + * Invoke when the asynchronous process completed with success. + */ + void complete(); + + /** + * Invoke when the asynchronous process failed with an error. + * @param t error throwable + */ + void failure(Throwable t); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java new file mode 100644 index 0000000..a789d04 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import org.apache.samza.SamzaException; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.system.SystemConsumers; +import org.apache.samza.task.AsyncRunLoop; +import org.apache.samza.task.AsyncStreamTask; +import org.apache.samza.task.StreamTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConversions; +import scala.runtime.AbstractFunction1; + +import static org.apache.samza.util.Utils.defaultValue; +import static org.apache.samza.util.Utils.defaultClock; + +/** + * Factory class to create runloop for a Samza task, based on the type + * of the task + */ +public class RunLoopFactory { + private static final Logger log = LoggerFactory.getLogger(RunLoopFactory.class); + + private static final long DEFAULT_WINDOW_MS = -1L; + private static final long DEFAULT_COMMIT_MS = 60000L; + private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L; + + public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance<?>> taskInstances, + SystemConsumers consumerMultiplexer, + ExecutorService threadPool, + Executor executor, + SamzaContainerMetrics containerMetrics, + TaskConfig config) { + + long taskWindowMs = config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS)); + + log.info("Got window milliseconds: " + taskWindowMs); + + long taskCommitMs = config.getCommitMs().getOrElse(defaultValue(DEFAULT_COMMIT_MS)); + + log.info("Got commit 
milliseconds: " + taskCommitMs); + + int asyncTaskCount = taskInstances.values().count(new AbstractFunction1<TaskInstance<?>, Object>() { + @Override + public Boolean apply(TaskInstance<?> t) { + return t.isAsyncTask(); + } + }); + + // asyncTaskCount should be either 0 or the number of all taskInstances + if (asyncTaskCount > 0 && asyncTaskCount < taskInstances.size()) { + throw new SamzaException("Mixing StreamTask and AsyncStreamTask is not supported"); + } + + if (asyncTaskCount == 0) { + log.info("Run loop in single thread mode."); + + scala.collection.immutable.Map<TaskName, TaskInstance<StreamTask>> streamTaskInstances = (scala.collection.immutable.Map) taskInstances; + return new RunLoop( + streamTaskInstances, + consumerMultiplexer, + containerMetrics, + taskWindowMs, + taskCommitMs, + defaultClock(), + executor); + } else { + Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1)); + + log.info("Got max messages in flight: " + taskMaxConcurrency); + + Long callbackTimeout = config.getCallbackTimeoutMs().getOrElse(defaultValue(DEFAULT_CALLBACK_TIMEOUT_MS)); + + log.info("Got callback timeout: " + callbackTimeout); + + scala.collection.immutable.Map<TaskName, TaskInstance<AsyncStreamTask>> asyncStreamTaskInstances = (scala.collection.immutable.Map) taskInstances; + + log.info("Run loop in asynchronous mode."); + + return new AsyncRunLoop( + JavaConversions.asJavaMap(asyncStreamTaskInstances), + threadPool, + consumerMultiplexer, + taskMaxConcurrency, + taskWindowMs, + taskCommitMs, + callbackTimeout, + containerMetrics); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java new file mode 100644 index 0000000..a510bb0 --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -0,0 +1,619 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.samza.SamzaException; +import org.apache.samza.container.SamzaContainerMetrics; +import org.apache.samza.container.TaskInstance; +import org.apache.samza.container.TaskInstanceMetrics; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemConsumers; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConversions; + + +/** + * The AsyncRunLoop supports multithreading execution of Samza {@link AsyncStreamTask}s. 
+ */ +public class AsyncRunLoop implements Runnable { + private static final Logger log = LoggerFactory.getLogger(AsyncRunLoop.class); + + private final Map<TaskName, AsyncTaskWorker> taskWorkers; + private final SystemConsumers consumerMultiplexer; + private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping; + private final ExecutorService threadPool; + private final CoordinatorRequests coordinatorRequests; + private final Object latch; + private final int maxConcurrency; + private final long windowMs; + private final long commitMs; + private final long callbackTimeoutMs; + private final SamzaContainerMetrics containerMetrics; + private final ScheduledExecutorService workerTimer; + private final ScheduledExecutorService callbackTimer; + private volatile boolean shutdownNow = false; + private volatile Throwable throwable = null; + + public AsyncRunLoop(Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances, + ExecutorService threadPool, + SystemConsumers consumerMultiplexer, + int maxConcurrency, + long windowMs, + long commitMs, + long callbackTimeoutMs, + SamzaContainerMetrics containerMetrics) { + + this.threadPool = threadPool; + this.consumerMultiplexer = consumerMultiplexer; + this.containerMetrics = containerMetrics; + this.windowMs = windowMs; + this.commitMs = commitMs; + this.maxConcurrency = maxConcurrency; + this.callbackTimeoutMs = callbackTimeoutMs; + this.callbackTimer = (callbackTimeoutMs > 0) ? 
Executors.newSingleThreadScheduledExecutor() : null; + this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet()); + this.latch = new Object(); + this.workerTimer = Executors.newSingleThreadScheduledExecutor(); + Map<TaskName, AsyncTaskWorker> workers = new HashMap<>(); + for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) { + workers.put(task.taskName(), new AsyncTaskWorker(task)); + } + // Partions and tasks assigned to the container will not change during the run loop life time + this.taskWorkers = Collections.unmodifiableMap(workers); + this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, taskWorkers)); + } + + /** + * Returns mapping of the SystemStreamPartition to the AsyncTaskWorkers to efficiently route the envelopes + */ + private static Map<SystemStreamPartition, List<AsyncTaskWorker>> getSspToAsyncTaskWorkerMap( + Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances, Map<TaskName, AsyncTaskWorker> taskWorkers) { + Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToWorkerMap = new HashMap<>(); + for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) { + Set<SystemStreamPartition> ssps = JavaConversions.asJavaSet(task.systemStreamPartitions()); + for (SystemStreamPartition ssp : ssps) { + if (sspToWorkerMap.get(ssp) == null) { + sspToWorkerMap.put(ssp, new ArrayList<AsyncTaskWorker>()); + } + sspToWorkerMap.get(ssp).add(taskWorkers.get(task.taskName())); + } + } + return sspToWorkerMap; + } + + /** + * The run loop chooses messages from the SystemConsumers, and run the ready tasks asynchronously. + * Window and commit are run in a thread pool, and they are mutual exclusive with task process. + * The loop thread will block if all tasks are busy, and resume if any task finishes. 
+ */ + @Override + public void run() { + try { + for (AsyncTaskWorker taskWorker : taskWorkers.values()) { + taskWorker.init(); + } + + long prevNs = System.nanoTime(); + + while (!shutdownNow) { + if (throwable != null) { + log.error("Caught throwable and stopping run loop", throwable); + throw new SamzaException(throwable); + } + + long startNs = System.nanoTime(); + + IncomingMessageEnvelope envelope = chooseEnvelope(); + + long chooseNs = System.nanoTime(); + + containerMetrics.chooseNs().update(chooseNs - startNs); + + runTasks(envelope); + + long blockNs = System.nanoTime(); + + blockIfBusy(envelope); + + long currentNs = System.nanoTime(); + long activeNs = blockNs - chooseNs; + long totalNs = currentNs - prevNs; + prevNs = currentNs; + containerMetrics.blockNs().update(currentNs - blockNs); + containerMetrics.utilization().set(((double) activeNs) / totalNs); + } + } finally { + workerTimer.shutdown(); + if (callbackTimer != null) callbackTimer.shutdown(); + } + } + + public void shutdown() { + shutdownNow = true; + } + + /** + * Chooses an envelope from messageChooser without updating it. This enables flow control + * on the SSP level, meaning the task will not get further messages for the SSP if it cannot + * process it. The chooser is updated only after the callback to process is invoked, then the task + * is able to process more messages. This flow control does not block. so in case of empty message chooser, + * it will return null immediately without blocking, and the chooser will not poll the underlying system + * consumer since there are still messages in the SystemConsumers buffer. 
+ */ + private IncomingMessageEnvelope chooseEnvelope() { + IncomingMessageEnvelope envelope = consumerMultiplexer.choose(false); + if (envelope != null) { + log.trace("Choose envelope ssp {} offset {} for processing", envelope.getSystemStreamPartition(), envelope.getOffset()); + containerMetrics.envelopes().inc(); + } else { + log.trace("No envelope is available"); + containerMetrics.nullEnvelopes().inc(); + } + return envelope; + } + + /** + * Insert the envelope into the task pending queues and run all the tasks + */ + private void runTasks(IncomingMessageEnvelope envelope) { + if (envelope != null) { + PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope); + for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) { + worker.state.insertEnvelope(pendingEnvelope); + } + } + + for (AsyncTaskWorker worker: taskWorkers.values()) { + worker.run(); + } + } + + /** + * Block the runloop thread if all tasks are busy. Due to limitation of non-blocking for the flow control, + * we block the run loop when there are no runnable tasks, or all tasks are idle (no pending messages) while + * chooser is empty too. When a task worker finishes or window/commit completes, it will resume the runloop. 
+ */ + private void blockIfBusy(IncomingMessageEnvelope envelope) { + synchronized (latch) { + while (!shutdownNow && throwable == null) { + for (AsyncTaskWorker worker : taskWorkers.values()) { + if (worker.state.isReady() && (envelope != null || worker.state.hasPendingOps())) { + // should continue running since the worker state is ready and there is either new message + // or some pending operations for the worker + return; + } + } + + try { + log.trace("Block loop thread"); + + if (envelope == null) { + // If the envelope is null then we will wait for a poll interval, otherwise next choose() will + // return null immediately and we will have a busy loop + latch.wait(consumerMultiplexer.pollIntervalMs()); + return; + } else { + latch.wait(); + } + } catch (InterruptedException e) { + throw new SamzaException("Run loop is interrupted", e); + } + } + } + } + + /** + * Resume the runloop thread. It is triggered once a task becomes ready again or has failure. + */ + private void resume() { + log.trace("Resume loop thread"); + if (coordinatorRequests.shouldShutdownNow() && coordinatorRequests.commitRequests().isEmpty()) { + shutdownNow = true; + } + synchronized (latch) { + latch.notifyAll(); + } + } + + /** + * Set the throwable and abort run loop. The throwable will be thrown from the run loop thread + * @param t throwable + */ + private void abort(Throwable t) { + throwable = t; + } + + /** + * PendingEnvenlope contains an envelope that is not processed by this task, and + * a flag indicating whether it has been processed by any tasks. + */ + private static final class PendingEnvelope { + private final IncomingMessageEnvelope envelope; + private boolean processed = false; + + PendingEnvelope(IncomingMessageEnvelope envelope) { + this.envelope = envelope; + } + + /** + * Returns true if the envelope has not been processed. 
+ */ + private boolean markProcessed() { + boolean oldValue = processed; + processed = true; + return !oldValue; + } + } + + + private enum WorkerOp { + WINDOW, + COMMIT, + PROCESS, + NO_OP + } + + /** + * The AsyncTaskWorker encapsulates the states of an {@link AsyncStreamTask}. If the task becomes ready, it + * will run the task asynchronously. It runs window and commit in the provided thread pool. + */ + private class AsyncTaskWorker implements TaskCallbackListener { + private final TaskInstance<AsyncStreamTask> task; + private final TaskCallbackManager callbackManager; + private volatile AsyncTaskState state; + + AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) { + this.task = task; + this.callbackManager = new TaskCallbackManager(this, task.metrics(), callbackTimer, callbackTimeoutMs); + this.state = new AsyncTaskState(task.taskName(), task.metrics()); + } + + private void init() { + // schedule the timer for windowing and commiting + if (task.isWindowableTask() && windowMs > 0L) { + workerTimer.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + log.trace("Task {} need window", task.taskName()); + state.needWindow(); + resume(); + } + }, windowMs, windowMs, TimeUnit.MILLISECONDS); + } + + if (commitMs > 0L) { + workerTimer.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + log.trace("Task {} need commit", task.taskName()); + state.needCommit(); + resume(); + } + }, commitMs, commitMs, TimeUnit.MILLISECONDS); + } + } + + /** + * Invoke next task operation based on its state + */ + private void run() { + switch (state.nextOp()) { + case PROCESS: + process(); + break; + case WINDOW: + window(); + break; + case COMMIT: + commit(); + break; + default: + //no op + break; + } + } + + /** + * Process asynchronously. The callback needs to be fired once the processing is done. 
+ */ + private void process() { + final IncomingMessageEnvelope envelope = state.fetchEnvelope(); + log.trace("Process ssp {} offset {}", envelope.getSystemStreamPartition(), envelope.getOffset()); + + final ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName()); + TaskCallbackFactory callbackFactory = new TaskCallbackFactory() { + @Override + public TaskCallback createCallback() { + state.startProcess(); + containerMetrics.processes().inc(); + return callbackManager.createCallback(task.taskName(), envelope, coordinator); + } + }; + + task.process(envelope, coordinator, callbackFactory); + } + + /** + * Invoke window. Run window in thread pool if not the single thread mode. + */ + private void window() { + state.startWindow(); + Runnable windowWorker = new Runnable() { + @Override + public void run() { + try { + containerMetrics.windows().inc(); + + ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName()); + long startTime = System.nanoTime(); + task.window(coordinator); + containerMetrics.windowNs().update(System.nanoTime() - startTime); + coordinatorRequests.update(coordinator); + + state.doneWindowOrCommit(); + } catch (Throwable t) { + log.error("Task {} window failed", task.taskName(), t); + abort(t); + } finally { + log.trace("Task {} window completed", task.taskName()); + resume(); + } + } + }; + + if (threadPool != null) { + log.trace("Task {} window on the thread pool", task.taskName()); + threadPool.submit(windowWorker); + } else { + log.trace("Task {} window on the run loop thread", task.taskName()); + windowWorker.run(); + } + } + + /** + * Invoke commit. Run commit in thread pool if not the single thread mode. 
+ */ + private void commit() { + state.startCommit(); + Runnable commitWorker = new Runnable() { + @Override + public void run() { + try { + containerMetrics.commits().inc(); + + long startTime = System.nanoTime(); + task.commit(); + containerMetrics.commitNs().update(System.nanoTime() - startTime); + + state.doneWindowOrCommit(); + } catch (Throwable t) { + log.error("Task {} commit failed", task.taskName(), t); + abort(t); + } finally { + log.trace("Task {} commit completed", task.taskName()); + resume(); + } + } + }; + + if (threadPool != null) { + log.trace("Task {} commits on the thread pool", task.taskName()); + threadPool.submit(commitWorker); + } else { + log.trace("Task {} commits on the run loop thread", task.taskName()); + commitWorker.run(); + } + } + + + + /** + * Task process completes successfully: update the offsets based on the high-water mark, + * then update the coordinator requests carried by the callback that is checkpointed. + * @param callback AsyncStreamTask.processAsync callback + */ + @Override + public void onComplete(TaskCallback callback) { + try { + state.doneProcess(); + TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; + containerMetrics.processNs().update(System.nanoTime() - callbackImpl.timeCreatedNs); + log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition()); + + TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl, true); + if (callbackToUpdate != null) { + IncomingMessageEnvelope envelope = callbackToUpdate.envelope; + log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset()); + + // update offset to the high-water mark returned by the callback manager + task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset()); + + // update coordinator (commit/shutdown requests) + 
coordinatorRequests.update(callbackToUpdate.coordinator); + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + abort(t); + } finally { + resume(); + } + } + +
/** + * Task process fails: calls abort(t) and updates the pending callback count, but not the offset. + * @param callback AsyncStreamTask.processAsync callback + * @param t throwable of the failure + */ + @Override + public void onFailure(TaskCallback callback, Throwable t) { + try { + state.doneProcess(); + abort(t); + // update pending count, but not offset + TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; + callbackManager.updateCallback(callbackImpl, false); + log.error("Got callback failure for task {}", callbackImpl.taskName); + } catch (Throwable e) { + log.error(e.getMessage(), e); + } finally { + resume(); + } + } + } + + + /** + * AsyncTaskState manages the state of the AsyncStreamTask. In summary, a worker has the following states: + * ready - ready for window, commit or process next incoming message. + * busy - doing window, commit or not able to process next message. + * idle - no pending messages, and no window/commit + */ + private final class AsyncTaskState { + private volatile boolean needWindow = false; + private volatile boolean needCommit = false; + private volatile boolean windowOrCommitInFlight = false; + private final AtomicInteger messagesInFlight = new AtomicInteger(0); + private final ArrayDeque<PendingEnvelope> pendingEnvelopQueue; + private final TaskName taskName; + private final TaskInstanceMetrics taskMetrics; + + AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics) { + this.taskName = taskName; + this.taskMetrics = taskMetrics; + this.pendingEnvelopQueue = new ArrayDeque<>(); + } + + /** + * Returns whether the task is ready to do process/window/commit.
+ */ + private boolean isReady() { + needCommit |= coordinatorRequests.commitRequests().remove(taskName); + if (needWindow || needCommit) { + // ready for window or commit only when no messages are in progress and + // no window/commit in flight + return messagesInFlight.get() == 0 && !windowOrCommitInFlight; + } else { + // ready for process only when the in-flight message count is below maxConcurrency + // and no window/commit in flight + return messagesInFlight.get() < maxConcurrency && !windowOrCommitInFlight; + } + } + + private boolean hasPendingOps() { + return !pendingEnvelopQueue.isEmpty() || needCommit || needWindow; + } + + /** + * Returns the next operation for this task: commit first, then window, then process. + */ + private WorkerOp nextOp() { + if (isReady()) { + if (needCommit) return WorkerOp.COMMIT; + else if (needWindow) return WorkerOp.WINDOW; + else if (!pendingEnvelopQueue.isEmpty()) return WorkerOp.PROCESS; + } + return WorkerOp.NO_OP; + } + + private void needWindow() { + needWindow = true; + } + + private void needCommit() { + needCommit = true; + } + + private void startWindow() { + needWindow = false; + windowOrCommitInFlight = true; + } + + private void startCommit() { + needCommit = false; + windowOrCommitInFlight = true; + } + + private void startProcess() { + messagesInFlight.incrementAndGet(); + } + + private void doneWindowOrCommit() { + windowOrCommitInFlight = false; + } + + private void doneProcess() { + messagesInFlight.decrementAndGet(); + } + + /** + * Inserts a PendingEnvelope into the pending envelope queue. + * The function will be called in the run loop thread so no synchronization.
+ * @param pendingEnvelope the envelope to append to this task's pending queue + */ + private void insertEnvelope(PendingEnvelope pendingEnvelope) { + pendingEnvelopQueue.add(pendingEnvelope); + int queueSize = pendingEnvelopQueue.size(); + taskMetrics.pendingMessages().set(queueSize); + log.trace("Insert envelope to task {} queue.", taskName); + log.debug("Task {} pending envelope count is {} after insertion.", taskName, queueSize); + } + + /** + * Fetch the pending envelope in the pending queue for the task to process. + * Update the chooser for flow control on the SSP level. Once it's updated, the AsyncRunLoop + * will be able to choose new messages from this SSP for the task to process. Note that we + * update only when the envelope is processed for the first time. This solves the issue in + * Broadcast stream where a message needs to be processed by multiple tasks. In that case, + * the envelope will be in the pendingEnvelopeQueue of each task. Only the first fetch updates + * the chooser with the next envelope in the broadcast stream partition. + * The function will be called in the run loop thread so no synchronization.
+ * @return the envelope removed from the head of the pending queue + */ + private IncomingMessageEnvelope fetchEnvelope() { + PendingEnvelope pendingEnvelope = pendingEnvelopQueue.remove(); + int queueSize = pendingEnvelopQueue.size(); + taskMetrics.pendingMessages().set(queueSize); + log.trace("fetch envelope ssp {} offset {} to process.", pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset()); + log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize); + + if (pendingEnvelope.markProcessed()) { + SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition(); + consumerMultiplexer.tryUpdate(partition); + log.debug("Update chooser for {}", partition); + } + return pendingEnvelope.envelope; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java new file mode 100644 index 0000000..1fc6456 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import java.util.concurrent.ExecutorService; +import org.apache.samza.config.Config; +import org.apache.samza.system.IncomingMessageEnvelope; + + +/** + * AsyncStreamTaskAdapter allows a StreamTask to be executed in parallel. The class + * uses the built-in thread pool (the executor passed to the constructor) to invoke StreamTask.process and triggers + * the callbacks once it's done. If the thread pool is null, it follows the legacy + * synchronous model to execute the tasks on the run loop thread. + */ +public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask { + private final StreamTask wrappedTask; + private final ExecutorService executor; + + public AsyncStreamTaskAdapter(StreamTask task, ExecutorService executor) { + this.wrappedTask = task; + this.executor = executor; + } + + @Override + public void init(Config config, TaskContext context) throws Exception { + if (wrappedTask instanceof InitableTask) { + ((InitableTask) wrappedTask).init(config, context); + } + } + + @Override + public void processAsync(final IncomingMessageEnvelope envelope, + final MessageCollector collector, + final TaskCoordinator coordinator, + final TaskCallback callback) { + if (executor != null) { + executor.submit(new Runnable() { // the returned Future is ignored; process() reports the outcome through the callback + @Override + public void run() { + process(envelope, collector, coordinator, callback); + } + }); + } else { + // legacy mode: running all tasks in the runloop thread + process(envelope, collector, coordinator, callback); + } + } + + private void process(IncomingMessageEnvelope envelope, +
MessageCollector collector, + TaskCoordinator coordinator, + TaskCallback callback) { + try { + wrappedTask.process(envelope, collector, coordinator); + callback.complete(); + } catch (Throwable t) { // NOTE(review): if callback.complete() itself throws, failure() is also invoked on the same callback + callback.failure(t); + } + } + + @Override + public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + if (wrappedTask instanceof WindowableTask) { + ((WindowableTask) wrappedTask).window(collector, coordinator); + } + } + + @Override + public void close() throws Exception { + if (wrappedTask instanceof ClosableTask) { + ((ClosableTask) wrappedTask).close(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java new file mode 100644 index 0000000..052b3b9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.samza.task; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.samza.container.TaskName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * CoordinatorRequests is used in the run loop to collect the coordinator + * requests from tasks, including commit requests and shutdown requests. + * It is thread safe so it can be updated from multiple task threads. + */ +public class CoordinatorRequests { + private static final Logger log = LoggerFactory.getLogger(CoordinatorRequests.class); + + private final Set<TaskName> taskNames; // all tasks in the container; consensus shutdown needs a request from each of them + private final Set<TaskName> taskShutdownRequests = Collections.synchronizedSet(new HashSet<TaskName>()); + private final Set<TaskName> taskCommitRequests = Collections.synchronizedSet(new HashSet<TaskName>()); + volatile private boolean shutdownNow = false; // volatile so reads see updates made from other threads + + public CoordinatorRequests(Set<TaskName> taskNames) { + this.taskNames = taskNames; + } + + public void update(ReadableCoordinator coordinator) { + if (coordinator.commitRequest().isDefined() || coordinator.shutdownRequest().isDefined()) { + checkCoordinator(coordinator); + } + } + + public Set<TaskName> commitRequests() { // returns the live synchronized set, not a copy + return taskCommitRequests; + } + + public boolean shouldShutdownNow() { + return shutdownNow; + } + + /** + * A new TaskCoordinator object is passed to a task on every call to StreamTask.process + * and WindowableTask.window. This method checks whether the task requested that we + * do something that affects the run loop (such as commit or shut down), and updates + * run loop state accordingly.
+ */ + private void checkCoordinator(ReadableCoordinator coordinator) { + if (coordinator.requestedCommitTask()) { + log.info("Task " + coordinator.taskName() + " requested commit for current task only"); + taskCommitRequests.add(coordinator.taskName()); + } + + if (coordinator.requestedCommitAll()) { + log.info("Task " + coordinator.taskName() + " requested commit for all tasks in the container"); + taskCommitRequests.addAll(taskNames); + } + + if (coordinator.requestedShutdownOnConsensus()) { + taskShutdownRequests.add(coordinator.taskName()); + log.info("Shutdown has now been requested by tasks " + taskShutdownRequests); + } + + if (coordinator.requestedShutdownNow() || taskShutdownRequests.size() == taskNames.size()) { // immediate request, or consensus reached by every task + log.info("Shutdown requested."); + shutdownNow = true; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java new file mode 100644 index 0000000..7dddb67 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +/** + * TaskCallbackFactory creates the {@link TaskCallback} that a {@link org.apache.samza.container.TaskInstance} + * uses when processing a message asynchronously. + */ +public interface TaskCallbackFactory { + TaskCallback createCallback(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java new file mode 100644 index 0000000..9b70099 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements {@link TaskCallback}. It triggers the + * {@link TaskCallbackListener} with the callback result. If the + * callback is invoked more than once, it reports an IllegalStateException + * to the listener through onFailure() (nothing is thrown to the caller). + */ +class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> { + private static final Logger log = LoggerFactory.getLogger(TaskCallbackImpl.class); + + final TaskName taskName; + final IncomingMessageEnvelope envelope; + final ReadableCoordinator coordinator; + final long timeCreatedNs; + private final AtomicBoolean isComplete = new AtomicBoolean(false); + private final TaskCallbackListener listener; + private volatile ScheduledFuture scheduledFuture = null; // set by the creating thread, read by whichever thread completes or fails this callback + private final long seqNum; + + public TaskCallbackImpl(TaskCallbackListener listener, + TaskName taskName, + IncomingMessageEnvelope envelope, + ReadableCoordinator coordinator, + long seqNum) { + this.listener = listener; + this.taskName = taskName; + this.envelope = envelope; + this.coordinator = coordinator; + this.seqNum = seqNum; + this.timeCreatedNs = System.nanoTime(); + } + + @Override + public void complete() { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); // only stop a pending timeout; interrupting its thread gains nothing + } + log.trace("Callback complete for ssp {} offset {}.", envelope.getSystemStreamPartition(), envelope.getOffset()); + + if (isComplete.compareAndSet(false, true)) { + 
listener.onComplete(this); + } else { + Throwable throwable = new IllegalStateException("TaskCallback complete has been invoked after completion"); + log.error("Callback for process task {}, envelope {}.", new Object[] {taskName, envelope, throwable}); // throwable as the extra trailing arg so SLF4J logs its stack trace
+ listener.onFailure(this, throwable); + } + } + + @Override + public void failure(Throwable t) { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); // never interrupt: when the timeout fires, this runs on the timer thread itself + } + log.error("Callback fails for task {} envelope {}.", new Object[] {taskName, envelope, t}); + + if (isComplete.compareAndSet(false, true)) { + listener.onFailure(this, t); + } else { + Throwable throwable = new IllegalStateException("TaskCallback failure has been invoked after completion", t); + log.error("Callback for process task {}, envelope {}.", new Object[] {taskName, envelope, throwable}); + listener.onFailure(this, throwable); + } + } + + void setScheduledFuture(ScheduledFuture scheduledFuture) { + this.scheduledFuture = scheduledFuture; + } + + @Override + public int compareTo(TaskCallbackImpl callback) { + return Long.compare(this.seqNum, callback.seqNum); + } + + boolean matchSeqNum(long seqNum) { + return this.seqNum == seqNum; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java new file mode 100644 index 0000000..de4ee58 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +/** + * Listener for the outcome of the callback passed to {@link AsyncStreamTask}.processAsync. + * If the callback completes with success, onComplete() will be fired. + * If the callback fails, onFailure() will be fired. + */ +interface TaskCallbackListener { + void onComplete(TaskCallback callback); // the callback completed successfully + void onFailure(TaskCallback callback, Throwable t); // the callback failed with t +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java new file mode 100644 index 0000000..132cf59 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.samza.container.TaskInstanceMetrics; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.IncomingMessageEnvelope; + + +/** + * TaskCallbackManager manages the life cycle of {@link AsyncStreamTask} callbacks, + * including creation, update and status. Internally it maintains a PriorityQueue + * for the callbacks based on the sequence number, and updates the offsets for checkpointing + * by always moving forward to the latest contiguous callback (uses the high watermark).
+ */ +class TaskCallbackManager { + + private static final class TaskCallbacks { + private final Queue<TaskCallbackImpl> callbacks = new PriorityQueue<>(); + private final Object lock = new Object(); + private long nextSeqNum = 0L; + + /** + * Adds the newly completed callback to the callback queue, then advances through + * the queue to the last contiguous callback whose offset can be committed. + * @param cb new callback completed + * @return the last contiguous callback that requested a commit, else the highest contiguous callback; null if the watermark did not advance + */ + TaskCallbackImpl update(TaskCallbackImpl cb) { + synchronized (lock) { + callbacks.add(cb); + + TaskCallbackImpl callback = null; + TaskCallbackImpl callbackToCommit = null; + TaskCoordinator.RequestScope shutdownRequest = null; + // look for the last contiguous callback + while (!callbacks.isEmpty() && callbacks.peek().matchSeqNum(nextSeqNum)) { + ++nextSeqNum; + callback = callbacks.poll(); + + if (callback.coordinator.commitRequest().isDefined()) { + callbackToCommit = callback; + } + + if (callback.coordinator.shutdownRequest().isDefined()) { + shutdownRequest = callback.coordinator.shutdownRequest().get(); + } + } + + // if there is no manual commit, use the highest contiguous callback message offset + if (callbackToCommit == null) { + callbackToCommit = callback; + } + + // if there is a shutdown request, merge it into the coordinator to commit + if (shutdownRequest != null) { + callbackToCommit.coordinator.shutdown(shutdownRequest); + } + + return callbackToCommit; + } + } + } + + private long seqNum = 0L; // incremented without synchronization in createCallback(); presumably only one thread creates callbacks — confirm + private final AtomicInteger pendingCount = new AtomicInteger(0); + private final TaskCallbacks completeCallbacks = new TaskCallbacks(); + private final TaskInstanceMetrics metrics; + private final ScheduledExecutorService timer; + private final TaskCallbackListener listener; + private final long timeout; 
// callback timeout in milliseconds; only used when timer is non-null + + public TaskCallbackManager(TaskCallbackListener listener, TaskInstanceMetrics metrics, ScheduledExecutorService timer, long timeout) { + this.listener = listener; + this.metrics =
metrics; + this.timer = timer; + this.timeout = timeout; + } + + public TaskCallbackImpl createCallback(TaskName taskName, // creates a callback with the next sequence number and, if a timer is set, schedules its timeout + IncomingMessageEnvelope envelope, + ReadableCoordinator coordinator) { + final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++); + int count = pendingCount.incrementAndGet(); + metrics.messagesInFlight().set(count); + + if (timer != null) { + Runnable timerTask = new Runnable() { + @Override + public void run() { + String msg = "Task " + callback.taskName + " callback times out"; + callback.failure(new TaskCallbackTimeoutException(msg)); + } + }; + ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS); + callback.setScheduledFuture(scheduledFuture); + } + + return callback; + } + + /** + * Update the task callbacks with the new callback completed. + * It uses a high-watermark model to roll the callbacks for checkpointing. + * @param callback new completed callback + * @param success callback result status; failed callbacks are not queued, so the watermark never moves past them + * @return the callback for checkpointing, or null if there is nothing new to checkpoint + */ + public TaskCallbackImpl updateCallback(TaskCallbackImpl callback, boolean success) { + TaskCallbackImpl callbackToCommit = null; + if (success) { + callbackToCommit = completeCallbacks.update(callback); + } + int count = pendingCount.decrementAndGet(); + metrics.messagesInFlight().set(count); + return callbackToCommit; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java 
new file mode 100644 index 0000000..bf7f13c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.apache.samza.SamzaException; + + +/** + * The {@link SamzaException} used to fail a task callback that does not complete before its timeout. + */ +public class TaskCallbackTimeoutException extends SamzaException { + private static final long serialVersionUID = -2342134146355610665L; + + public TaskCallbackTimeoutException(Throwable e) { + super(e); + } + + public TaskCallbackTimeoutException(String msg) { + super(msg); + } + + public TaskCallbackTimeoutException(String msg, Throwable e) { + super(msg, e); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/util/Utils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/Utils.java b/samza-core/src/main/java/org/apache/samza/util/Utils.java new file mode 100644 index 0000000..472e0a5 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/Utils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction0; + + +public class Utils { + private static final Logger log = LoggerFactory.getLogger(Utils.class); // NOTE(review): not used by any method in this class + + private Utils() {} // static helpers only; not instantiable + + /** + * Returns a Scala Function0 that always yields value, e.g. as the default argument of Scala's Option.getOrElse() when called from Java + * @param value default value + * @param <T> value type + * @return function whose apply() returns value + */ + public static <T> AbstractFunction0<T> defaultValue(final T value) { + return new AbstractFunction0<T>() { + @Override + public T apply() { + return value; + } + }; + } + + /** + * Creates a nanosecond clock using default system nanotime + * @return a Function0 whose apply() returns System.nanoTime(), boxed as an Object (a java.lang.Long) + */ + public static AbstractFunction0<Object> defaultClock() { + return new AbstractFunction0<Object>() { + @Override + public Object apply() { + return System.nanoTime(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 00648e4..7245902 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -19,6 +19,9 @@ package org.apache.samza.checkpoint + +import java.util.concurrent.ConcurrentHashMap + import org.apache.samza.system.SystemStream import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.SystemStreamMetadata @@ -146,7 +149,7 @@ class OffsetManager( /** * Last offsets processed for each SystemStreamPartition. */ - var lastProcessedOffsets = Map[TaskName, Map[SystemStreamPartition, String]]() + val lastProcessedOffsets = new ConcurrentHashMap[TaskName, ConcurrentHashMap[SystemStreamPartition, String]]() /** * Offsets to start reading from for each SystemStreamPartition. This @@ -182,20 +185,15 @@ class OffsetManager( * Set the last processed offset for a given SystemStreamPartition. */ def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) { - lastProcessedOffsets.get(taskName) match { - case Some(sspToOffsets) => lastProcessedOffsets += taskName -> (sspToOffsets + (systemStreamPartition -> offset)) - case None => lastProcessedOffsets += (taskName -> Map(systemStreamPartition -> offset)) - } + lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]()) + lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset) } /** * Get the last processed offset for a SystemStreamPartition.
*/ - def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition) = { - lastProcessedOffsets.get(taskName) match { - case Some(sspToOffsets) => sspToOffsets.get(systemStreamPartition) - case None => None - } + def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition): Option[String] = { + Option(lastProcessedOffsets.get(taskName)).flatMap(sspToOffsets => Option(sspToOffsets.get(systemStreamPartition))) // None (never Some(null)) when the task or the SSP has no recorded offset } /** @@ -217,7 +215,7 @@ class OffsetManager( debug("Checkpointing offsets for taskName %s." format taskName) val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw new SamzaException("No such SystemStreamPartition set " + taskName + " registered for this checkpointmanager")).toSet - val partitionOffsets = lastProcessedOffsets.get(taskName) match { + val partitionOffsets = Option(lastProcessedOffsets.get(taskName)) match { case Some(sspToOffsets) => sspToOffsets.filterKeys(sspsForTaskName.contains(_)) case None => { warn(taskName + " is not found...
") @@ -225,8 +223,9 @@ class OffsetManager( } } + partitionOffsets.foreach(p => info("task " + taskName + " checkpoint " + p._1 + ", " + p._2)) checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets)) - lastProcessedOffsets.get(taskName) match { + Option(lastProcessedOffsets.get(taskName)) match { case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) } case None => } @@ -270,9 +269,8 @@ class OffsetManager( .keys .flatMap(restoreOffsetsFromCheckpoint(_)) .toMap - lastProcessedOffsets ++= result.map { - case (taskName, sspToOffset) => { - taskName -> sspToOffset.filter { + result.foreach { case (taskName, sspToOffset) => { // executed only for its side effect on lastProcessedOffsets + lastProcessedOffsets.put(taskName, new ConcurrentHashMap[SystemStreamPartition, String](sspToOffset.filter { case (systemStreamPartition, offset) => val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream) if (!shouldKeep) { @@ -280,7 +278,7 @@ class OffsetManager( } info("Checkpointed offset is currently %s for %s" format (offset, systemStreamPartition)) shouldKeep - } + })) } } } else { @@ -324,17 +322,15 @@ class OffsetManager( } } - lastProcessedOffsets = lastProcessedOffsets.map { - case (taskName, sspToOffsets) => { - taskName -> (sspToOffsets -- systemStreamPartitionsToReset(taskName)) - } + lastProcessedOffsets.keys().foreach { taskName => + lastProcessedOffsets.get(taskName).keySet().removeAll(systemStreamPartitionsToReset(taskName)) } } /** * Returns a map of all SystemStreamPartitions in lastProcessedOffsets that need to be reset */ - private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: Map[TaskName, Map[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = { + private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: ConcurrentHashMap[TaskName, ConcurrentHashMap[SystemStreamPartition, String]]): 
Map[TaskName, Set[SystemStreamPartition]] = {
taskNameTosystemStreamPartitions.map { case (taskName, sspToOffsets) => { taskName -> (sspToOffsets.filter { http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 49b08f6..13b72fa 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -44,6 +44,9 @@ object JobConfig { val SAMZA_FWK_VERSION = "samza.fwk.version" val JOB_COORDINATOR_SYSTEM = "job.coordinator.system" val JOB_CONTAINER_COUNT = "job.container.count" + val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size" + val jOB_CONTAINER_THREAD_POOL_SIZE = JOB_CONTAINER_THREAD_POOL_SIZE // misspelled alias kept so existing references still compile; prefer JOB_CONTAINER_THREAD_POOL_SIZE + val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode" val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor" val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes" val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory" @@ -167,4 +170,13 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getSSPMatcherConfigJobFactoryRegex = getOrElse(JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, JobConfig.DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX) + def getThreadPoolSize = getOption(JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE) match { + case Some(size) => size.toInt + case _ => 0 + } + + def getSingleThreadMode = getOption(JobConfig.JOB_CONTAINER_SINGLE_THREAD_MODE) match { + case Some(mode) => mode.toBoolean + case _ => false + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 08a4deb..90c1904 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -38,6 +38,8 @@ object TaskConfig { val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper + val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask + val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms" // timeout period for triggering a callback /** * Samza's container polls for more messages under two conditions. The first @@ -117,4 +119,13 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging { } } + def getMaxConcurrency: Option[Int] = getOption(TaskConfig.MAX_CONCURRENCY) match { + case Some(count) => Some(count.toInt) + case _ => None + } + + def getCallbackTimeoutMs: Option[Long] = getOption(TaskConfig.CALLBACK_TIMEOUT_MS) match { + case Some(ms) => Some(ms.toLong) + case _ => None + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index cf05c15..bb2c376 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -21,12 +21,18 @@ package org.apache.samza.container import java.util.concurrent.Executor -import org.apache.samza.system.{SystemConsumers, SystemStreamPartition} +import 
org.apache.samza.system.SystemConsumers +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.task.CoordinatorRequests import org.apache.samza.task.ReadableCoordinator -import org.apache.samza.util.{Logging, TimerUtils} +import org.apache.samza.task.StreamTask +import org.apache.samza.util.Logging +import org.apache.samza.util.TimerUtils + +import scala.collection.JavaConversions._ /** - * Each {@link SamzaContainer} uses a single-threaded execution model: activities for + * The run loop uses a single-threaded execution model: activities for * all {@link TaskInstance}s within a container are multiplexed onto one execution * thread. Those activities include task callbacks (such as StreamTask.process and * WindowableTask.window), committing checkpoints, etc. @@ -34,31 +40,29 @@ import org.apache.samza.util.{Logging, TimerUtils} * <p>This class manages the execution of that run loop, determining what needs to * be done when. */ -class RunLoop( - val taskInstances: Map[TaskName, TaskInstance], +class RunLoop ( + val taskInstances: Map[TaskName, TaskInstance[StreamTask]], val consumerMultiplexer: SystemConsumers, val metrics: SamzaContainerMetrics, val windowMs: Long = -1, val commitMs: Long = 60000, val clock: () => Long = { System.nanoTime }, - val shutdownMs: Long = 5000, val executor: Executor = new SameThreadExecutor()) extends Runnable with TimerUtils with Logging { private val metricsMsOffset = 1000000L private var lastWindowNs = clock() private var lastCommitNs = clock() private var activeNs = 0L - private var taskShutdownRequests: Set[TaskName] = Set() - private var taskCommitRequests: Set[TaskName] = Set() @volatile private var shutdownNow = false + private val coordinatorRequests: CoordinatorRequests = new CoordinatorRequests(taskInstances.keySet) // Messages come from the chooser with no connection to the TaskInstance they're bound for. // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them. 
val systemStreamPartitionToTaskInstances = getSystemStreamPartitionToTaskInstancesMapping - def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance]] = { + def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance[StreamTask]]] = { // We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly - def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap + def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance[StreamTask]) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.groupBy(_._1).map { case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList @@ -70,8 +74,6 @@ class RunLoop( * unhandled exception is thrown. */ def run { - addShutdownHook(Thread.currentThread()) - val runTask = new Runnable() { override def run(): Unit = { val loopStartTime = clock() @@ -89,19 +91,8 @@ class RunLoop( } } - private def addShutdownHook(runLoopThread: Thread) { - Runtime.getRuntime().addShutdownHook(new Thread() { - override def run() = { - info("Shutting down, will wait up to %s ms" format shutdownMs) - shutdownNow = true - runLoopThread.join(shutdownMs) - if (runLoopThread.isAlive) { - warn("Did not shut down within %s ms, exiting" format shutdownMs) - } else { - info("Shutdown complete") - } - } - }) + def shutdown = { + shutdownNow = true } /** @@ -115,7 +106,7 @@ class RunLoop( // Exclude choose time from activeNs. Although it includes deserialization time, // it most closely captures idle time. 
val envelope = updateTimer(metrics.chooseNs) { - consumerMultiplexer.choose + consumerMultiplexer.choose() } activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => { @@ -128,11 +119,11 @@ class RunLoop( val taskInstances = systemStreamPartitionToTaskInstances(ssp) taskInstances.foreach { taskInstance => - { - val coordinator = new ReadableCoordinator(taskInstance.taskName) - taskInstance.process(envelope, coordinator) - checkCoordinator(coordinator) - } + { + val coordinator = new ReadableCoordinator(taskInstance.taskName) + taskInstance.process(envelope, coordinator) + coordinatorRequests.update(coordinator) + } } } else { trace("No incoming message envelope was available.") @@ -155,7 +146,7 @@ class RunLoop( case (taskName, task) => val coordinator = new ReadableCoordinator(taskName) task.window(coordinator) - checkCoordinator(coordinator) + coordinatorRequests.update(coordinator) } } }) @@ -167,47 +158,20 @@ class RunLoop( private def commit { activeNs += updateTimerAndGetDuration(metrics.commitNs) ((currentTimeNs: Long) => { if (commitMs >= 0 && lastCommitNs + commitMs * metricsMsOffset < currentTimeNs) { - trace("Committing task instances because the commit interval has elapsed.") + info("Committing task instances because the commit interval has elapsed.") lastCommitNs = currentTimeNs metrics.commits.inc taskInstances.values.foreach(_.commit) - } else if (!taskCommitRequests.isEmpty) { + } else if (!coordinatorRequests.commitRequests.isEmpty){ trace("Committing due to explicit commit request.") metrics.commits.inc - taskCommitRequests.foreach(taskName => { + coordinatorRequests.commitRequests.foreach(taskName => { taskInstances(taskName).commit }) } - taskCommitRequests = Set() + shutdownNow |= coordinatorRequests.shouldShutdownNow + coordinatorRequests.commitRequests.clear() }) } - - /** - * A new TaskCoordinator object is passed to a task on every call to StreamTask.process - * and WindowableTask.window. 
This method checks whether the task requested that we - * do something that affects the run loop (such as commit or shut down), and updates - * run loop state accordingly. - */ - private def checkCoordinator(coordinator: ReadableCoordinator) { - if (coordinator.requestedCommitTask) { - debug("Task %s requested commit for current task only" format coordinator.taskName) - taskCommitRequests += coordinator.taskName - } - - if (coordinator.requestedCommitAll) { - debug("Task %s requested commit for all tasks in the container" format coordinator.taskName) - taskCommitRequests ++= taskInstances.keys - } - - if (coordinator.requestedShutdownOnConsensus) { - taskShutdownRequests += coordinator.taskName - info("Shutdown has now been requested by tasks: %s" format taskShutdownRequests) - } - - if (coordinator.requestedShutdownNow || taskShutdownRequests.size == taskInstances.size) { - info("Shutdown requested.") - shutdownNow = true - } - } }
