bintocher edited a comment on issue #15066:
URL: https://github.com/apache/airflow/issues/15066#issuecomment-809845930


   np_run_test_task file:
   ` python
   # sudo pip3 install requests_ntlm
   """Example DAG demonstrating the usage of the PythonOperator."""
   import time
   import requests
   import json
   # from pprint import pprint
   from requests_ntlm import HttpNtlmAuth
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   
   # Arguments applied to every task of the DAG defined below.
   # Optional: 'retries': 1 (currently disabled).
   default_args = dict(
       owner='nprinting',
       depends_on_past=False,
   )
   # The cron expression for the schedule can be composed at
   # https://crontab.guru/
   # Times are in UTC!
   dag = DAG(
       dag_id='np_run_test_task_1_3-20_A_A_A',
       description='NPrinting Мониторинг обновления',
       tags=['nprinting', 'apps', 'мониторинг обновления'],
       schedule_interval='1 3-20 * * *',
       start_date=days_ago(1),
       catchup=False,
       default_args=default_args,
   )
   
       # schedule_interval=None,
   
   # Turn off urllib3 warnings for the whole process.
   requests.packages.urllib3.disable_warnings()

   # State shared by every ConnectnPrinting instance in this file:
   #   session     - the single HTTP session used for all requests
   #   headers     - headers sent with each request (mutated in place later)
   #   tokenstring - last XSRF token read from the session cookies ('' = none)
   session = requests.Session()
   headers = {
       "Accept": "application/json",
       "Content-Type": "application/json",
   }
   tokenstring = ''
   
   class ConnectnPrinting:
       """
       Small client for the NPrinting HTTP API using NTLM authentication.

       Every request goes through the module-level ``session`` and ``headers``
       objects, and the XSRF token is cached in the module-level
       ``tokenstring``, so all instances share the same login state.
       """

       # Cookie from which the XSRF token is read after each GET.
       _XSRF_COOKIE = 'NPWEBCONSOLE_XSRF-TOKEN'

       def __init__(self, server, root=False,
                    userid=False, credential=False, password=False):
           """
           Store the connection settings; no request is sent here.

           :param server: host, port and API prefix, e.g. ``host:4993/api/v1``
           :param root: forwarded as ``verify`` to every request
           :param userid: user to use for queries (stored only; no method of
               this class reads it)
           :param credential: domain\\username for Windows Authentication
           :param password: password of windows credential
           """
           self.server = server
           self.root = root
           # Previously accepted but dropped; keep it so callers can read it.
           self.userid = userid
           self.credential = credential
           self.password = password

       def _url(self, endpoint):
           """Return the absolute URL for ``endpoint`` on this server."""
           return 'http://{0}/{1}'.format(self.server, endpoint)

       @staticmethod
       def _apply_token():
           """Copy the cached XSRF token, if there is one, into ``headers``."""
           if tokenstring != '':
               headers['X-XSRF-TOKEN'] = tokenstring

       def get(self, endpoint):
           """
           GET ``endpoint`` with NTLM auth and refresh the cached XSRF token.

           :param endpoint: path relative to ``self.server``
           :return: raw response body, or ``None`` (without sending anything)
               when no credential was configured
           :raises KeyError: presumably when the session has no XSRF cookie
               after the request, since the cookie is read by subscript
           """
           global tokenstring
           if self.credential is False:
               return None

           session.auth = HttpNtlmAuth(self.credential, self.password, session)
           headers['User-Agent'] = 'Windows'
           self._apply_token()

           response = session.get(self._url(endpoint),
                                  headers=headers, verify=self.root)
           # Remember the token so that later POSTs can send it back.
           tokenstring = session.cookies[self._XSRF_COOKIE]
           return response.content

       def post(self, endpoint, data=None):
           """
           POST ``data`` to ``endpoint`` and return the HTTP status code.

           :param endpoint: path relative to ``self.server``
           :param data: request body forwarded to ``session.post``
           :return: ``response.status_code``
           """
           self._apply_token()
           response = session.post(self._url(endpoint),
                                   headers=headers, verify=self.root,
                                   data=data)
           return response.status_code

       def auth(self):
           """
           Log in through ``login/ntlm`` and return the decoded JSON reply.

           Because ``get`` returns ``None`` when no credential is configured,
           ``json.loads`` fails in that case.
           """
           path = 'login/ntlm'
           return json.loads(self.get(path))

       def run_task(self, task_id=None):
           """
           POST to ``tasks/<task_id>/executions`` and print the response.

           :param task_id: identifier inserted into the URL
           :return: ``response.status_code`` (earlier versions returned
               nothing, so existing callers are unaffected)
           """
           self._apply_token()
           urltorun = self._url('tasks/{0}/executions'.format(task_id))
           response = session.post(urltorun, headers=headers, verify=self.root)
           print(response)
           return response.status_code
   
   # [START howto_operator_python]
   def run_test_task(ds, **kwargs):
       """
       Log in to NPrinting and start one hard-coded task.

       :param ds: printed at the start of the run (presumably the date string
           supplied by Airflow - confirm against the operator configuration)
       :param kwargs: remaining keyword arguments; not used
       :return: the ``ConnectnPrinting`` client that was used
       """
       print(ds)
       np = ConnectnPrinting(server='server-qv:4993/api/v1',
                             credential='user',
                             password='pass')
       # Authenticate first; the token obtained here is what the following
       # requests are sent with. The decoded reply itself is not needed.
       np.auth()
       # Start the task by its ID.
       np.run_task('bc58a89f-4401-4769-abbd-f515e13d386e')
       # NOTE(review): Airflow will presumably try to push this return value
       # to XCom - confirm the client object is serializable with the
       # configured XCom settings, or return a plain value instead.
       return np
   
   # The single task of this DAG: calls run_test_task.
   run_this = PythonOperator(
       dag=dag,
       python_callable=run_test_task,
       task_id='NPrinting_run_test_task',
   )
   # [END howto_operator_python]
   
   
   
       `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to