[
https://issues.apache.org/jira/browse/AIRFLOW-2265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417490#comment-16417490
]
Omer Yampel commented on AIRFLOW-2265:
--------------------------------------
This is my current template, which simply allows you to execute a search in
Splunk:
{code:python}
import time
import logging
import splunklib.client as client
import splunklib.results as results
from splunklib.binding import HTTPError
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class SplunkHook(BaseHook):
    """
    Hook that executes a search on a Splunk instance through ``splunklib``.

    The constructor only stores the connection details; a
    ``splunklib.client`` session is opened on each call to :meth:`run`.
    """

    # Seconds to wait between polls while the job is not yet ready.
    READY_POLL_SECONDS = 0.5
    # Seconds to wait between polls while the job is ready but not yet done.
    DONE_POLL_SECONDS = 2

    def __init__(self, host, username, password):
        """
        Stores the connection details later passed to
        ``splunklib.client.connect`` by :meth:`run`.

        :param host: Splunk Server
        :type host: string
        :param username: Splunk username
        :type username: string
        :param password: Splunk password
        :type password: string
        """
        self.client_kwargs = {
            "host": host,
            "username": username,
            "password": password,
        }

    def run(self, query, query_kwargs):
        """
        Connects to Splunk, validates the query, runs it as a search job,
        waits for the job to finish and returns its results.

        :param query: the SPL query; ``search `` is prepended unless the
            query already starts with ``search `` or with ``|``
        :type query: string
        :param query_kwargs: keyword arguments forwarded to
            ``splunk_client.jobs.create``
        :type query_kwargs: dict
        :return: the list built from the job's results reader
        :raises AirflowException: if Splunk rejects the query while parsing
        """
        splunk_client = client.connect(**self.client_kwargs)

        # Only the *start* of the query matters here: a 'search ' that
        # appears later in a pipeline does not make the query executable,
        # and a query starting with '|' must not get a 'search ' prefix.
        if not query.lstrip().startswith(('search ', '|')):
            query = 'search ' + query

        # Make sure the splunk query is valid
        self.validate_spl(query, splunk_client)

        job = self.create_search(query, splunk_client, query_kwargs)
        self.watch_search(job)
        return self.get_results(job)

    def validate_spl(self, query, splunk_client):
        """
        Asks Splunk to parse the query without executing it.

        :param query: the SPL query to check
        :type query: string
        :param splunk_client: a connected ``splunklib.client`` service
        :raises AirflowException: if the parse request fails with an
            ``HTTPError``
        """
        try:
            splunk_client.parse(query, parse_only=True)
        except HTTPError as error:
            raise AirflowException(
                "query '{query}' is invalid:\n\t{error}".format(
                    query=query, error=error))

    def create_search(self, query, splunk_client, query_kwargs):
        """
        Creates the search job.

        :param query: the SPL query to run
        :type query: string
        :param splunk_client: a connected ``splunklib.client`` service
        :param query_kwargs: keyword arguments forwarded to ``jobs.create``;
            ``None`` is treated as no extra arguments
        :type query_kwargs: dict
        :return: the job returned by ``splunk_client.jobs.create``
        """
        return splunk_client.jobs.create(query, **(query_kwargs or {}))

    def watch_search(self, job):
        """
        Blocks until the job reports ``isDone == "1"``.

        :param job: the search job to wait for
        """
        while True:
            # Sleep between readiness polls instead of spinning, so the
            # Splunk API is not hit in a tight loop.
            while not job.is_ready():
                time.sleep(self.READY_POLL_SECONDS)
            if job["isDone"] == "1":
                break
            time.sleep(self.DONE_POLL_SECONDS)

    def get_results(self, job):
        """
        Reads every result of the finished job, then cancels the job.

        :param job: the finished search job
        :return: list of the items yielded by ``results.ResultsReader``
        """
        try:
            return list(results.ResultsReader(job.results()))
        finally:
            # Cancel even when reading fails so the job is not left behind
            # on the Splunk server.
            job.cancel()
class SplunkSearchOperator(BaseOperator):
    """
    Operator that runs a Splunk search through :class:`SplunkHook`.

    :param host: Splunk Server
    :type host: string
    :param username: Splunk username
    :type username: string
    :param password: Splunk password
    :type password: string
    :param query: the SPL query to execute
    :type query: string
    :param query_kwargs: keyword arguments forwarded to the search job
        creation; defaults to ``{"exec_mode": "normal"}``
    :type query_kwargs: dict
    """

    @apply_defaults
    def __init__(self,
                 host,
                 username,
                 password,
                 query,
                 query_kwargs=None,
                 *args,
                 **kwargs):
        super(SplunkSearchOperator, self).__init__(*args, **kwargs)
        self.hook = SplunkHook(host, username, password)
        self.query = query
        # Build the default per instance and copy caller input, so no dict
        # is shared between operator instances or with the caller.
        if query_kwargs is None:
            query_kwargs = {"exec_mode": "normal"}
        self.query_kwargs = dict(query_kwargs)

    def execute(self, **kwargs):
        """
        Runs the configured query and returns the search results.

        :return: whatever ``SplunkHook.run`` returns for the query
        """
        # Return the results instead of discarding them so downstream tasks
        # can consume them (Airflow is expected to push an operator's return
        # value to XCom -- confirm for the targeted Airflow version).
        return self.hook.run(self.query, self.query_kwargs)
{code}
> Splunk Hook/Operator
> --------------------
>
> Key: AIRFLOW-2265
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2265
> Project: Apache Airflow
> Issue Type: Improvement
> Components: hooks, operators
> Reporter: Omer Yampel
> Priority: Minor
>
> I'm pretty new to airflow, but I'd like to develop a hook/operator for
> Splunk. Ideally this would give me the ability to execute a search as a task
> in a DAG, then use the data fetched from the results in the next tasks.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)