bintocher edited a comment on issue #15066:
URL: https://github.com/apache/airflow/issues/15066#issuecomment-809845930
np_run_test_task file:
` python
# sudo pip3 install requests_ntlm
"""Example DAG demonstrating the usage of the PythonOperator."""
import time
import requests
import json
# from pprint import pprint
from requests_ntlm import HttpNtlmAuth
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'nprinting',
'depends_on_past': False,
#'retries': 1,
}
# раписание шедулера можно конфигурировать тут: https://crontab.guru/
# время UTC !!!
dag = DAG(
dag_id = 'np_run_test_task_1_3-20_A_A_A',
default_args = default_args ,
start_date = days_ago(1) ,
schedule_interval = '1 3-20 * * *',
description = 'NPrinting Мониторинг обновления',
tags = ['nprinting','apps','мониторинг обновления'],
catchup = False
)
# schedule_interval=None,
requests.packages.urllib3.disable_warnings()
tokenstring = ''
headers = {"Accept": "application/json",
"Content-Type": "application/json"}
session = requests.session()
class ConnectnPrinting:
"""
Instantiates the nPrinting connection class
"""
def __init__(self, server, root = False
, userid = False, credential = False, password = False):
"""
Establishes connectivity with Qlik Sense Repository Service
:param server: servername.domain:4242
:param userid: user to use for queries
:param credential: domain\\username for Windows Authentication
:param password: password of windows credential
"""
self.server = server
self.root = root
self.credential = credential
self.password = password
def get(self,endpoint):
if self.credential is not False:
global tokenstring
session.auth = HttpNtlmAuth(self.credential, self.password,
session)
headers['User-Agent'] = 'Windows'
if tokenstring != '':
headers['X-XSRF-TOKEN'] = tokenstring
response = session.get('http://{0}/{1}'.format (self.server,
endpoint),
headers=headers, verify=self.root)
tokenstring = (session.cookies['NPWEBCONSOLE_XSRF-TOKEN'])
return response.content
def post(self, endpoint, data=None):
global tokenstring
if tokenstring != '':
headers['X-XSRF-TOKEN'] = tokenstring
response = session.post('http://{0}/{1}'.format (self.server,
endpoint),
headers=headers, verify=self.root,
data=data)
return response.status_code
def auth(self):
path = 'login/ntlm'
return json.loads(self.get(path))
def run_task(self, task_id = None):
path = 'tasks'
global tokenstring
if tokenstring != '':
headers['X-XSRF-TOKEN'] = tokenstring
urltorun = 'http://{0}/{1}/{2}/executions'.format (self.server,
path,task_id)
response = session.post(urltorun, headers=headers, verify=self.root)
print (response)
# [START howto_operator_python]
def run_test_task(ds, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print(ds)
np = ConnectnPrinting(server='server-qv:4993/api/v1',
credential='user',
password='pass')
# Авторизуемся, и запоминаем TOKEN, с помощью которого потом выполняем
запросы
auth = np.auth()
# print (tokenstring)
# Запускаем таск по его ID
np.run_task('bc58a89f-4401-4769-abbd-f515e13d386e')
return np
run_this = PythonOperator(
task_id='NPrinting_run_test_task',
python_callable=run_test_task,
dag=dag,
)
# [END howto_operator_python]
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]