gmarendaz commented on issue #35979:
URL: https://github.com/apache/airflow/issues/35979#issuecomment-1838256382
@yiqijiu : thanks for your answer. Indeed, I didn't have the encoding
declaration at the beginning of my DAG code and I added it but the problem
remains the same.
Here is the full code below:
```python
# -*- coding: utf-8 -*-
from _lib import _template_slave
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.models import XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from datetime import datetime
from io import StringIO, BytesIO
import chardet
import datetime
import json
import logging
import ntpath
import numpy as np
import os
import pandas as pd
import re
import sqlalchemy
import unicodedata
import yaml
# DAG id derives from this file's name (strips .py, and .pyc for compiled files).
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")

# Defaults applied to every task in the DAG.
default_args = {
    'owner': 'IMEDA',
    'start_date': days_ago(2),
    'weight_rule': "upstream"  # Airflow priority-weight rule for scheduling
}
## CHECK
def check_file(**kwargs):
    """Branch task: route blacklisted source files away from extraction.

    Returns the task_id to follow: "blacklist" when the incoming path
    contains any substring from the ``manufacturing_blacklist_masp``
    Airflow Variable, otherwise "pick_extract_mode".
    """
    path = kwargs["dag_run"].conf['path']
    blacklist = Variable.get("manufacturing_blacklist_masp",
                             deserialize_json=True)
    is_blocked = any(token in path for token in blacklist)
    return "blacklist" if is_blocked else "pick_extract_mode"
def pick_extract_mode(**kwargs):
    """Branch task: decide which extraction parser matches the file layout.

    Scans the file at ``dag_run.conf['path']`` (latin-1 encoded) and
    returns the task_id of the branch to run: "new_extract" if any line
    starts with "<Info>", otherwise "old_extract".
    """
    path = kwargs["dag_run"].conf['path']
    # Stream the file line by line instead of materializing every line
    # in a list first — same result, constant memory, and we can stop
    # at the first marker.
    with open(path, encoding='latin_1') as f:
        for line in f:
            if line.startswith("<Info>"):
                return "new_extract"
    return "old_extract"
## EXTRACT
def get_header_fields(path, df, comment):
    """Copy header metadata from the measurement file into ``df``.

    Scans every line of the latin-1 file at ``path`` for header rows
    prefixed by ``comment`` and fills in the batch_id, n_plaque, produit,
    technologie and limit_file fields. Returns the (mutated) ``df``.
    """
    serial_prefix = comment + "\tNo de série :"
    product_prefix = comment + "\tProduit :"
    limits_prefix = comment + "\tFichier limites :"
    with open(path, encoding='latin_1') as handle:
        for row in handle:
            if row.startswith(serial_prefix):
                df["batch_id"] = re.search(r'.*No de série :\t([a-zA-Z0-9]*)', row).group(1)
                plate = re.search(r'.*N° plaque :\t([0-9]*)', row).group(1)
                # Left-pad with zeros so plates sort as 01, 02, ..., 10, 11, ...
                df["n_plaque"] = plate.zfill(2) if len(plate) < 2 else plate
            elif row.startswith(product_prefix):
                df["produit"] = re.search(r'.*Produit :\t(.*)\ttechnologie :', row).group(1)
                df["technologie"] = re.search(r'.*technologie :\t(.*)\tOpérateur', row).group(1)
            if row.startswith(limits_prefix):
                raw_limits = re.search(r'\tFichier limites :\t(.*)\tdu :', row).group(1)
                df["limit_file"] = raw_limits.lower().replace(" ", "")
    return df
def old_extract(**kwargs):
    """Extract task for the legacy file layout.

    Parses the tab-separated measurement file into a DataFrame, drops
    sparse rows, merges in the header metadata and normalises the
    string column names (lower-case, no accents/special characters).
    """
    path = kwargs["dag_run"].conf['path']
    header_size = _template_slave.header(path, "Puce N°")
    raw_df = pd.read_csv(
        path,
        header=header_size,
        sep="\t",
        engine='python',
        encoding='latin_1',
        encoding_errors='ignore',
        on_bad_lines='skip',
    )
    # Keep only rows with more than 6 non-NaN values, then skip the first
    # remaining row (header artefact).
    df = raw_df[raw_df.count(axis=1) > 6][1:]
    df = get_header_fields(path, df, comment="")
    text_cols = [c for c in df.columns if isinstance(c, str)]
    df[text_cols] = (
        df[text_cols]
        .rename(columns=str.lower)
        .rename(columns=_template_slave.remove_accents)
        .rename(columns=_template_slave.remove_special_characters)
    )
    return df
def new_extract(**kwargs):
    """Extract task for the newer '#'-commented file layout.

    Reads the two-row header into underscore-joined column names,
    normalises them, merges the '###'-prefixed header metadata, and
    strips the leading row plus — for closed files — the trailer row.
    """
    path = kwargs["dag_run"].conf['path']
    df = pd.read_csv(
        path,
        comment='#',
        sep="\t",
        header=[0, 1],
        engine='python',
        encoding='latin_1',
        encoding_errors='ignore',
        on_bad_lines='skip',
    )
    # Flatten the two header rows into single underscore-joined names.
    df.columns = df.columns.map('_'.join)
    text_cols = [c for c in df.columns if isinstance(c, str)]
    df[text_cols] = (
        df[text_cols]
        .rename(columns=str.lower)
        .rename(columns=_template_slave.remove_accents)
        .rename(columns=_template_slave.remove_special_characters)
    )
    df = get_header_fields(path, df, comment="###\t")
    # A closed file ("Fichier clôturé") carries a trailer line: drop it too.
    with open(path, encoding='latin_1') as handle:
        for line in handle:
            if line.startswith("Fichier clôturé"):
                return df[1:-1]
    return df[1:]
## TRANSFORM
"""def custom_deserialize(df):
return json.load(df.decode('latin_1'))"""
def concat_time_date(df):
df['date_heure'] = pd.to_datetime(df['info_date'] + " " +
df['info_heure'], dayfirst = True, errors="coerce")
df = df.drop(['info_date', 'info_heure'], axis=1)
return df
def transform(**kwargs):
    # Take the output of whichever extract branch actually ran, rename the
    # raw file columns onto the test_wafer schema, build the datetime
    # column, filter/cast against the schema, and stamp provenance columns.
    ti = kwargs['ti']
    # Exactly one of the two branches ran; the skipped one pulls as None.
    task_outputs = ti.xcom_pull(task_ids=["old_extract", "new_extract"])
    logging.info(task_outputs)
    extracting_res = [output for output in task_outputs if output is not None]
    df = extracting_res[0]
    # Map raw file column names onto the warehouse schema names.
    df = df.rename(columns={"puce_n":"info_puce_n",
                            "pos_x": "info_pos_x",
                            "pos_y": "info_pos_y",
                            "date" : "info_date",
                            "heure" : "info_heure",
                            'pos_a_x_g' : 'capa_meas_pos_meas_at_x',
                            "status" : "info_status",
                            "csup_a_x_g" : "capa_meas_csup_x_g",
                            'cinf_a_x_g' : 'capa_meas_cinf_x_g',
                            'csup_a_0_g' : 'capa_meas_csup_0_g',
                            'rsup_a_0_g' : 'capa_meas_rsup_0_g',
                            'cinf_a_0_g' : 'capa_meas_cinf_0_g',
                            'rinf_a_0_g' : 'capa_meas_rinf_0_g',
                            'csup_pol' : 'capa_meas_csup_pol',
                            'cinf_pol' : 'capa_meas_cinf_pol',
                            'csup_hyst' : 'capa_meas_csup_hys',
                            'cinf_hyst' : 'capa_meas_cinf_hys',
                            'c_tot' : 'capa_meas_c_tot',
                            'sz' : 'capa_meas_sz_percent',
                            'sz_1' : 'capa_meas_sz_pf',
                            'so' : 'capa_meas_s0_a',
                            'd_pol_sup' : 'capa_meas_delta_csup_pol',
                            'd_pol_inf' : 'capa_meas_delta_cinf_pol',
                            'd_hyst_sup' : 'capa_meas_delta_csup_hys',
                            'd_hyst_inf' : 'capa_meas_delta_cinf_hys',
                            'ampl_a_0' : 'bw_meas_ampl_0_hz',
                            'ampl_x_hz' : 'bw_meas_ampl_x_hz',
                            'fc' : 'bw_meas_fc',
                            'ampl_min' : 'bw_meas_ampl_min',
                            'f_min' : 'bw_meas_f_min',
                            'ampl_res' : 'bw_meas_ampl_res',
                            'f_res' : 'bw_meas_f_res',
                            'mse' : 'bw_meas_mse',
                            'press' : 'bw_meas_press',
                            'q' : 'bw_meas_q'})
    schema = _template_slave.get_schema("test_wafer")
    df = concat_time_date(df)
    df = _template_slave.filter_(df, schema)
    df = _template_slave.cast(df, schema)
    # Provenance: which site produced the file and where it came from.
    df["location"] = kwargs["dag_run"].conf['location']
    df["src_path"] = kwargs["dag_run"].conf['path']
    return df
## LOAD
def load(**kwargs):
    """Load task: append the transformed DataFrame to the test_wafer table.

    Rows violating the table's integrity constraints (already-loaded data)
    are deliberately skipped so re-runs stay idempotent.
    """
    engine = sqlalchemy.create_engine(
        BaseHook.get_connection("database").get_uri())
    try:
        ti = kwargs['ti']
        df = ti.xcom_pull(task_ids='transform')
        try:
            df.to_sql("test_wafer", con=engine, if_exists='append', index=False)
        except sqlalchemy.exc.IntegrityError:
            # Keep the deliberate best-effort behaviour, but leave a trace
            # in the task log instead of swallowing the error silently.
            logging.info("IntegrityError on insert into test_wafer; "
                         "assuming duplicate rows and skipping")
    finally:
        # The engine is created per task run: dispose it so its connection
        # pool is released instead of leaking until process exit.
        engine.dispose()
@provide_session
def cleanup_xcom(session=None, **kwargs):
    """Delete this DAG run's XCom rows once the pipeline has finished."""
    stale_xcoms = session.query(XCom).filter(
        XCom.dag_id == DAG_ID,
        XCom.execution_date == kwargs["execution_date"],
    )
    stale_xcoms.delete()
# Externally-triggered DAG (schedule_interval=None): the trigger supplies
# conf={'path': ..., 'location': ...} for the file to ingest.
with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
    tags = ["manufacturing", "tests"]
) as dag:
    # Branch 1: blacklisted paths go to a no-op, everything else continues.
    check_task = BranchPythonOperator(
        task_id="check_file",
        python_callable=check_file
    )
    blacklist_task = DummyOperator(
        task_id="blacklist"
    )
    # Branch 2: pick the parser matching the file layout (old vs new format).
    pick_extract_mode_task = BranchPythonOperator(
        task_id="pick_extract_mode",
        python_callable=pick_extract_mode
    )
    old_extract_task = PythonOperator(
        task_id='old_extract',
        python_callable=old_extract,
    )
    new_extract_task = PythonOperator(
        task_id='new_extract',
        python_callable=new_extract,
    )
    # "none_failed" lets transform run even though one extract branch was
    # skipped by the branching operator.
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
        trigger_rule="none_failed"
    )
    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
        pool="wafer_pool"  # run within the shared wafer_pool slot budget
    )
    clean_xcom_task = PythonOperator(
        task_id="clean_xcom",
        python_callable = cleanup_xcom,
    )
    check_task >> [pick_extract_mode_task, blacklist_task]
    pick_extract_mode_task >> [old_extract_task, new_extract_task]
    [old_extract_task, new_extract_task] >> transform_task >> load_task >> clean_xcom_task
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]