darkfennertrader commented on issue #28973: URL: https://github.com/apache/airflow/issues/28973#issuecomment-1491639692
Hello, just to let you know that the problem still exists with multiple dynamic tasks in cascade. I worked around it by inserting an EmptyOperator between the mapped tasks.

- Airflow: 2.5.2 (using the official docker-compose file)
- OS: Ubuntu 22.04
- Python: 3.10.6

```
import os
from typing import Dict, Any
from datetime import datetime
import logging

import pendulum
from airflow.decorators import dag, task  # type: ignore
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.empty import EmptyOperator

from dags_conf import default_args
from custom_operators.csv_to_postgres import CsvToPostgres  # type: ignore

local_tz = pendulum.timezone("Europe/Rome")  # type: ignore
task_logger = logging.getLogger("airflow.task")

airflow_home: Any = os.getenv("AIRFLOW_HOME")
BASE_DIR: str = airflow_home + "/data_sources/ibkr"
provider_name: Any = os.getenv("ibkr_provider_name")
frequency: Any = os.getenv("ibkr_frequency")


@dag(
    dag_id="ibkr_db_init_v_1_1_0",
    start_date=datetime(2023, 1, 1, tzinfo=local_tz),
    schedule=None,
    catchup=False,
    tags=["db init"],
    default_args=default_args,
)
def populate_db() -> None:
    @task(do_xcom_push=True)  # type: ignore
    def get_assets_years_and_files() -> list[Dict[str, str | list[str]]]:
        # (1) create list of assets
        assets = list(set(os.listdir(BASE_DIR)))
        # (2) create list of assets and relative directories
        assets_years = []
        for asset in assets:
            years: list[str] = list(set(os.listdir(BASE_DIR + "/" + asset)))
            years = [
                y for y in years if os.path.isdir(BASE_DIR + "/" + asset + "/" + y)
            ]
            assets_years.append({"symbol": asset, "years": years})
        return assets_years

    @task  # type: ignore
    def deduplication(symbol: str, years: list[str]) -> None:
        import pandas as pd

        overall_file_path = BASE_DIR + "/" + symbol + "/"
        dframe = pd.DataFrame(
            columns=["Datetime", "Open", "High", "Low", "Close", "Volume"]
        )
        for year in years:
            base_path = BASE_DIR + "/" + symbol + "/" + year
            # list all files within a directory
            data_path: list[str] = [
                f
                for f in os.listdir(base_path)
                if os.path.isfile(os.path.join(base_path, f))
            ]
            # remove overall file if present for idempotency
            if os.path.isfile(os.path.join(overall_file_path, "final.csv")):
                os.remove(os.path.join(overall_file_path, "final.csv"))
            print(symbol)
            for file in data_path:
                filepath = BASE_DIR + "/" + symbol + "/" + year + "/" + file
                data = pd.read_csv(filepath, parse_dates=["Datetime"], date_parser=lambda x: pd.to_datetime(x).tz_localize("UTC"))  # type: ignore
                dframe = pd.concat([dframe, data])  # type: ignore

        # renaming columns to make them compatible with db table columns
        dframe.rename(
            columns={
                "Datetime": "time",
                "Open": "open",
                "High": "high",
                "Low": "low",
                "Close": "close",
                "Volume": "volume",
            },
            inplace=True,
        )
        dframe.set_index("time", drop=True, inplace=True)  # type: ignore
        # deduplication
        dframe = dframe[~dframe.index.duplicated(keep="first")]  # type: ignore
        dframe.sort_index(inplace=True)  # type: ignore
        dframe.to_csv(overall_file_path + "/final.csv")
        print(dframe.shape)

    def list_of_dicts(elem: Dict[str, Any]) -> Dict[str, str]:
        return {
            "symbol": elem["symbol"].replace("-", "/"),
            "provider_name": provider_name,
            "frequency": frequency,
        }

    assets_years = get_assets_years_and_files()
    pg_input: list[dict[str, str]] = assets_years.map(list_of_dicts)  # type: ignore
    deduplicate = deduplication.partial().expand_kwargs(assets_years)  # type: ignore

    complementary_info = PostgresOperator.partial(
        task_id="complementary_info",
        postgres_conn_id="postgres_conn",  # created as env variable
        sql="sql/GET_INFO_FROM_SYMBOLS_PROVIDERS.sql",
    ).expand(  # type: ignore
        parameters=pg_input
    )

    def list_of_str_int(elem: list[list[str | int]]) -> list[str | int]:
        return [y for x in elem for y in x]

    task_input = complementary_info.output.map(list_of_str_int)

    # save complementary info in csv files for postgres IBKR table compatibility
    @task(trigger_rule="all_success", depends_on_past=False, wait_for_downstream=False)  # type: ignore
    def enrich_csv(extra_info: list[Any]) -> None:
        import pandas as pd

        symbol, symbol_id, provider_id, asset_class_id, frequency_id = (
            extra_info[0],
            extra_info[1],
            extra_info[2],
            extra_info[3],
            extra_info[4],
        )
        print(symbol)
        filepath = BASE_DIR + "/" + symbol.replace("/", "-") + "/final.csv"
        dframe = pd.read_csv(filepath, parse_dates=["time"], index_col="time")  # type: ignore
        print(f"before: {dframe.shape}")
        dframe["provider_id"] = provider_id
        dframe["asset_class_id"] = asset_class_id
        dframe["frequency_id"] = frequency_id
        dframe["symbol_id"] = symbol_id
        print(f"after: {dframe.shape}")
        dframe.to_csv(filepath, header=True)
        print(extra_info)

    enrich = enrich_csv.partial().expand(extra_info=task_input)  # type: ignore

    @task  # type: ignore
    def prepare_input() -> list[str]:
        assets = list(set(os.listdir(BASE_DIR)))
        filepaths: list[str] = []
        for elem in assets:
            filepath = "data_sources/ibkr/" + elem + "/final.csv"
            filepaths.append(filepath)
        return filepaths

    csvpg_input = prepare_input()

    solve_bug = EmptyOperator(task_id="solve_bug")

    # save csv to Postgres database
    kwargs: dict[str, Any] = {
        "task_id": "save_data_to_db",
    }
    # filepath = "data_sources/ibkr/AUD-CAD/final.csv"
    # save_to_db = CsvToPostgres(
    #     filepath=filepath, sql="sql/COPY_INTO_IBKR_DATA.sql", **kwargs
    # )
    save_to_db = CsvToPostgres.partial(  # type: ignore
        sql="sql/COPY_INTO_IBKR_DATA.sql", **kwargs
    ).expand(  # type: ignore
        filepath=csvpg_input
    )

    [deduplicate, complementary_info] >> enrich >> solve_bug >> save_to_db  # type: ignore


populate_db()
```
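For reference, here is a minimal, self-contained sketch of the same workaround pattern, distilled from the DAG above. The DAG id, task names, and toy payloads are made up for illustration; the only point is the EmptyOperator placed between the two chained mapped stages:

```
import pendulum
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@dag(
    dag_id="mapped_cascade_workaround",  # hypothetical name, illustration only
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
)
def mapped_cascade() -> None:
    @task
    def make_items() -> list[int]:
        return [1, 2, 3]

    @task
    def transform(item: int) -> int:
        return item * 2

    @task
    def make_paths() -> list[str]:
        return ["a.csv", "b.csv", "c.csv"]

    @task
    def load(filepath: str) -> None:
        print(f"loading {filepath}")

    # first mapped stage
    transformed = transform.expand(item=make_items())

    # without this empty task between the two mapped stages,
    # the second expansion failed for me on the affected versions
    barrier = EmptyOperator(task_id="solve_bug")

    # second mapped stage, expanded over a separate upstream task
    loaded = load.expand(filepath=make_paths())

    transformed >> barrier >> loaded


mapped_cascade()
```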
