Repository: incubator-samza
Updated Branches:
  refs/heads/master 247733e38 -> feba2d03c


SAMZA-437; remove task lifecycle listener


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/feba2d03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/feba2d03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/feba2d03

Branch: refs/heads/master
Commit: feba2d03c0512832ed0790a0ad46f12eb5fea5d4
Parents: 247733e
Author: Chris Riccomini <[email protected]>
Authored: Mon Nov 3 14:13:58 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Mon Nov 3 14:13:58 2014 -0800

----------------------------------------------------------------------
 .../versioned/container/event-loop.md           | 18 +---
 .../versioned/jobs/configuration-table.html     | 25 ------
 .../java/org/apache/samza/task/TaskContext.java |  2 +-
 .../samza/task/TaskLifecycleListener.java       | 93 --------------------
 .../task/TaskLifecycleListenerFactory.java      | 29 ------
 .../apache/samza/container/SamzaContainer.scala | 21 -----
 .../apache/samza/container/TaskInstance.scala   | 14 ---
 7 files changed, 4 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/docs/learn/documentation/versioned/container/event-loop.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/event-loop.md b/docs/learn/documentation/versioned/container/event-loop.md
index f0f21b0..1162383 100644
--- a/docs/learn/documentation/versioned/container/event-loop.md
+++ b/docs/learn/documentation/versioned/container/event-loop.md
@@ -39,22 +39,10 @@ The event loop works as follows:
 
 The container does this, in a loop, until it is shut down. Note that although there can be multiple task instances within a container (depending on the number of input stream partitions), their process() and window() methods are all called on the same thread, never concurrently on different threads.
 
-### Lifecycle Listeners
+### Lifecycle
 
-Sometimes, you need to run your own code at specific points in a task's lifecycle. For example, you might want to set up some context in the container whenever a new message arrives, or perform some operations on startup or shutdown.
+The only way in which a developer can hook into a SamzaContainer's lifecycle is through the standard InitableTask, ClosableTask, StreamTask, and WindowableTask. In cases where pluggable logic needs to be added to wrap a StreamTask, the StreamTask can be wrapped by another StreamTask implementation that handles the custom logic before calling into the wrapped StreamTask.
 
-To receive notifications when such events happen, you can implement the [TaskLifecycleListenerFactory](../api/javadocs/org/apache/samza/task/TaskLifecycleListenerFactory.html) interface. It returns a [TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html), whose methods are called by Samza at the appropriate times.
-
-You can then tell Samza to use your lifecycle listener with the following properties in your job configuration:
-
-{% highlight jproperties %}
-# Define a listener called "my-listener" by giving the factory class name
-task.lifecycle.listener.my-listener.class=com.example.foo.MyListenerFactory
-
-# Enable it in this job (multiple listeners can be separated by commas)
-task.lifecycle.listeners=my-listener
-{% endhighlight %}
-
-The Samza container creates one instance of your [TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html). If the container has multiple task instances (processing different input stream partitions), the beforeInit, afterInit, beforeClose and afterClose methods are called for each task instance. The [TaskContext](../api/javadocs/org/apache/samza/task/TaskContext.html) argument of those methods gives you more information about the partitions.
+A concrete example is a set of StreamTasks that all want to share the same try/catch logic in their process() method. A StreamTask can be implemented that wraps the original StreamTasks, and surrounds the original process() call with the appropriate try/catch logic. For more details, see [this discussion](https://issues.apache.org/jira/browse/SAMZA-437).
 
 ## [JMX &raquo;](jmx.html)
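
As a rough illustration of the wrapping approach described in the new event-loop.md text above, a wrapper StreamTask might look like the sketch below. `ErrorHandlingTask`, its `getFailures()` accessor, and the record-and-continue policy are hypothetical and not part of this commit. The small type declarations at the top are simplified stand-ins so the sketch compiles without Samza on the classpath; a real job would use the Samza interfaces of the same names instead.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins (assumptions) for the Samza types of the same names,
// declared here only so this sketch is self-contained.
class IncomingMessageEnvelope {
  private final Object message;
  IncomingMessageEnvelope(Object message) { this.message = message; }
  Object getMessage() { return message; }
}

interface MessageCollector {}

interface TaskCoordinator {}

interface StreamTask {
  void process(IncomingMessageEnvelope envelope,
               MessageCollector collector,
               TaskCoordinator coordinator) throws Exception;
}

// Hypothetical wrapper: applies shared try/catch logic around any StreamTask.
class ErrorHandlingTask implements StreamTask {
  private final StreamTask wrapped;
  private final List<Exception> failures = new ArrayList<>();

  ErrorHandlingTask(StreamTask wrapped) {
    this.wrapped = wrapped;
  }

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    try {
      // Delegate to the original task's process() call.
      wrapped.process(envelope, collector, coordinator);
    } catch (Exception e) {
      // Shared handling (an assumed policy): record the failure and continue.
      failures.add(e);
    }
  }

  List<Exception> getFailures() {
    return failures;
  }
}
```

A real wrapper would presumably also forward InitableTask, WindowableTask, and ClosableTask calls when the wrapped task implements them, since those interfaces are now the only lifecycle hooks.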

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index e3251a6..e7a4d3c 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -354,31 +354,6 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="task-lifecycle-listener-class">task.lifecycle.listener.<br><span class="listener">listener-name</span>.class</td>
-                    <td class="default"></td>
-                    <td class="description">
-                        Use this property to register a
-                        <a href="../container/event-loop.html#lifecycle-listeners">lifecycle listener</a>, which can receive
-                        a notification when a container starts up or shuts down, or when a message is processed.
-                        The value is the fully-qualified name of a Java class that implements
-                        <a href="../api/javadocs/org/apache/samza/task/TaskLifecycleListenerFactory.html">TaskLifecycleListenerFactory</a>.
-                        You can define multiple lifecycle listeners, each with a different <span class="listener">listener-name</span>,
-                        and reference them in <a href="#task-lifecycle-listeners" class="property">task.lifecycle.listeners</a>.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="task-lifecycle-listeners">task.lifecycle.listeners</td>
-                    <td class="default"></td>
-                    <td class="description">
-                        If you have defined <a href="../container/event-loop.html#lifecycle-listeners">lifecycle listeners</a> with
-                        <a href="#task-lifecycle-listener-class" class="property">task.lifecycle.listener.*.class</a>,
-                        you need to list them here in order to enable them. The value of this property is a
-                        comma-separated list of <span class="listener">listener-name</span> tokens.
-                    </td>
-                </tr>
-
-                <tr>
                     <td class="property" id="task-drop-deserialization-errors">task.drop.deserialization.errors</td>
                     <td class="default"></td>
                     <td class="description">

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 35de8cc..6d10212 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -27,7 +27,7 @@ import java.util.Set;
 
 /**
  * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during
- * initialization in an {@link org.apache.samza.task.InitableTask} and during calls to {@link org.apache.samza.task.TaskLifecycleListener}s.
+ * initialization in an {@link org.apache.samza.task.InitableTask}.
  */
 public interface TaskContext {
   MetricsRegistry getMetricsRegistry();

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java b/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java
deleted file mode 100644
index 55524a1..0000000
--- a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.system.IncomingMessageEnvelope;
-
-/**
- * Used to get before/after notifications before initializing/closing all tasks
- * in a given container (JVM/process).
- */
-public interface TaskLifecycleListener {
-  /**
-   * Called before all tasks in TaskRunner are initialized.
-   * 
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's being initialized.
-   */
-  void beforeInit(Config config, TaskContext context);
-
-  /**
-   * Called after all tasks in TaskRunner are initialized.
-   * 
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's being initialized.
-   */
-  void afterInit(Config config, TaskContext context);
-
-  /**
-   * Called before a message is processed by a task.
-   * 
-   * @param envelope
-   *          The envelope to be processed by the StreamTask.
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's about to process a message.
-   */
-  void beforeProcess(IncomingMessageEnvelope envelope, Config config, TaskContext context);
-
-  /**
-   * Called after a message is processed by a task.
-   * 
-   * @param envelope
-   *          The envelope that was processed by the StreamTask.
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that just processed a message.
-   */
-  void afterProcess(IncomingMessageEnvelope envelope, Config config, TaskContext context);
-
-  /**
-   * Called before all tasks in TaskRunner are closed.
-   * 
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's about to be shutdown.
-   */
-  void beforeClose(Config config, TaskContext context);
-
-  /**
-   * Called after all tasks in TaskRunner are closed.
-   * 
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that was just shutdown.
-   */
-  void afterClose(Config config, TaskContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
deleted file mode 100644
index 5ed7054..0000000
--- a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-
-/**
- * Build a {@link org.apache.samza.task.TaskLifecycleListener}
- */
-public interface TaskLifecycleListenerFactory {
-  TaskLifecycleListener getLifecyleListener(String name, Config config);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5885a88..2b53440 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -54,8 +54,6 @@ import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.task.TaskLifecycleListener
-import org.apache.samza.task.TaskLifecycleListenerFactory
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
@@ -340,24 +338,6 @@ object SamzaContainer extends Logging {
       metrics = systemProducersMetrics,
       dropSerializationError = dropSerializationError)
 
-    val listeners = config.getLifecycleListeners match {
-      case Some(listeners) => {
-        listeners.split(",").map(listenerName => {
-          info("Loading lifecycle listener: %s" format listenerName)
-
-          val listenerClassName = config.getLifecycleListenerClass(listenerName).getOrElse(throw new SamzaException("Referencing missing listener %s in config" format listenerName))
-
-          Util.getObj[TaskLifecycleListenerFactory](listenerClassName)
-            .getLifecyleListener(listenerName, config)
-        }).toList
-      }
-      case _ => {
-        info("No lifecycle listeners found")
-
-        List[TaskLifecycleListener]()
-      }
-    }
-
     // TODO not sure how we should make this config based, or not. Kind of
     // strange, since it has some dynamic directories when used with YARN.
     val storeBaseDir = new File(System.getProperty("user.dir"), "state")
@@ -481,7 +461,6 @@ object SamzaContainer extends Logging {
         offsetManager = offsetManager,
         storageManager = storageManager,
         reporters = reporters,
-        listeners = listeners,
         systemStreamPartitions = systemStreamPartitions,
         exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config))
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/feba2d03/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 66f7dbe..327299b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext
 import org.apache.samza.task.ClosableTask
 import org.apache.samza.task.InitableTask
 import org.apache.samza.task.WindowableTask
-import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.TaskInstanceCollector
@@ -50,7 +49,6 @@ class TaskInstance(
   offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
-  listeners: Seq[TaskLifecycleListener] = Seq(),
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler) extends Logging {
   val isInitableTask = task.isInstanceOf[InitableTask]
@@ -92,8 +90,6 @@ class TaskInstance(
   }
 
   def initTask {
-    listeners.foreach(_.beforeInit(config, context))
-
     if (isInitableTask) {
       debug("Initializing task for taskName: %s" format taskName)
 
@@ -101,8 +97,6 @@ class TaskInstance(
     } else {
       debug("Skipping task initialization for taskName: %s" format taskName)
     }
-
-    listeners.foreach(_.afterInit(config, context))
   }
 
   def registerProducers {
@@ -129,16 +123,12 @@ class TaskInstance(
   def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
     metrics.processes.inc
 
-    listeners.foreach(_.beforeProcess(envelope, config, context))
-
     trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
 
     exceptionHandler.maybeHandle {
       task.process(envelope, collector, coordinator)
     }
 
-    listeners.foreach(_.afterProcess(envelope, config, context))
-
     trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
 
     offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
@@ -173,8 +163,6 @@ class TaskInstance(
   }
 
   def shutdownTask {
-    listeners.foreach(_.beforeClose(config, context))
-
     if (task.isInstanceOf[ClosableTask]) {
       debug("Shutting down stream task for taskName: %s" format taskName)
 
@@ -182,8 +170,6 @@ class TaskInstance(
     } else {
       debug("Skipping stream task shutdown for taskName: %s" format taskName)
     }
-
-    listeners.foreach(_.afterClose(config, context))
   }
 
   def shutdownStores {
