Chikkl commented on issue #15607:
URL: https://github.com/apache/airflow/issues/15607#issuecomment-1546903925

   Please help me
   
   `import airflow
   from datetime import timedelta
   from airflow import DAG
   from airflow.operators.postgres_operator import PostgresOperator
   from airflow.utils.dates import days_ago
   import pandas as pd
   from datetime import datetime, timedelta
   from airflow.operators.python import PythonOperator, BranchPythonOperator
   from random import randint
   from airflow.operators.bash import BashOperator
   import requests
   import json
   import time
   import os
   from datetime import datetime
   from datetime import timedelta
   import psycopg2
   import pandas as pd
   from sqlalchemy import create_engine, VARCHAR, INT, Boolean, ARRAY
   
   
   args={'owner': 'airflow'}
   headers = {"useragent": "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; 
en-US)"}
   
   create_table_sql_query = """CREATE TABLE IF NOT EXISTS vacaniec(
                       id VARCHAR(9),
                       premium Boolean,
                       name VARCHAR(100),
                       area VARCHAR(50),
                       salary INT,
                       type VARCHAR(50),
                       address VARCHAR(300),
                       metro VARCHAR(40),
                       experience VARCHAR(50),
                       schedule VARCHAR(50),
                       employment VARCHAR(50),
                       department VARCHAR(100),
                       contacts VARCHAR(100),
                       key_skills VARCHAR[],
                       professional_roles VARCHAR(100),
                       employer VARCHAR(200), 
                       url VARCHAR(300),
                       published_at VARCHAR(100)
                       )"""
   
   
   def get_id_appended_vac():
       qq = "select id from vacaniec"
       df = pd.read_sql(qq)
       if df.empty:
           return []
       
       return set(i[0] for i in df.values)
   
   
   def change_time(time):
       return "T".join(time.split())+"+1000"
   
   def date_to(time=False):
       if time:
           time = datetime.strptime(" ".join(time[:19].split("T")), '%Y-%m-%d 
%H:%M:%S')
           return change_time(str(time + timedelta(minutes=30)))
       else:
           return change_time(str(datetime.now()).split(".")[0])
   
   def date_from():
       return change_time(str(datetime.now() - 
timedelta(minutes=30)).split(".")[0])
   
   
   class ParserRU():
       """Класс представляет необходимый набор функционала для сбора данных 
через API с сайта hh.ru
       Каждая функция содержит краткое описание того, что она делает."""
       
       PROFESSIONAL_ROLES = ['156', '160', '10', '12', '150', '25', '165', 
'34', '36', '73', '155', '96',
                                 '164', '104', '157', '107', '112', '113', 
'148', '114', '116', '121', '124', '125', '126']
       ALL_NEED_ROLES = ('156', '160', '10', '12', '150', '25', '165', '34', 
'36', '73', '155', '96',
                                 '164', '104', '157', '107', '112', '113', 
'148', '114', '116', '121', '124', '125', '126')
       EXP = ["noExperience", "between1And3", "between3And6", "moreThan6"]
   
       vacaniecc = []
   
       @staticmethod
       def get_time(t = time.localtime()) -> str:
           """Функция возвращает текущее время"""
           current_time = time.strftime("%H : %M : %S", t)
           return current_time
   
       @staticmethod
       def get_areas() -> list:
           """Функция для получения всех зон для парсинга (включает в себя все 
субъекты РФ и два города федерального значения [Москва, Санкт-Петербург])"""
           req = requests.get('https://api.hh.ru/areas', headers=headers)
           req.close()
           js_obj = json.loads(req.content.decode())
           areas = []
           for element in js_obj:
               for i in range(len(element['areas'])):
                   if element['areas'][i]["parent_id"] == '113':                
     # Проверка на принадлежность стране
                       if len(element['areas'][i]['areas']) != 0:               
       # Если у зоны есть внутренние зоны
                           areas.append([element['areas'][i]['id'],
                                       element['areas'][i]['name']])
                       # else:                                                  
              # Если у зоны нет внутренних зон
                       #     areas.append([element['areas'][i]['id'],
                       #                   element['areas'][i]['name']])
           return areas
       
       
       def get_vacancies_per_page(self, page:int = 0, area:str = '1', 
prof_role:str = '156', exp = ("noExperience", "between1And3", "between3And6", 
"moreThan6"),
                                   period:bool = False) -> list:
           """Функция для получения всех вакансий с определённой страницы"""
           period = period
           params={
               "area":area,
               "professional_role":prof_role,
               "only_with_salary":True,
               "page":page,
               "employment":("full", "part","probation"),
               "experience":("noExperience", "between1And3", "between3And6", 
"moreThan6"),
               "per_page":100
           }
           if period:
               params["date_from"] = date_from()
               params["date_to"] = date_to()
               req = requests.get('https://api.hh.ru/vacancies/', params, 
headers=headers)
               data = req.content.decode()
               req.close()
           return data
   
       @staticmethod
       def get_vacancies_with_specific_id(id:str, retry:int=5) -> list:
           ''' Функция для получения определённой вакансии по её id'''
           try:
               req = requests.get(f'https://api.hh.ru/vacancies/{id}', 
headers=headers)
           except Exception as ex:
                return
           else:
                return req
   
       
       def vacancies_saver(self, page: int, area: list, role: str, dop_num:int 
= 0, period:bool = False) -> None:
           """Функция сохраняет полученные вакансии с определённой страницы в 
отдельный файл с расширением .json"""
           js_objs = self.vacaniecc
           js_obj = json.loads(self.get_vacancies_per_page(page, area, role, 
period=period))
           for i in range(len(js_obj["items"])):
                           
                           id = js_obj["items"][i]["id"]
   
                           req = self.get_vacancies_with_specific_id(id)
   
                           if req:
                               
                               obj = json.loads(req.content.decode())
   
                               vacancie = {                                
#Словарь хранит в себе список всех необходимых нам полей из получаемой вакансии
                                   "id":obj["id"],
                                   "premium":obj["premium"],
                                   "name":obj["name"],
                                   "area":obj["area"],
                                   "salary":obj["salary"],
                                   "type":obj["type"],
                                   "address":obj["address"],
                                   "experience":obj["experience"],
                                   "schedule":obj["schedule"],
                                   "employment":obj["employment"],
                                   "department":obj["department"],
                                   "contacts":obj["contacts"],
                                   "key_skills":obj["key_skills"],
                                   
"professional_roles":obj["professional_roles"],
                                   "employer":obj["employer"],
                                   "url":obj["alternate_url"],
                                   "published_at":obj["published_at"],
                                   "working_days":obj["working_days"],
                                   
"working_time_intervals":obj["working_time_intervals"],
                                   
"working_time_modes":obj["working_time_modes"]
                               }
   
                               js_objs.append(vacancie)
                               req.close()
                               
                               time.sleep(0.6)
                           else:
                               continue
   
           if (js_obj["pages"] - page) <= 1:
               return str(js_objs)
   
       
       def start_parsing(self, period):
           """Конечная функция. Координирует работу всех остальных функций. Для 
начала она получает список всех зон.
           После чего начинает перебирать все вакансии в этой зоне и 
обрабатывает их исходя из выполняемых условий."""
           print(f"Время начала сбора данных {self.get_time()}")
           areas = [["1", "Москва"], ["2", "Санкт-петербург"]]
           areas.extend(self.get_areas())
           self.ALL_AREAS = tuple([i[0] for i in areas])
   
           total_vacancies = 0                  # Переменная хранит в себе 
кол-во всех вакансий
           area_list_id = 1                    # Переменная хранит в себе 
кол-во пройденных зон
   
           vac1 = json.loads(self.get_vacancies_per_page(0, self.ALL_AREAS, 
self.ALL_NEED_ROLES, period=period))
           count_vacancies1 = vac1['found']
           print(count_vacancies1)
           pages1 = vac1['pages']
           if count_vacancies1 > 2000:
               for area in areas:
                   area_id, area_name = area[0], area[1]
                   vac = json.loads(self.get_vacancies_per_page(0, area_id, 
self.ALL_NEED_ROLES, period=period))
                   count_vacancies = vac['found']
                   pages = vac['pages']
                   print('[{0}/{1}] Область: {2} ({3}) - Вакансий: 
{4}'.format(area_list_id, len(areas), area_name, area_id, count_vacancies))
                       
                   if count_vacancies > 2000:                           # Если 
вакансий по запросу больше 2к, разбиваем этот запрос на подзапросы по 
специализации
                       dop_num = 0
                       for role in self.PROFESSIONAL_ROLES:
                           need_vac = json.loads(self.get_vacancies_per_page(0, 
area_id, role, period=period))
                           c_v= need_vac['found']
                           pag = need_vac['pages']
                           if c_v > 2000:                              # Если 
вакансий по запросу больше 2к, разбиваем этот запрос на подзапросы по опыту
                               for exp in self.EXP:
                                   exp_vac = 
json.loads(self.get_vacancies_per_page(0, area_id, role, exp, period=period))
                                   page_exp = exp_vac['pages']
                                   for page in range(page_exp):
                                       self.vacancies_saver(page, area_id, 
role, dop_num, period=period)
                                       time.sleep(0.5)
                                       dop_num+=1
                                   time.sleep(0.5)
                               time.sleep(0.5)
                           else:
                               for page in range(pag):
                                   self.vacancies_saver(page, area_id, role, 
dop_num, period=period)
                                   time.sleep(0.5)
                                   dop_num+=1
                               time.sleep(0.5)
                       time.sleep(0.5)
                   else:                                               # Иначе 
собираем все вакансии, по всем специализациям сразу
                       role = self.ALL_NEED_ROLES
                       for page in range(pages):
                           self.vacancies_saver(page, area_id, role, 
period=period)
                           time.sleep(0.5)
                       time.sleep(0.5)
   
                   area_list_id+=1
                   total_vacancies+=count_vacancies
   
   
                   if count_vacancies != 0:
                       print(f"Добавленно {count_vacancies} вакансий")
           else:
               for page in range(pages1):
                   self.vacancies_saver(page, self.ALL_AREAS, 
self.ALL_NEED_ROLES, period=period)
                   time.sleep(0.5)
               
               total_vacancies+=count_vacancies1
   
           print(f"Вакансии собраны, всего {total_vacancies}", end="\n\n")
           return json.dumps(self.vacaniecc, indent=4, ensure_ascii=False)
   
   default_args = {
       'owner': 'airflow',    
       #'start_date': airflow.utils.dates.days_ago(2),
       # 'end_date': datetime(),
       # 'depends_on_past': False,
       #'email': ['[email protected]'],
       #'email_on_failure': False,
       # 'email_on_retry': False,
       # If a task fails, retry it once after waiting
       # at least 5 minutes
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }
   
   
   parser = ParserRU()
   
   with DAG(
       dag_id = "demo_etl",
       default_args=args,
       # schedule_interval='0 0 * * *',
       schedule_interval='@once',       
       dagrun_timeout=timedelta(minutes=60),
       description='use case of psql operator in airflow',
       start_date = airflow.utils.dates.days_ago(1)
   ) as dag_psql:
       start_pars = PythonOperator(
           task_id="start_pars",
           python_callable = parser.start_parsing(True),
           )
       
       create_table = PostgresOperator(
       sql = create_table_sql_query,
       task_id = "create_table_task",
       postgres_conn_id = "con_app_db"
       )
   
       get_id_vac = PostgresOperator(
           sql = get_id_appended_vac,
           task_id = "get_id_appended_vac",
           postgres_conn_id = "con_app_db"
           )
   
       create_table >> [get_id_vac, start_pars] 
   
   
   
   
   
   if __name__ == "__main__":
           dag_psql.cli()
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to