[ 
https://issues.apache.org/jira/browse/APEXCORE-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938145#comment-15938145
 ] 

Tushar Gosavi commented on APEXCORE-649:
----------------------------------------

# Example use case
- One common use case is pushing application metrics to third party monitoring 
systems such as Graphite, OpenTSDB, etc ..., see the `Example Plugin` below how 
this can be achieved.
- Taking decisions based on stats and events, such as kill the application in 
case of container failure, which can be done by monitoring an container killed 
event and then killing the application.

# Writing an Plugin
User can define a Apex Plugin by extending from DAGExecutionPlugin. The 
important methods in the class as as below

- **setup(DAGExecutionPluginContext)**
  In the setup user can register to the interested events the following events 
are supported.
    - *ContainerHeartbeat* - The heartbeat from StreamingContainer as passed to 
the plugin for examination after it has been handled by the application master.
       {code:language=java}
       context.register(HEARTBEAT, new Handler<>(ContainerHeartbeat chb) {
         ...
       });
       {code}

    - *StramEvent* - All Stram events generated by platform can be monitored 
the platform.
       {code:language=java}
       context.register(STRAM_EVENT, new Handler<>(StreamEvent event) {
         ...
       });
      {code}

    - *Committed* - When committed windowId is changed the plugin is notified 
so that    plugin can cleanup cached data if required.
     {code:language=java}
       context.register(COMMIT_EVENT, new Handler<>(Long wid) {
         ...
       });
      {code}

- **teardown()**
  clean additional resources created by the plugin.

The all the handlers should be thread-safe as there is no guarantee that plugin 
execution environment provides for thread safety. Also the plugins should not 
block for long time as it could prevent other plugins from
executing and may result in dropped events in case of full queue.

# Loading of plugin
The plugins are loaded by platform in application master. currently the plugins 
are searched by two methods.

1. Through JavaServiceLoader functionality.
  In this case user can create a jar of his plugin with 
META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin file in 
resource directory. This file should contain the fully classified name
  of the class of the plugin. (See 
https://docs.oracle.com/javase/tutorial/ext/basics/spi.html)

2. Providing class names of Plugins through application configuration file.
   Alternatively user can provide fully qualified name of the class 
implementing plugin in application configuration file as given below.

    {code:language=java}
    <property>
      <name>apex.plugin.stram.plugins</name>
      <value>{fcn of plugin}</value>
    </property>
   {code}

## Example Plugin.
A sample plugin to push the container free memory metric to Grapite monitoring 
system is given below.

 {code:language=java}
package com.tugo.apex.plugins;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.slf4j.Logger;

import org.apache.apex.engine.api.DAGExecutionPlugin;
import org.apache.apex.engine.api.DAGExecutionPluginContext;

import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;

import static org.slf4j.LoggerFactory.getLogger;

public class GraphitePushPlugin implements DAGExecutionPlugin
{
  private static final Logger LOG = getLogger(GraphitePushPlugin.class);

  private DAGExecutionPluginContext context;
  ScheduledExecutorService executorService;
  private String appName;
  private String host;
  private int port;
  private Socket socket;
  private OutputStream output;
  private boolean connected = false;
  @Override
  public void setup(DAGExecutionPluginContext context)
  {
    executorService = Executors.newSingleThreadScheduledExecutor();

    context.register(DAGExecutionPluginContext.HEARTBEAT, new 
DAGExecutionPluginContext.Handler<StreamingContainerUmbilicalProtocol.ContainerHeartbeat>()
    {
      @Override
      public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat 
heartbeat)
      {
        handleHeartbeat(heartbeat);
      }
    });

    appName = context.getApplicationContext().getApplicationName();
    host = context.getLaunchConfig().get("graphite-host");
    port = Integer.parseInt(context.getLaunchConfig().get("graphite-port"));
  }

  synchronized void connect() throws IOException
  {
    if (!connected) {
      socket = new Socket(host, port);
      output = socket.getOutputStream();
      connected = true;
    }
  }

  private void 
handleHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat 
heartbeat)
  {
    StringBuilder builder = new StringBuilder(1024);
    
builder.append(appName).append(".").append(heartbeat.getContainerId()).append(".").append("freeMemory").append("
 ")
      .append(heartbeat.memoryMBFree).append(" ").append(heartbeat.sentTms / 
1000).append("\n");
    try {
      connect();
      if (output != null) {
        output.write(builder.toString().getBytes());
      }
    } catch (IOException e) {
      connected = false;
      output = null;
      socket = null;
    }
  }

  @Override
  public void teardown()
  {
    if (socket != null) {
      try {
        if (output != null) {
          output.flush();
        }
        socket.close();
      } catch (IOException e) {
        LOG.warn("error while closing the socket");
      }
    }
  }
}
{code}


> Infrastructure for user define stram event listeners.
> -----------------------------------------------------
>
>                 Key: APEXCORE-649
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-649
>             Project: Apache Apex Core
>          Issue Type: Sub-task
>            Reporter: Tushar Gosavi
>            Assignee: Tushar Gosavi
>
> As  suggested while working on Visitor API, I have came up with following 
> proposal. The idea is to support user defined DAG listeners.  The plan is to
> support limitated set of events for now and we could add more events
> in future.
> For the details functionality propvided check attached document.
> Please provide feedback on provided proposal.
> https://docs.google.com/document/d/1SAIE0EjnCumrB1jKJSnbGvcml47Po8ZHFthfcbNJQgU/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to