EbrahimKaram commented on issue #58679:
URL: https://github.com/apache/airflow/issues/58679#issuecomment-3581899826

   The custom sensor (imports shown for completeness):

    import smbclient
    from airflow.sensors.base import BaseSensorOperator  # import path may vary by Airflow version
    from smbprotocol.exceptions import SMBAuthenticationError


    class SMBNewFilesSensor(BaseSensorOperator):
        def __init__(self, smb_dir: str, server_name: str, username: str,
                     password: str, pattern: str = "*.xlsx", *args, **kwargs):
           """
           smb_dir: path to the SMB folder (e.g., \\\\server\\share\\folder)
           seen_files_var: Airflow Variable name to store seen filenames 
(optional)
           """
           self.smb_dir = smb_dir
           self.server_name = server_name
           self.username = username
           self.password = password
           self.files = []
           self.pattern = pattern
   
            # Poke every 5 seconds, with exponential backoff, for up to 1 hour
           kwargs.setdefault("poke_interval", 5)          # start at 5 seconds
           kwargs.setdefault("exponential_backoff", True)  # enable backoff
           # timeout after 1 hour. Time is set in seconds
           kwargs.setdefault("timeout", 60 * 60)
           # maximum wait between pokes is 1 minute
           kwargs.setdefault("max_wait", 60)
           # free up worker slots when not poking
           kwargs.setdefault("mode", "reschedule")
   
           super().__init__(*args, **kwargs)
   
        def poke(self, context):
            try:
                # Authenticate to the SMB server before listing the directory
                smbclient.register_session(
                    server=self.server_name, username=self.username,
                    password=self.password, require_signing=True)
   
           except SMBAuthenticationError as e:
               self.log.error(f"Authentication failed: {e}")
               return False
   
            # List entries matching the pattern (Excel files by default)
            self.files = smbclient.listdir(
                self.smb_dir, search_pattern=self.pattern)
            if self.files:
                self.log.info(f"Found {len(self.files)} files: {self.files}")
                return True
           else:
               self.log.info("No new files found")
               return False
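
   A minimal sketch of how poke() could be exercised without a live SMB share, by patching the smbclient functions the sensor calls into (the share path, credentials, and filenames below are made up for illustration):

    from unittest import mock

    import smbclient
    from plugins.smb_sensors import SMBNewFilesSensor  # same import path as used below


    def test_poke():
        sensor = SMBNewFilesSensor(
            task_id="check_new_xlsx_files",
            smb_dir=r"\\server\share\folder",  # hypothetical share
            server_name="server",
            username="user",
            password="pass",
        )
        with mock.patch.object(smbclient, "register_session"), \
                mock.patch.object(smbclient, "listdir", return_value=["a.xlsx"]):
            assert sensor.poke(context={}) is True
        with mock.patch.object(smbclient, "register_session"), \
                mock.patch.object(smbclient, "listdir", return_value=[]):
            assert sensor.poke(context={}) is False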
   
   
    ## One-DAG test example
    import importlib
    import logging
    import os
    import sys
    import warnings
    from io import BytesIO

    import pandas as pd
    import pendulum
    import smbclient
    from airflow.configuration import conf
    from airflow.sdk import dag, task, get_current_context
    from dotenv import load_dotenv
    from sqlalchemy import create_engine
    from sqlalchemy.engine import URL
   
   sys.path.append(os.path.join(os.path.dirname(__file__)))
   #fmt: off
   import config  # This is our config file
   #fmt: on
   
   # provided by plugins/smb_sensors.py
   # from plugins.smb_sensors import SMBNewFilesSensor
   
    sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "plugins"))
    sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
   
   if 'smb_sensors' in sys.modules:
       importlib.reload(sys.modules['smb_sensors'])
   
   #fmt: off
   from plugins.smb_sensors import SMBNewFilesSensor
   import plugins.marquez_task_emitter as marquez_emitter
   import plugins.general_df_edits as general_df_edits
   #fmt: on
   
   # Loading environment variables from .env file
   env_file = os.path.join(os.path.dirname(__file__), "..", ".env")
   load_dotenv(env_file)
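
    # (Illustrative note, not in the original file.) The keys this script
    # expects in that .env file, collected from the os.getenv calls below:
    #   SMB_username, SMB_password, SMB_server,
    #   SQL_SERVER, SQL_DATABASE, SQL_USERNAME, SQL_PASSWORD, SQL_DRIVER,
    #   SQL_SERVER_PROD, SQL_USERNAME_PROD, SQL_PASSWORD_PROD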
   
   
   SMB_USERNAME = os.getenv("SMB_username")
   SMB_PASSWORD = os.getenv("SMB_password")
   SMB_SERVER = os.getenv("SMB_server")
   
   
   # SQL Server details
   SQL_SERVER = os.getenv("SQL_SERVER")
   SQL_DATABASE = os.getenv("SQL_DATABASE")
   SQL_USERNAME = os.getenv("SQL_USERNAME")
   SQL_PASSWORD = os.getenv("SQL_PASSWORD")
   SQL_DRIVER = os.getenv("SQL_DRIVER")
   
   if config.PRODUCTION:
       SQL_SERVER = os.getenv("SQL_SERVER_PROD")
       SQL_USERNAME = os.getenv("SQL_USERNAME_PROD")
       SQL_PASSWORD = os.getenv("SQL_PASSWORD_PROD")
   
   
   def disable_all_logging():
       # Suppress specific noisy loggers
       logging.getLogger("airflow").setLevel(logging.CRITICAL)
       logging.getLogger("werkzeug").setLevel(logging.CRITICAL)
       logging.getLogger("sqlalchemy").setLevel(logging.CRITICAL)
       logging.getLogger("urllib3").setLevel(logging.CRITICAL)
       logging.getLogger("airflow.task").setLevel(logging.CRITICAL)
       logging.getLogger("airflow.task.task_runner").setLevel(logging.CRITICAL)
       logging.getLogger("dill").setLevel(logging.CRITICAL)
   
       # Disable all logging
       logging.disable(logging.CRITICAL)
       warnings.filterwarnings("ignore")
   
   
    # This is the function that takes the Excel files from SMB and loads them
    # into SQL Server; it also moves the processed files to an "imported"
    # folder (sketched below).
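
    # A rough sketch of what that loader could look like using the modules
    # imported above -- an assumption for illustration, not the author's actual
    # code: the table name and "imported" subfolder are made up, and it assumes
    # smbclient.register_session() has already been called for SMB_SERVER.
    def load_xlsx_to_sql(filename: str) -> None:
        src = rf"\\{SMB_SERVER}{config.FOLDER_PATH}\{filename}"
        # Read the workbook off the share into memory
        with smbclient.open_file(src, mode="rb") as fh:
            df = pd.read_excel(BytesIO(fh.read()))
        # Build a SQLAlchemy URL for SQL Server (pyodbc driver assumed)
        url = URL.create(
            "mssql+pyodbc",
            username=SQL_USERNAME,
            password=SQL_PASSWORD,
            host=SQL_SERVER,
            database=SQL_DATABASE,
            query={"driver": SQL_DRIVER},
        )
        engine = create_engine(url)
        df.to_sql("target_table", engine, if_exists="append", index=False)
        # Move the processed file out of the watched folder
        dst = rf"\\{SMB_SERVER}{config.FOLDER_PATH}\imported\{filename}"
        smbclient.rename(src, dst)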
   
   
    # To re-enable logging
   def enable_all_logging():
       logging.disable(logging.NOTSET)
   
   
    MAX_ACTIVE_TASKS = 10  # Limit to 10 files at a time to avoid overwhelming SQL Server
   DAG_ID_NAME_VERSION = config.DAG_ID_NAME + "_v7"
   
   default_args = {
       'owner': 'airflow',
        'depends_on_past': False,  # Runs do not wait on the previous run's tasks
       'email': ['[email protected]'],  # email to receive alert
       'email_on_failure': True,  # must be True to test email
       'email_on_retry': False,
       'retries': 0
   }
   @dag(
        schedule="*/2 * * * *",  # Triggered every 2 minutes unless a run is already active
       start_date=pendulum.now("UTC").subtract(minutes=5),
       dag_id=DAG_ID_NAME_VERSION,
       default_args=default_args,
       catchup=False,
       is_paused_upon_creation=True,
        description="Detect new Excel files in Target Load directory and load to SQL Server",
       tags=[config.DAG_ID_NAME, "xlsx", "etl",
             "file-processing", "Written-by-Bob"],
       max_active_runs=1,
       max_active_tasks=MAX_ACTIVE_TASKS,
       on_success_callback=marquez_emitter.on_dag_success,
       on_failure_callback=marquez_emitter.on_dag_failure
   )
   def ETL_General_File_Processor():
       check_new_xlsx_files = SMBNewFilesSensor(
           task_id="check_new_xlsx_files",
           smb_dir=rf"\\{SMB_SERVER}{config.FOLDER_PATH}",
           server_name=SMB_SERVER,
           username=SMB_USERNAME,
           password=SMB_PASSWORD,
           pattern="*.xlsx",
        soft_fail=True,  # If the sensor times out, mark it SKIPPED instead of FAILED
       )
   
   
        check_new_xlsx_files  # single-task DAG; no downstream dependencies yet
   
   
   # This is required for Airflow to recognize the DAG
   ETL_General_File_Processor()
   
   
   if __name__ == "__main__":
       print("Starting script execution...")
   
       dag = ETL_General_File_Processor()
       dag.test()
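
   Running this file directly with python then executes the DAG in-process via dag.test(), which is handy for reproducing the sensor behaviour without a scheduler.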
   
   
   
   

