Repository: incubator-samza
Updated Branches:
  refs/heads/master 247733e38 -> feba2d03c
SAMZA-437; remove task lifecycle listener


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/feba2d03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/feba2d03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/feba2d03

Branch: refs/heads/master
Commit: feba2d03c0512832ed0790a0ad46f12eb5fea5d4
Parents: 247733e
Author: Chris Riccomini <[email protected]>
Authored: Mon Nov 3 14:13:58 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Mon Nov 3 14:13:58 2014 -0800

----------------------------------------------------------------------
 .../versioned/container/event-loop.md           | 18 +---
 .../versioned/jobs/configuration-table.html     | 25 ------
 .../java/org/apache/samza/task/TaskContext.java |  2 +-
 .../samza/task/TaskLifecycleListener.java       | 93 --------------------
 .../task/TaskLifecycleListenerFactory.java      | 29 ------
 .../apache/samza/container/SamzaContainer.scala | 21 -----
 .../apache/samza/container/TaskInstance.scala   | 14 ---
 7 files changed, 4 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/docs/learn/documentation/versioned/container/event-loop.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/event-loop.md b/docs/learn/documentation/versioned/container/event-loop.md
index f0f21b0..1162383 100644
--- a/docs/learn/documentation/versioned/container/event-loop.md
+++ b/docs/learn/documentation/versioned/container/event-loop.md
@@ -39,22 +39,10 @@ The event loop works as follows:
 The container does this, in a loop, until it is shut down.
 
 Note that although there can be multiple task instances within a container (depending on the number of input stream partitions), their process() and window() methods are all called on the same thread, never concurrently on different threads.
-### Lifecycle Listeners
+### Lifecycle
 
-Sometimes, you need to run your own code at specific points in a task's lifecycle. For example, you might want to set up some context in the container whenever a new message arrives, or perform some operations on startup or shutdown.
+The only way in which a developer can hook into a SamzaContainer's lifecycle is through the standard InitableTask, ClosableTask, StreamTask, and WindowableTask. In cases where pluggable logic needs to be added to wrap a StreamTask, the StreamTask can be wrapped by another StreamTask implementation that handles the custom logic before calling into the wrapped StreamTask.
 
-To receive notifications when such events happen, you can implement the [TaskLifecycleListenerFactory](../api/javadocs/org/apache/samza/task/TaskLifecycleListenerFactory.html) interface. It returns a [TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html), whose methods are called by Samza at the appropriate times.
-
-You can then tell Samza to use your lifecycle listener with the following properties in your job configuration:
-
-{% highlight jproperties %}
-# Define a listener called "my-listener" by giving the factory class name
-task.lifecycle.listener.my-listener.class=com.example.foo.MyListenerFactory
-
-# Enable it in this job (multiple listeners can be separated by commas)
-task.lifecycle.listeners=my-listener
-{% endhighlight %}
-
-The Samza container creates one instance of your [TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html). If the container has multiple task instances (processing different input stream partitions), the beforeInit, afterInit, beforeClose and afterClose methods are called for each task instance. The [TaskContext](../api/javadocs/org/apache/samza/task/TaskContext.html) argument of those methods gives you more information about the partitions.
+A concrete example is a set of StreamTasks that all want to share the same try/catch logic in their process() method. A StreamTask can be implemented that wraps the original StreamTasks, and surrounds the original process() call with the appropriate try/catch logic. For more details, see [this discussion](https://issues.apache.org/jira/browse/SAMZA-437).
 
 ## [JMX »](jmx.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index e3251a6..e7a4d3c 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -354,31 +354,6 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="task-lifecycle-listener-class">task.lifecycle.listener.<br><span class="listener">listener-name</span>.class</td>
-                    <td class="default"></td>
-                    <td class="description">
-                        Use this property to register a
-                        <a href="../container/event-loop.html#lifecycle-listeners">lifecycle listener</a>, which can receive
-                        a notification when a container starts up or shuts down, or when a message is processed.
-                        The value is the fully-qualified name of a Java class that implements
-                        <a href="../api/javadocs/org/apache/samza/task/TaskLifecycleListenerFactory.html">TaskLifecycleListenerFactory</a>.
-                        You can define multiple lifecycle listeners, each with a different <span class="listener">listener-name</span>,
-                        and reference them in <a href="#task-lifecycle-listeners" class="property">task.lifecycle.listeners</a>.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="task-lifecycle-listeners">task.lifecycle.listeners</td>
-                    <td class="default"></td>
-                    <td class="description">
-                        If you have defined <a href="../container/event-loop.html#lifecycle-listeners">lifecycle listeners</a> with
-                        <a href="#task-lifecycle-listener-class" class="property">task.lifecycle.listener.*.class</a>,
-                        you need to list them here in order to enable them. The value of this property is a
-                        comma-separated list of <span class="listener">listener-name</span> tokens.
-                    </td>
-                </tr>
-
-                <tr>
                     <td class="property" id="task-drop-deserialization-errors">task.drop.deserialization.errors</td>
                     <td class="default"></td>
                     <td class="description">

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 35de8cc..6d10212 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -27,7 +27,7 @@ import java.util.Set;
 
 /**
  * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during
- * initialization in an {@link org.apache.samza.task.InitableTask} and during calls to {@link org.apache.samza.task.TaskLifecycleListener}s.
+ * initialization in an {@link org.apache.samza.task.InitableTask}.
  */
 public interface TaskContext {
   MetricsRegistry getMetricsRegistry();

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java b/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java
deleted file mode 100644
index 55524a1..0000000
--- a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.system.IncomingMessageEnvelope;
-
-/**
- * Used to get before/after notifications before initializing/closing all tasks
- * in a given container (JVM/process).
- */
-public interface TaskLifecycleListener {
-  /**
-   * Called before all tasks in TaskRunner are initialized.
-   *
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's being initialized.
-   */
-  void beforeInit(Config config, TaskContext context);
-
-  /**
-   * Called after all tasks in TaskRunner are initialized.
-   *
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's being initialized.
-   */
-  void afterInit(Config config, TaskContext context);
-
-  /**
-   * Called before a message is processed by a task.
-   *
-   * @param envelope
-   *          The envelope to be processed by the StreamTask.
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's about to process a message.
-   */
-  void beforeProcess(IncomingMessageEnvelope envelope, Config config, TaskContext context);
-
-  /**
-   * Called after a message is processed by a task.
-   *
-   * @param envelope
-   *          The envelope that was processed by the StreamTask.
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that just processed a message.
-   */
-  void afterProcess(IncomingMessageEnvelope envelope, Config config, TaskContext context);
-
-  /**
-   * Called before all tasks in TaskRunner are closed.
-   *
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's about to be shutdown.
-   */
-  void beforeClose(Config config, TaskContext context);
-
-  /**
-   * Called after all tasks in TaskRunner are closed.
-   *
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that was just shutdown.
-   */
-  void afterClose(Config config, TaskContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
deleted file mode 100644
index 5ed7054..0000000
--- a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-
-/**
- * Build a {@link org.apache.samza.task.TaskLifecycleListener}
- */
-public interface TaskLifecycleListenerFactory {
-  TaskLifecycleListener getLifecyleListener(String name, Config config);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5885a88..2b53440 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -54,8 +54,6 @@ import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.task.TaskLifecycleListener
-import org.apache.samza.task.TaskLifecycleListenerFactory
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
@@ -340,24 +338,6 @@ object SamzaContainer extends Logging {
       metrics = systemProducersMetrics,
       dropSerializationError = dropSerializationError)
 
-    val listeners = config.getLifecycleListeners match {
-      case Some(listeners) => {
-        listeners.split(",").map(listenerName => {
-          info("Loading lifecycle listener: %s" format listenerName)
-
-          val listenerClassName = config.getLifecycleListenerClass(listenerName).getOrElse(throw new SamzaException("Referencing missing listener %s in config" format listenerName))
-
-          Util.getObj[TaskLifecycleListenerFactory](listenerClassName)
-            .getLifecyleListener(listenerName, config)
-        }).toList
-      }
-      case _ => {
-        info("No lifecycle listeners found")
-
-        List[TaskLifecycleListener]()
-      }
-    }
-
     // TODO not sure how we should make this config based, or not. Kind of
     // strange, since it has some dynamic directories when used with YARN.
     val storeBaseDir = new File(System.getProperty("user.dir"), "state")
@@ -481,7 +461,6 @@ object SamzaContainer extends Logging {
         offsetManager = offsetManager,
         storageManager = storageManager,
         reporters = reporters,
-        listeners = listeners,
         systemStreamPartitions = systemStreamPartitions,
         exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config))
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 66f7dbe..327299b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext
 import org.apache.samza.task.ClosableTask
 import org.apache.samza.task.InitableTask
 import org.apache.samza.task.WindowableTask
-import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.TaskInstanceCollector
@@ -50,7 +49,6 @@ class TaskInstance(
   offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
-  listeners: Seq[TaskLifecycleListener] = Seq(),
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler) extends Logging {
   val isInitableTask = task.isInstanceOf[InitableTask]
@@ -92,8 +90,6 @@ class TaskInstance(
   }
 
   def initTask {
-    listeners.foreach(_.beforeInit(config, context))
-
     if (isInitableTask) {
       debug("Initializing task for taskName: %s" format taskName)
 
@@ -101,8 +97,6 @@ class TaskInstance(
     } else {
       debug("Skipping task initialization for taskName: %s" format taskName)
     }
-
-    listeners.foreach(_.afterInit(config, context))
   }
 
   def registerProducers {
@@ -129,16 +123,12 @@ class TaskInstance(
   def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
     metrics.processes.inc
 
-    listeners.foreach(_.beforeProcess(envelope, config, context))
-
     trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
 
     exceptionHandler.maybeHandle {
       task.process(envelope, collector, coordinator)
     }
 
-    listeners.foreach(_.afterProcess(envelope, config, context))
-
     trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
 
     offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
@@ -173,8 +163,6 @@ class TaskInstance(
   }
 
   def shutdownTask {
-    listeners.foreach(_.beforeClose(config, context))
-
     if (task.isInstanceOf[ClosableTask]) {
       debug("Shutting down stream task for taskName: %s" format taskName)
 
@@ -182,8 +170,6 @@ class TaskInstance(
     } else {
       debug("Skipping stream task shutdown for taskName: %s" format taskName)
     }
-
-    listeners.foreach(_.afterClose(config, context))
   }
 
   def shutdownStores {
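The updated event-loop.md text in this commit suggests wrapping a StreamTask in another StreamTask instead of registering a lifecycle listener, with shared try/catch logic around process() as the concrete case. The following is a minimal Java sketch of that idea, not code from this commit. `TryCatchStreamTask`, its "record and skip" error policy, and `getFailures()` are hypothetical. To let the block compile without the samza-api jar, it declares simplified stand-ins for `StreamTask`, `InitableTask`, `ClosableTask`, `IncomingMessageEnvelope`, `Config`, `TaskContext`, `MessageCollector` and `TaskCoordinator` (the stand-in envelope constructor in particular is not Samza's). A real job would import the `org.apache.samza` types instead.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins (assumption) so the sketch compiles on its own.
// In a real job, import these from org.apache.samza.* instead.
interface Config {}
interface TaskContext {}
interface MessageCollector {}
interface TaskCoordinator {}

final class IncomingMessageEnvelope {
  private final Object message;
  IncomingMessageEnvelope(Object message) { this.message = message; }
  Object getMessage() { return message; }
}

interface StreamTask {
  void process(IncomingMessageEnvelope envelope, MessageCollector collector,
               TaskCoordinator coordinator) throws Exception;
}

interface InitableTask {
  void init(Config config, TaskContext context) throws Exception;
}

interface ClosableTask {
  void close() throws Exception;
}

/**
 * Hypothetical wrapper: applies shared try/catch logic around the wrapped
 * task's process(), and forwards init()/close() so the wrapped task still
 * receives those lifecycle calls.
 */
class TryCatchStreamTask implements StreamTask, InitableTask, ClosableTask {
  private final StreamTask wrapped;
  private final List<Exception> failures = new ArrayList<>();

  TryCatchStreamTask(StreamTask wrapped) {
    this.wrapped = wrapped;
  }

  @Override
  public void init(Config config, TaskContext context) throws Exception {
    // Forward only if the wrapped task actually wants init().
    if (wrapped instanceof InitableTask) {
      ((InitableTask) wrapped).init(config, context);
    }
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    try {
      wrapped.process(envelope, collector, coordinator);
    } catch (Exception e) {
      // Shared error policy (assumption): remember the failure and skip the message.
      failures.add(e);
    }
  }

  @Override
  public void close() throws Exception {
    // Forward only if the wrapped task actually wants close().
    if (wrapped instanceof ClosableTask) {
      ((ClosableTask) wrapped).close();
    }
  }

  List<Exception> getFailures() {
    return failures;
  }
}
```

The forwarding of init() and close() matters because, as the TaskInstance.scala hunks above show, the container decides whether to call them with `task.isInstanceOf[InitableTask]` and `task.isInstanceOf[ClosableTask]` on the configured task itself, so a wrapper that did not implement those interfaces would presumably cause the wrapped task's lifecycle methods to be skipped. A wrapped WindowableTask would need window() forwarded in the same way. Since the container instantiates the task class itself, a no-argument subclass (for example a hypothetical `MyGuardedTask() { super(new MyTask()); }`) is one plausible way to name the wrapper in the job's task class setting.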
