[ 
https://issues.apache.org/jira/browse/AIRFLOW-2265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417490#comment-16417490
 ] 

Omer Yampel commented on AIRFLOW-2265:
--------------------------------------

This is my current template, which simply lets you execute a search in
Splunk:

 
{code:python}
import time
import logging

import splunklib.client as client
import splunklib.results as results
from splunklib.binding import HTTPError

from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SplunkHook(BaseHook):
    """
    Run SPL searches against a Splunk server through ``splunklib.client``.

    The constructor only stores connection arguments; a new client session is
    opened on every call to :meth:`run`.
    """

    def __init__(self, host, username, password):
        """
        Store the arguments used to open a ``splunklib.client`` session
        against ``host`` with the given credentials.

        :param host: Splunk server
        :type host: string
        :param username: Splunk username
        :type username: string
        :param password: Splunk password
        :type password: string
        """
        self.client_kwargs = {"host": host, "username": username, "password": password}

    def run(self, query, query_kwargs=None):
        """
        Validate ``query``, run it as a search job, and return its results.

        :param query: SPL search string. ``search `` is prepended unless the
            query already starts with the ``search`` command or with ``|``.
        :type query: string
        :param query_kwargs: extra keyword arguments forwarded to
            ``splunk_client.jobs.create``; ``None`` means no extra arguments.
        :type query_kwargs: dict or None
        :return: list of items read by ``results.ResultsReader``
        :raises AirflowException: if the query is rejected by the parser or
            the search job reports a failure
        """
        query = self._normalize_query(query)
        splunk_client = client.connect(**self.client_kwargs)

        # Make sure the splunk query is valid before dispatching a job.
        self.validate_spl(query, splunk_client)

        job = self.create_search(query, splunk_client, query_kwargs)
        try:
            self.watch_search(job)
        except Exception:
            # Don't leave the job behind if polling fails; on the success
            # path get_results() is responsible for cancelling it.
            job.cancel()
            raise
        return self.get_results(job)

    @staticmethod
    def _normalize_query(query):
        """Return ``query`` with a leading ``search `` unless it already starts with a command."""
        stripped = query.lstrip()
        # Only the *start* of the query matters. The previous substring test
        # (query.find('search ') == -1) skipped the prefix for queries such as
        # "index=main | search foo", and wrongly prefixed "| tstats ...".
        if stripped.startswith("|") or stripped.split(None, 1)[:1] == ["search"]:
            return stripped
        return "search " + stripped

    def validate_spl(self, query, splunk_client):
        """
        Ask the server to parse ``query`` without running it.

        :raises AirflowException: if the server rejects the query
        """
        try:
            splunk_client.parse(query, parse_only=True)
        except HTTPError as error:
            raise AirflowException("query '{query}' is invalid:\n\t{error}".format(
                query=query, error=error))

    def create_search(self, query, splunk_client, query_kwargs=None):
        """
        Create a search job for ``query`` and return it.

        :param query_kwargs: extra keyword arguments for ``jobs.create``;
            ``None`` is treated as no extra arguments.
        """
        return splunk_client.jobs.create(query, **(query_kwargs or {}))

    def watch_search(self, job, poll_interval=2):
        """
        Block until ``job`` reports ``isDone == "1"``.

        :param poll_interval: seconds to sleep between status checks
        :raises AirflowException: if the finished job reports ``isFailed``
        """
        # The previous inner loop (``while not job.is_ready(): pass``)
        # busy-waited without sleeping; now every unsuccessful check sleeps.
        while not (job.is_ready() and job["isDone"] == "1"):
            time.sleep(poll_interval)

        # A job that failed on the server also ends up done; surface the
        # failure instead of silently returning an empty result set.
        if job["isFailed"] == "1":
            raise AirflowException("Splunk search job {sid} failed".format(sid=job.sid))

    def get_results(self, job, count=0):
        """
        Read the results of a finished ``job`` and then cancel the job.

        :param count: maximum number of results to request. 0 (the default)
            requests all available results; without it the REST endpoint
            returns only its default page (100). Server-side limits such as
            ``maxresultrows`` may still apply.
        :return: list of items yielded by ``results.ResultsReader`` (NOTE:
            the reader may also yield diagnostic ``Message`` objects besides
            result rows; confirm callers handle both)
        """
        try:
            return list(results.ResultsReader(job.results(count=count)))
        finally:
            # Cancel even if reading the results raised, so the job does not
            # linger on the server.
            job.cancel()


class SplunkSearchOperator(BaseOperator):
    """
    Run a Splunk search as an Airflow task.

    :meth:`execute` returns the list produced by ``SplunkHook.run`` so that
    Airflow can push it to XCom for downstream tasks. NOTE(review): large
    result sets may exceed XCom size limits, and non-JSON-serializable items
    (e.g. reader messages) may fail with JSON XCom serialization; confirm
    for your deployment.

    :param host: Splunk server
    :type host: string
    :param username: Splunk username
    :type username: string
    :param password: Splunk password
    :type password: string
    :param query: SPL search string passed to ``SplunkHook.run``
    :type query: string
    :param query_kwargs: extra keyword arguments for the search job;
        defaults to ``{"exec_mode": "normal"}`` when omitted or ``None``
    :type query_kwargs: dict or None
    """

    @apply_defaults
    def __init__(self,
                 host,
                 username,
                 password,
                 query,
                 query_kwargs=None,
                 *args,
                 **kwargs):

        super(SplunkSearchOperator, self).__init__(*args, **kwargs)

        self.hook = SplunkHook(host, username, password)
        self.query = query
        # A fresh dict per instance. The old mutable default was one dict
        # shared by every operator that relied on it. A caller-supplied dict
        # is copied so later external mutation cannot affect this task.
        if query_kwargs is None:
            self.query_kwargs = {"exec_mode": "normal"}
        else:
            self.query_kwargs = dict(query_kwargs)

    def execute(self, context=None, **kwargs):
        """
        Run the configured search and return its results.

        :param context: Airflow task context (unused); accepted both
            positionally and by keyword.
        :return: the list returned by ``SplunkHook.run``
        """
        return self.hook.run(self.query, self.query_kwargs)
{code}



> Splunk Hook/Operator
> --------------------
>
>                 Key: AIRFLOW-2265
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2265
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: hooks, operators
>            Reporter: Omer Yampel
>            Priority: Minor
>
> I'm pretty new to Airflow, but I'd like to develop a hook/operator for
> Splunk. Ideally this would give me the ability to execute a search as a task
> in a DAG, then use the data fetched from the results in the next tasks.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to