Chikkl commented on issue #15607:
URL: https://github.com/apache/airflow/issues/15607#issuecomment-1546903925
Please help me
`import airflow
from datetime import timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago
import pandas as pd
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator, BranchPythonOperator
from random import randint
from airflow.operators.bash import BashOperator
import requests
import json
import time
import os
from datetime import datetime
from datetime import timedelta
import psycopg2
import pandas as pd
from sqlalchemy import create_engine, VARCHAR, INT, Boolean, ARRAY
args={'owner': 'airflow'}
headers = {"useragent": "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0;
en-US)"}
create_table_sql_query = """CREATE TABLE IF NOT EXISTS vacaniec(
id VARCHAR(9),
premium Boolean,
name VARCHAR(100),
area VARCHAR(50),
salary INT,
type VARCHAR(50),
address VARCHAR(300),
metro VARCHAR(40),
experience VARCHAR(50),
schedule VARCHAR(50),
employment VARCHAR(50),
department VARCHAR(100),
contacts VARCHAR(100),
key_skills VARCHAR[],
professional_roles VARCHAR(100),
employer VARCHAR(200),
url VARCHAR(300),
published_at VARCHAR(100)
)"""
def get_id_appended_vac():
qq = "select id from vacaniec"
df = pd.read_sql(qq)
if df.empty:
return []
return set(i[0] for i in df.values)
def change_time(time):
return "T".join(time.split())+"+1000"
def date_to(time=False):
if time:
time = datetime.strptime(" ".join(time[:19].split("T")), '%Y-%m-%d
%H:%M:%S')
return change_time(str(time + timedelta(minutes=30)))
else:
return change_time(str(datetime.now()).split(".")[0])
def date_from():
return change_time(str(datetime.now() -
timedelta(minutes=30)).split(".")[0])
class ParserRU():
"""Класс представляет необходимый набор функционала для сбора данных
через API с сайта hh.ru
Каждая функция содержит краткое описание того, что она делает."""
PROFESSIONAL_ROLES = ['156', '160', '10', '12', '150', '25', '165',
'34', '36', '73', '155', '96',
'164', '104', '157', '107', '112', '113',
'148', '114', '116', '121', '124', '125', '126']
ALL_NEED_ROLES = ('156', '160', '10', '12', '150', '25', '165', '34',
'36', '73', '155', '96',
'164', '104', '157', '107', '112', '113',
'148', '114', '116', '121', '124', '125', '126')
EXP = ["noExperience", "between1And3", "between3And6", "moreThan6"]
vacaniecc = []
@staticmethod
def get_time(t = time.localtime()) -> str:
"""Функция возвращает текущее время"""
current_time = time.strftime("%H : %M : %S", t)
return current_time
@staticmethod
def get_areas() -> list:
"""Функция для получения всех зон для парсинга (включает в себя все
субъекты РФ и два города федерального значения [Москва, Санкт-Петербург])"""
req = requests.get('https://api.hh.ru/areas', headers=headers)
req.close()
js_obj = json.loads(req.content.decode())
areas = []
for element in js_obj:
for i in range(len(element['areas'])):
if element['areas'][i]["parent_id"] == '113':
# Проверка на принадлежность стране
if len(element['areas'][i]['areas']) != 0:
# Если у зоны есть внутренние зоны
areas.append([element['areas'][i]['id'],
element['areas'][i]['name']])
# else:
# Если у зоны нет внутренних зон
# areas.append([element['areas'][i]['id'],
# element['areas'][i]['name']])
return areas
def get_vacancies_per_page(self, page:int = 0, area:str = '1',
prof_role:str = '156', exp = ("noExperience", "between1And3", "between3And6",
"moreThan6"),
period:bool = False) -> list:
"""Функция для получения всех вакансий с определённой страницы"""
period = period
params={
"area":area,
"professional_role":prof_role,
"only_with_salary":True,
"page":page,
"employment":("full", "part","probation"),
"experience":("noExperience", "between1And3", "between3And6",
"moreThan6"),
"per_page":100
}
if period:
params["date_from"] = date_from()
params["date_to"] = date_to()
req = requests.get('https://api.hh.ru/vacancies/', params,
headers=headers)
data = req.content.decode()
req.close()
return data
@staticmethod
def get_vacancies_with_specific_id(id:str, retry:int=5) -> list:
''' Функция для получения определённой вакансии по её id'''
try:
req = requests.get(f'https://api.hh.ru/vacancies/{id}',
headers=headers)
except Exception as ex:
return
else:
return req
def vacancies_saver(self, page: int, area: list, role: str, dop_num:int
= 0, period:bool = False) -> None:
"""Функция сохраняет полученные вакансии с определённой страницы в
отдельный файл с расширением .json"""
js_objs = self.vacaniecc
js_obj = json.loads(self.get_vacancies_per_page(page, area, role,
period=period))
for i in range(len(js_obj["items"])):
id = js_obj["items"][i]["id"]
req = self.get_vacancies_with_specific_id(id)
if req:
obj = json.loads(req.content.decode())
vacancie = {
#Словарь хранит в себе список всех необходимых нам полей из получаемой вакансии
"id":obj["id"],
"premium":obj["premium"],
"name":obj["name"],
"area":obj["area"],
"salary":obj["salary"],
"type":obj["type"],
"address":obj["address"],
"experience":obj["experience"],
"schedule":obj["schedule"],
"employment":obj["employment"],
"department":obj["department"],
"contacts":obj["contacts"],
"key_skills":obj["key_skills"],
"professional_roles":obj["professional_roles"],
"employer":obj["employer"],
"url":obj["alternate_url"],
"published_at":obj["published_at"],
"working_days":obj["working_days"],
"working_time_intervals":obj["working_time_intervals"],
"working_time_modes":obj["working_time_modes"]
}
js_objs.append(vacancie)
req.close()
time.sleep(0.6)
else:
continue
if (js_obj["pages"] - page) <= 1:
return str(js_objs)
def start_parsing(self, period):
"""Конечная функция. Координирует работу всех остальных функций. Для
начала она получает список всех зон.
После чего начинает перебирать все вакансии в этой зоне и
обрабатывает их исходя из выполняемых условий."""
print(f"Время начала сбора данных {self.get_time()}")
areas = [["1", "Москва"], ["2", "Санкт-петербург"]]
areas.extend(self.get_areas())
self.ALL_AREAS = tuple([i[0] for i in areas])
total_vacancies = 0 # Переменная хранит в себе
кол-во всех вакансий
area_list_id = 1 # Переменная хранит в себе
кол-во пройденных зон
vac1 = json.loads(self.get_vacancies_per_page(0, self.ALL_AREAS,
self.ALL_NEED_ROLES, period=period))
count_vacancies1 = vac1['found']
print(count_vacancies1)
pages1 = vac1['pages']
if count_vacancies1 > 2000:
for area in areas:
area_id, area_name = area[0], area[1]
vac = json.loads(self.get_vacancies_per_page(0, area_id,
self.ALL_NEED_ROLES, period=period))
count_vacancies = vac['found']
pages = vac['pages']
print('[{0}/{1}] Область: {2} ({3}) - Вакансий:
{4}'.format(area_list_id, len(areas), area_name, area_id, count_vacancies))
if count_vacancies > 2000: # Если
вакансий по запросу больше 2к, разбиваем этот запрос на подзапросы по
специализации
dop_num = 0
for role in self.PROFESSIONAL_ROLES:
need_vac = json.loads(self.get_vacancies_per_page(0,
area_id, role, period=period))
c_v= need_vac['found']
pag = need_vac['pages']
if c_v > 2000: # Если
вакансий по запросу больше 2к, разбиваем этот запрос на подзапросы по опыту
for exp in self.EXP:
exp_vac =
json.loads(self.get_vacancies_per_page(0, area_id, role, exp, period=period))
page_exp = exp_vac['pages']
for page in range(page_exp):
self.vacancies_saver(page, area_id,
role, dop_num, period=period)
time.sleep(0.5)
dop_num+=1
time.sleep(0.5)
time.sleep(0.5)
else:
for page in range(pag):
self.vacancies_saver(page, area_id, role,
dop_num, period=period)
time.sleep(0.5)
dop_num+=1
time.sleep(0.5)
time.sleep(0.5)
else: # Иначе
собираем все вакансии, по всем специализациям сразу
role = self.ALL_NEED_ROLES
for page in range(pages):
self.vacancies_saver(page, area_id, role,
period=period)
time.sleep(0.5)
time.sleep(0.5)
area_list_id+=1
total_vacancies+=count_vacancies
if count_vacancies != 0:
print(f"Добавленно {count_vacancies} вакансий")
else:
for page in range(pages1):
self.vacancies_saver(page, self.ALL_AREAS,
self.ALL_NEED_ROLES, period=period)
time.sleep(0.5)
total_vacancies+=count_vacancies1
print(f"Вакансии собраны, всего {total_vacancies}", end="\n\n")
return json.dumps(self.vacaniecc, indent=4, ensure_ascii=False)
default_args = {
'owner': 'airflow',
#'start_date': airflow.utils.dates.days_ago(2),
# 'end_date': datetime(),
# 'depends_on_past': False,
#'email': ['[email protected]'],
#'email_on_failure': False,
# 'email_on_retry': False,
# If a task fails, retry it once after waiting
# at least 5 minutes
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
parser = ParserRU()
with DAG(
dag_id = "demo_etl",
default_args=args,
# schedule_interval='0 0 * * *',
schedule_interval='@once',
dagrun_timeout=timedelta(minutes=60),
description='use case of psql operator in airflow',
start_date = airflow.utils.dates.days_ago(1)
) as dag_psql:
start_pars = PythonOperator(
task_id="start_pars",
python_callable = parser.start_parsing(True),
)
create_table = PostgresOperator(
sql = create_table_sql_query,
task_id = "create_table_task",
postgres_conn_id = "con_app_db"
)
get_id_vac = PostgresOperator(
sql = get_id_appended_vac,
task_id = "get_id_appended_vac",
postgres_conn_id = "con_app_db"
)
create_table >> [get_id_vac, start_pars]
if __name__ == "__main__":
dag_psql.cli()
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]