GitHub user Steexyz added a comment to the discussion: The conn_id `xxxxxxx` 
isn't defined Google Cloud Platform

Hi @shahar1,

Here's a DAG example that used to work with 2.11.

```python
from __future__ import annotations
from airflow import DAG
import pendulum

from operators.extraction import ExtractionOperator
from connectors.jdbc.source.oracle_source_connector import OracleSourceConnector
from connectors.cloud.target.gcs_target_connector import GCSTargetConnector

with DAG(
    dag_id='ar_collectors',
    start_date=pendulum.datetime(2025, 10, 1, tz="America/Toronto"),
    schedule="0 10 * * *",
    catchup=False,
    tags=['jdbc', 'oracleebs', 'ar_collectors']
) as dag:

    # 1. Define the Source (Oracle)
    oracle_source = OracleSourceConnector(
        conn_id='oracleebs',
        sql="SELECT * FROM AR.AR_COLLECTORS",
        query_mode='delta',
        delta_column='LAST_UPDATE_DATE',
        xcom_key='ar_collectors_delta'
    )

    # 2. Define the Target (GCS)
    gcs_target = GCSTargetConnector(
        conn_id='gcs-bucket-project',
        table_name='AR_COLLECTORS',
        gcs_path='raw/OracleEBS'
    )

    # 3. Use the Orchestrator (ExtractionOperator)
    ExtractionOperator(
        task_id='select_ar_collectors',
        source=oracle_source,
        targets=[gcs_target]
    )
``` 

FYI, the OracleSourceConnector does retrieve its connection and can retrieve 
the data.

The Oracle Source Connector retrieves its connection using this parent class:

```python
from airflow.hooks.base import BaseHook
from airflow.exceptions import AirflowException

class BaseHook(BaseHook):
    """Base class for all internal hooks to standardize logging and error 
handling."""
    def __init__(self, conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id

    def get_connection_metadata(self):
        """Standardized connection retrieval with clear error messaging."""
        try:
            return self.get_connection(self.conn_id)
        except Exception:
            self.log.error(f"Failed to find Airflow Connection: {self.conn_id}")
            raise AirflowException(f"Connection {self.conn_id} is missing or 
inaccessible.")
``` 

GitHub link: 
https://github.com/apache/airflow/discussions/61094#discussioncomment-15610209

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to