Steexyz commented on issue #60535:
URL: https://github.com/apache/airflow/issues/60535#issuecomment-3799796994

   Hi @shahar1,
   
   Here's a DAG example that used to work with Airflow 2.11.
   
   ```python
   from __future__ import annotations
   from airflow import DAG
   import pendulum
   
   from operators.extraction import ExtractionOperator
   from connectors.jdbc.source.oracle_source_connector import OracleSourceConnector
   from connectors.cloud.target.gcs_target_connector import GCSTargetConnector
   
   with DAG(
       dag_id='ar_collectors',
       start_date=pendulum.datetime(2025, 10, 1, tz="America/Toronto"),
       schedule="0 10 * * *",
       catchup=False,
       tags=['jdbc', 'oracleebs', 'ar_collectors']
   ) as dag:
   
       # 1. Define the Source (Oracle)
       oracle_source = OracleSourceConnector(
           conn_id='oracleebs',
           sql="SELECT * FROM AR.AR_COLLECTORS",
           query_mode='delta',
           delta_column='LAST_UPDATE_DATE',
           xcom_key='ar_collectors_delta'
       )
   
       # 2. Define the Target (GCS)
       gcs_target = GCSTargetConnector(
           conn_id='gcs-bucket-project',
           table_name='AR_COLLECTORS',
           gcs_path='raw/OracleEBS'
       )
   
       # 3. Use the Orchestrator (ExtractionOperator)
       ExtractionOperator(
           task_id='select_ar_collectors',
           source=oracle_source,
           targets=[gcs_target]
       )
   ``` 
   
   FYI, the `OracleSourceConnector` does retrieve its connection and can retrieve the data.
   
   The `OracleSourceConnector` retrieves its connection using this parent class:
   
   ```python
   from airflow.hooks.base import BaseHook
   from airflow.exceptions import AirflowException
   
   class BaseHook(BaseHook):
       """Base class for all internal hooks to standardize logging and error handling."""
       def __init__(self, conn_id: str, **kwargs):
           super().__init__(**kwargs)
           self.conn_id = conn_id
   
       def get_connection_metadata(self):
           """Standardized connection retrieval with clear error messaging."""
           try:
               return self.get_connection(self.conn_id)
           except Exception:
               self.log.error(f"Failed to find Airflow Connection: {self.conn_id}")
               raise AirflowException(f"Connection {self.conn_id} is missing or inaccessible.")
   ``` 
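
For readers without access to the internal packages, the lookup-and-wrap pattern in the parent class above can be sketched without Airflow. Everything in this block is a hypothetical stand-in rather than the actual connector code: `StubHook` replaces the parent hook, `ConnectionLookupError` replaces `AirflowException`, the `_CONNECTIONS` dict replaces Airflow's connection backend, and `OracleSourceConnectorSketch` with its `describe` method is only an assumed shape for a subclass.

```python
from __future__ import annotations

import logging


class ConnectionLookupError(Exception):
    """Hypothetical stand-in for AirflowException."""


# Hypothetical in-memory store standing in for Airflow's connection backend.
_CONNECTIONS = {"oracleebs": {"host": "db.example.com", "port": 1521}}


class StubHook:
    """Hypothetical stand-in for the internal parent hook."""

    log = logging.getLogger("stub_hook")

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id

    def get_connection(self, conn_id: str) -> dict:
        # The real parent class delegates to Airflow's BaseHook here.
        return _CONNECTIONS[conn_id]

    def get_connection_metadata(self) -> dict:
        """Look up the connection and wrap any failure in one error type."""
        try:
            return self.get_connection(self.conn_id)
        except Exception as exc:
            self.log.error("Failed to find Airflow Connection: %s", self.conn_id)
            raise ConnectionLookupError(
                f"Connection {self.conn_id} is missing or inaccessible."
            ) from exc


class OracleSourceConnectorSketch(StubHook):
    """Hypothetical connector that resolves its connection when asked."""

    def __init__(self, conn_id: str, sql: str) -> None:
        super().__init__(conn_id)
        self.sql = sql

    def describe(self) -> str:
        conn = self.get_connection_metadata()
        return f"{conn['host']}:{conn['port']} <- {self.sql}"
```

Calling `describe()` on a connector whose `conn_id` is not in the store raises `ConnectionLookupError`, chained to the original `KeyError`, so the underlying cause stays visible in the traceback.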

