EbrahimKaram commented on issue #58679:
URL: https://github.com/apache/airflow/issues/58679#issuecomment-3581899826
## The Custom Sensor

```python
import smbclient
from smbprotocol.exceptions import SMBAuthenticationError

# Import path for Airflow 3 (task SDK); on Airflow 2 use
# `from airflow.sensors.base import BaseSensorOperator` instead.
from airflow.sdk.bases.sensor import BaseSensorOperator


class SMBNewFilesSensor(BaseSensorOperator):
    def __init__(self, smb_dir: str, server_name: str, username: str,
                 password: str, pattern: str = "*.xlsx", *args, **kwargs):
        """
        smb_dir: path to the SMB folder (e.g., \\\\server\\share\\folder)
        server_name: host name of the SMB server
        username / password: credentials for the SMB session
        pattern: glob pattern of files to watch for (default "*.xlsx")
        """
        self.smb_dir = smb_dir
        self.server_name = server_name
        self.username = username
        self.password = password
        self.files = []
        self.pattern = pattern
        # We will poke every 5 seconds, with exponential backoff, for up to 1 hour
        kwargs.setdefault("poke_interval", 5)           # start at 5 seconds
        kwargs.setdefault("exponential_backoff", True)  # enable backoff
        # timeout after 1 hour. Time is set in seconds
        kwargs.setdefault("timeout", 60 * 60)
        # maximum wait between pokes is 1 minute
        kwargs.setdefault("max_wait", 60)
        # free up worker slots when not poking
        kwargs.setdefault("mode", "reschedule")
        super().__init__(*args, **kwargs)

    def poke(self, context):
        # Open an SMB session so we can list the files in the directory
        try:
            smbclient.register_session(
                server=self.server_name, username=self.username,
                password=self.password, require_signing=True)
        except SMBAuthenticationError as e:
            self.log.error(f"Authentication failed: {e}")
            return False
        # List all entries and filter for Excel files explicitly to avoid API differences
        self.files = smbclient.listdir(
            self.smb_dir, search_pattern=self.pattern)
        if self.files:
            self.log.info(f"Found {len(self.files)} files: {self.files}")
            return True
        else:
            self.log.info("No new files found")
            return False
```
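For reference, the sensor's detection logic can be exercised outside Airflow with the same two `smbclient` calls it uses; the server, share, and credentials below are placeholders rather than values from this report:

```python
import smbclient

# Placeholder connection details -- substitute a real server, share, and credentials.
smbclient.register_session(server="fileserver01", username="svc_airflow",
                           password="***", require_signing=True)

# Same call the sensor's poke() makes: list only the Excel files in the folder.
files = smbclient.listdir(r"\\fileserver01\share\Target Load", search_pattern="*.xlsx")
print(f"Found {len(files)} file(s): {files}")
```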
## One DAG test example

```python
import sys
import os
import importlib
from airflow.configuration import conf
from dotenv import load_dotenv
import smbclient
from airflow.sdk import dag, task
import pendulum
import warnings
import logging
import pandas as pd
from io import BytesIO
from airflow.sdk import get_current_context
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
sys.path.append(os.path.join(os.path.dirname(__file__)))
#fmt: off
import config # This is our config file
#fmt: on
# provided by plugins/smb_sensors.py
# from plugins.smb_sensors import SMBNewFilesSensor
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "plugins"))
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
if 'smb_sensors' in sys.modules:
    importlib.reload(sys.modules['smb_sensors'])
#fmt: off
from plugins.smb_sensors import SMBNewFilesSensor
import plugins.marquez_task_emitter as marquez_emitter
import plugins.general_df_edits as general_df_edits
#fmt: on
# Loading environment variables from .env file
env_file = os.path.join(os.path.dirname(__file__), "..", ".env")
load_dotenv(env_file)
SMB_USERNAME = os.getenv("SMB_username")
SMB_PASSWORD = os.getenv("SMB_password")
SMB_SERVER = os.getenv("SMB_server")
# SQL Server details
SQL_SERVER = os.getenv("SQL_SERVER")
SQL_DATABASE = os.getenv("SQL_DATABASE")
SQL_USERNAME = os.getenv("SQL_USERNAME")
SQL_PASSWORD = os.getenv("SQL_PASSWORD")
SQL_DRIVER = os.getenv("SQL_DRIVER")
if config.PRODUCTION:
    SQL_SERVER = os.getenv("SQL_SERVER_PROD")
    SQL_USERNAME = os.getenv("SQL_USERNAME_PROD")
    SQL_PASSWORD = os.getenv("SQL_PASSWORD_PROD")
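
# --- Illustrative sketch, not part of the original report -----------------------
# The SQL_* variables above are not used anywhere in the snippet as posted. One
# plausible way to turn them into a SQLAlchemy engine for SQL Server is shown
# below; the "mssql+pyodbc" dialect and the helper name are assumptions.
def make_sql_engine():
    sql_url = URL.create(
        "mssql+pyodbc",
        username=SQL_USERNAME,
        password=SQL_PASSWORD,
        host=SQL_SERVER,
        database=SQL_DATABASE,
        query={"driver": SQL_DRIVER},
    )
    return create_engine(sql_url)
# ---------------------------------------------------------------------------------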
def disable_all_logging():
    # Suppress specific noisy loggers
    logging.getLogger("airflow").setLevel(logging.CRITICAL)
    logging.getLogger("werkzeug").setLevel(logging.CRITICAL)
    logging.getLogger("sqlalchemy").setLevel(logging.CRITICAL)
    logging.getLogger("urllib3").setLevel(logging.CRITICAL)
    logging.getLogger("airflow.task").setLevel(logging.CRITICAL)
    logging.getLogger("airflow.task.task_runner").setLevel(logging.CRITICAL)
    logging.getLogger("dill").setLevel(logging.CRITICAL)
    # Disable all logging
    logging.disable(logging.CRITICAL)
    warnings.filterwarnings("ignore")
# The function that takes the Excel files from SMB, loads them into SQL Server,
# and moves them to an "imported" folder is not included in this snippet;
# a hedged sketch of such a helper is given after this code block.

# To re-enable logging
def enable_all_logging():
    logging.disable(logging.NOTSET)


MAX_ACTIVE_TASKS = 10  # Limit to 10 files at a time to avoid overwhelming SQL Server
DAG_ID_NAME_VERSION = config.DAG_ID_NAME + "_v7"
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,  # a task run does not wait on the previous DAG run
    'email': ['[email protected]'],  # email to receive alerts
    'email_on_failure': True,  # must be True to test email
    'email_on_retry': False,
    'retries': 0
}
@dag(
    schedule="*/2 * * * *",  # Triggered every two minutes unless a run is already in progress
    start_date=pendulum.now("UTC").subtract(minutes=5),
    dag_id=DAG_ID_NAME_VERSION,
    default_args=default_args,
    catchup=False,
    is_paused_upon_creation=True,
    description="Detect new Excel files in Target Load directory and load to SQL Server",
    tags=[config.DAG_ID_NAME, "xlsx", "etl",
          "file-processing", "Written-by-Bob"],
    max_active_runs=1,
    max_active_tasks=MAX_ACTIVE_TASKS,
    on_success_callback=marquez_emitter.on_dag_success,
    on_failure_callback=marquez_emitter.on_dag_failure
)
def ETL_General_File_Processor():
    check_new_xlsx_files = SMBNewFilesSensor(
        task_id="check_new_xlsx_files",
        smb_dir=rf"\\{SMB_SERVER}{config.FOLDER_PATH}",
        server_name=SMB_SERVER,
        username=SMB_USERNAME,
        password=SMB_PASSWORD,
        pattern="*.xlsx",
        soft_fail=True,  # If the sensor times out, mark it as SKIPPED instead of FAILED
    )
    check_new_xlsx_files


# This is required for Airflow to recognize the DAG
ETL_General_File_Processor()
if __name__ == "__main__":
    print("Starting script execution...")
    dag = ETL_General_File_Processor()
    dag.test()
```
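
The module above mentions a function that loads the Excel files into SQL Server and then moves them into an "imported" folder, but that function is not part of this report. A minimal sketch of what such a helper could look like is below; it assumes an SMB session has already been registered (as the sensor does), an engine such as the one built by the hypothetical `make_sql_engine()` above, and placeholder table/folder names (`staging_excel_loads`, `imported`):

```python
from io import BytesIO

import pandas as pd
import smbclient
from sqlalchemy.engine import Engine


def load_xlsx_to_sql(file_name: str, smb_dir: str, engine: Engine) -> None:
    """Hypothetical sketch: read one Excel file from the SMB share, append it to a
    SQL Server staging table, then move the file into an 'imported' subfolder.
    In the DAG this would typically be called from a @task downstream of the sensor."""
    src_path = rf"{smb_dir}\{file_name}"

    # Read the workbook straight from the share into a DataFrame
    with smbclient.open_file(src_path, mode="rb") as fh:
        df = pd.read_excel(BytesIO(fh.read()))

    # Append the rows to a staging table (placeholder table name)
    df.to_sql("staging_excel_loads", engine, if_exists="append", index=False)

    # Move the processed file into the 'imported' subfolder
    # (the folder is assumed to already exist on the share)
    smbclient.rename(src_path, rf"{smb_dir}\imported\{file_name}")
```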