darkfennertrader commented on issue #28973:
URL: https://github.com/apache/airflow/issues/28973#issuecomment-1491639692

   Hello,
   just to let you know that the problem still exists with multiple dynamic 
tasks in cascade. I solved the issue using an empty operator.
   
   Airflow 2.5.2 (using the official docker-compose file)
   OS: Ubuntu 22.04
   python version: 3.10.6
   
   
![airflow_bug](https://user-images.githubusercontent.com/46807063/229083904-4868b76e-9147-4251-bb54-6052742cb32d.png)
   
   ```
   import os
   from typing import Dict, Any
   from datetime import datetime
   import logging
   import pendulum
   from airflow.decorators import dag, task  # type: ignore
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   from airflow.operators.empty import EmptyOperator
   from dags_conf import default_args
   from custom_operators.csv_to_postgres import CsvToPostgres  # type: ignore
   
   # Timezone attached to the DAG's start_date.
   local_tz = pendulum.timezone("Europe/Rome")  # type: ignore
   # Logger registered under the name "airflow.task".
   task_logger = logging.getLogger("airflow.task")

   # Values read from the environment; each one is None when the variable is unset.
   airflow_home: Any = os.environ.get("AIRFLOW_HOME")
   provider_name: Any = os.environ.get("ibkr_provider_name")
   frequency: Any = os.environ.get("ibkr_frequency")

   # Directory scanned by the tasks of this DAG (one sub-directory per asset).
   BASE_DIR: str = airflow_home + "/data_sources/ibkr"
   
   
   @dag(
       dag_id="ibkr_db_init_v_1_1_0",
       start_date=datetime(2023, 1, 1, tzinfo=local_tz),
       schedule=None,
       catchup=False,
       tags=["db init"],
       default_args=default_args,
   )
   def populate_db() -> None:
       @task(do_xcom_push=True)  # type: ignore
       def get_assets_years_and_files() -> list[Dict[str, str | list[str]]]:
           """List every asset directory under BASE_DIR with its year sub-directories.

           Returns:
               One ``{"symbol": <asset directory name>, "years": [<sub-directory
               names>]}`` dict per asset directory, sorted by symbol so the
               downstream mapped tasks see the same order on every run.
           """
           assets_years: list[Dict[str, str | list[str]]] = []

           # os.listdir returns entries in arbitrary order; sort for determinism.
           for asset in sorted(os.listdir(BASE_DIR)):
               asset_dir = os.path.join(BASE_DIR, asset)
               # A stray file next to the asset directories would make the nested
               # os.listdir raise NotADirectoryError, so only directories count.
               if not os.path.isdir(asset_dir):
                   continue

               # Keep only the sub-directories; plain files (e.g. final.csv) are ignored.
               years = sorted(
                   entry
                   for entry in os.listdir(asset_dir)
                   if os.path.isdir(os.path.join(asset_dir, entry))
               )
               assets_years.append({"symbol": asset, "years": years})

           return assets_years
   
       @task  # type: ignore
       def deduplication(symbol: str, years: list[str]) -> None:
           """Merge all CSV files of one symbol into a de-duplicated final.csv.

           Args:
               symbol: Name of the asset directory below BASE_DIR.
               years: Names of the sub-directories of that asset directory whose
                   files are read.

           The merged data is written to ``BASE_DIR/<symbol>/final.csv`` with the
           columns renamed to lower case, ``time`` as the index, duplicated
           timestamps dropped (first occurrence kept) and rows sorted by time.
           """
           import pandas as pd

           symbol_dir = os.path.join(BASE_DIR, symbol)
           final_csv = os.path.join(symbol_dir, "final.csv")

           # remove overall file if present for idempotency
           if os.path.isfile(final_csv):
               os.remove(final_csv)

           print(symbol)

           # Start from an empty frame so the expected columns exist even when
           # no file is read; the per-file frames are concatenated once at the end.
           frames = [
               pd.DataFrame(
                   columns=["Datetime", "Open", "High", "Low", "Close", "Volume"]
               )
           ]
           # Sorted traversal makes the row order (and therefore which duplicate
           # counts as "first") independent of the directory listing order.
           for year in sorted(years):
               year_dir = os.path.join(symbol_dir, year)
               for name in sorted(os.listdir(year_dir)):
                   filepath = os.path.join(year_dir, name)
                   if not os.path.isfile(filepath):
                       continue
                   data = pd.read_csv(
                       filepath,
                       parse_dates=["Datetime"],
                       date_parser=lambda x: pd.to_datetime(x).tz_localize("UTC"),
                   )  # type: ignore
                   frames.append(data)

           dframe = pd.concat(frames)  # type: ignore

           # renaming columns to make them compatible with db table columns
           dframe.rename(
               columns={
                   "Datetime": "time",
                   "Open": "open",
                   "High": "high",
                   "Low": "low",
                   "Close": "close",
                   "Volume": "volume",
               },
               inplace=True,
           )

           dframe.set_index("time", drop=True, inplace=True)  # type: ignore
           # deduplication
           dframe = dframe[~dframe.index.duplicated(keep="first")]  # type: ignore
           dframe.sort_index(inplace=True)  # type: ignore
           dframe.to_csv(final_csv)
           print(dframe.shape)
   
       def list_of_dicts(elem: Dict[str, Any]) -> Dict[str, str]:
           """Build the SQL parameters for one asset entry.

           The directory-style symbol (``-`` separator) is turned into its
           ``/``-separated form; provider name and frequency come from the
           module-level settings.
           """
           db_symbol = elem["symbol"].replace("-", "/")
           return dict(
               symbol=db_symbol,
               provider_name=provider_name,
               frequency=frequency,
           )
   
       # One {"symbol", "years"} entry per asset directory.
       assets_years = get_assets_years_and_files()
       # Per-asset query parameters, derived from each entry by list_of_dicts.
       pg_input: list[dict[str, str]] = assets_years.map(list_of_dicts)  # type: ignore
       # One de-duplication task per entry; the entry's keys become keyword arguments.
       deduplicate = deduplication.partial().expand_kwargs(assets_years)  # type: ignore

       # One query task per parameter set in pg_input.
       complementary_info = PostgresOperator.partial(
           task_id="complementary_info",
           postgres_conn_id="postgres_conn",  # created as env variable
           sql="sql/GET_INFO_FROM_SYMBOLS_PROVIDERS.sql",
       ).expand(  # type: ignore
           parameters=pg_input
       )
   
       def list_of_str_int(elem: list[list[str | int]]) -> list[str | int]:
           return [y for x in elem for y in x]
   
       # Apply list_of_str_int to each mapped output of complementary_info,
       # i.e. flatten every query result into one flat list.
       task_input = complementary_info.output.map(list_of_str_int)
   
       # save complementary info in csv files for postgres IBKR table compatibility
       @task(
           trigger_rule="all_success",
           depends_on_past=False,
           wait_for_downstream=False,
       )  # type: ignore
       def enrich_csv(extra_info: list[Any]) -> None:
           """Add the database id columns to one symbol's final.csv, in place.

           Args:
               extra_info: Sequence whose first five items are, in order, the
                   symbol, symbol id, provider id, asset class id and
                   frequency id.
           """
           import pandas as pd

           symbol = extra_info[0]
           symbol_id = extra_info[1]
           provider_id = extra_info[2]
           asset_class_id = extra_info[3]
           frequency_id = extra_info[4]
           print(symbol)

           # The directory name uses "-" where the symbol uses "/".
           csv_path = BASE_DIR + "/" + symbol.replace("/", "-") + "/final.csv"
           frame = pd.read_csv(csv_path, parse_dates=["time"], index_col="time")  # type: ignore
           print(f"before: {frame.shape}")

           # Each id is written as a constant column, in this order.
           new_columns = (
               ("provider_id", provider_id),
               ("asset_class_id", asset_class_id),
               ("frequency_id", frequency_id),
               ("symbol_id", symbol_id),
           )
           for column, value in new_columns:
               frame[column] = value
           print(f"after: {frame.shape}")

           frame.to_csv(csv_path, header=True)

           print(extra_info)
   
       # One enrich_csv task per flattened query result in task_input.
       enrich = enrich_csv.partial().expand(extra_info=task_input)  # type: ignore
   
       @task  # type: ignore
       def prepare_input() -> list[str]:
           """Return the relative final.csv path of every asset directory.

           Returns:
               ``data_sources/ibkr/<asset>/final.csv`` for each directory found
               under BASE_DIR, sorted by asset name. Non-directory entries are
               skipped because no final.csv can exist below them.
           """
           return [
               "data_sources/ibkr/" + asset + "/final.csv"
               # os.listdir order is arbitrary; sort so the mapped tasks are stable.
               for asset in sorted(os.listdir(BASE_DIR))
               if os.path.isdir(os.path.join(BASE_DIR, asset))
           ]
   
       # Paths to load; each one becomes a mapped save_data_to_db task.
       csvpg_input = prepare_input()

       # Empty task placed between the two mapped tasks: the workaround for
       # the scheduling problem this DAG demonstrates.
       solve_bug = EmptyOperator(task_id="solve_bug")

       # save csv to Postgres database
       save_to_db = CsvToPostgres.partial(  # type: ignore
           sql="sql/COPY_INTO_IBKR_DATA.sql",
           task_id="save_data_to_db",
       ).expand(  # type: ignore
           filepath=csvpg_input
       )

       # Both upstream branches must finish before the CSV files are enriched.
       upstream = [deduplicate, complementary_info]
       upstream >> enrich >> solve_bug >> save_to_db  # type: ignore
   
   
   populate_db()
   ```
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to