GitHub user shahar1 added a comment to the discussion: The conn_id `xxxxxxx`
isn't defined Google Cloud Platform
> Ok, some more insights. I was able to get it working.
>
> See gcs_target_connector.py below; most of the changes were to move the Hook
> initialisation out of the `__enter__` method.
>
> from google.cloud import storage
> from airflow.exceptions import AirflowException
> from connectors.base_connector import TargetConnector
> from airflow.providers.google.cloud.hooks.gcs import GCSHook
> from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
>
>
> class GCSTargetConnector(TargetConnector):
>     """
>     Target connector for writing data to Google Cloud Storage (GCS).
>     Handles file naming, bucket/project resolution, and client setup.
>     """
>
>     def __init__(self, conn_id="google_cloud_default", bucket_name=None,
>                  project_id=None, gcs_path=None, table_name=None,
>                  file_prefix="stream"):
>         """
>         Initialize the GCS target connector.
>
>         conn_id: Airflow connection ID for GCS.
>         bucket_name: Name of the GCS bucket to write to.
>         project_id: GCP project ID.
>         gcs_path: Path in the bucket to write files.
>         table_name: Optional table name for metadata.
>         file_prefix: Prefix for output files.
>         """
>         super().__init__(conn_id)
>         self.bucket_name = bucket_name
>         self.project_id = project_id
>         self.gcs_path = gcs_path
>         self.table_name = table_name
>         self.file_prefix = file_prefix
>         self._part_num = 0
>         self._hook = None
>         self.client = None  # storage.Client is created lazily in save()
>
>     @property
>     def hook(self):
>         if self._hook is None:
>             # The Task SDK will resolve this only when first accessed
>             self.log.info(f"Resolving connection: {self.connection_id}")
>             self._hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
>         return self._hook
>
>     def __enter__(self):
>         # The eager initialisation below is what used to live here and what
>         # triggered the connection lookup too early; it was commented out and
>         # moved into save().
>         #
>         # 1. Initialize the hook.
>         #    Note: 'gcp_conn_id' is the standard param for
>         #    GoogleCloudBaseHook/GoogleBaseHook.
>         # hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
>         #
>         # 2. Explicitly fetch credentials. This is where ADC is triggered.
>         # credentials = hook.get_credentials()
>         #
>         # 3. Resolve Project ID. For ADC, 'Project Id' in the Airflow
>         #    Connection UI is still highly recommended.
>         # self._effective_project_id = self.project_id or hook.project_id
>         # if not self._effective_project_id:
>         #     raise AirflowException(
>         #         "GCP Project ID could not be determined from Connection or ADC."
>         #     )
>         # self._effective_bucket_name = self.bucket_name or self._effective_project_id
>         #
>         # 4. Initialize storage client.
>         # self.client = storage.Client(
>         #     project=self._effective_project_id,
>         #     credentials=credentials,
>         # )
>         self.log.info("Entering GCS context...")
>         return self
>
>     def save(self, log, data: bytes, context: dict):
>         if not data:
>             return None
>
>         # Use the property 'self.hook' to ensure _hook is initialized
>         hook_obj = self.hook
>
>         # Initialize the storage client once, on the first batch
>         if self.client is None:
>             self.log.info("Initializing GCS Storage Client for the first batch...")
>             credentials = hook_obj.get_credentials()
>             self._effective_project_id = self.project_id or hook_obj.project_id
>
>             if not self._effective_project_id:
>                 raise AirflowException("GCP Project ID could not be determined.")
>
>             self._effective_bucket_name = self.bucket_name or self._effective_project_id
>             self.client = storage.Client(
>                 project=self._effective_project_id,
>                 credentials=credentials,
>             )
>
>         logical_date = context['logical_date']
>         formatted_date = logical_date.strftime("%Y%m%d%H%M%S%f")[:-3]
>
>         file_name = (
>             f"{self.gcs_path}/{self.table_name}/"
>             f"{self.file_prefix}_{self.table_name}_{formatted_date}_{self._part_num}.json"
>         )
>
>         bucket = self.client.bucket(self._effective_bucket_name)
>         blob = bucket.blob(file_name)
>
>         log.info(f"Uploading to gs://{self._effective_bucket_name}/{file_name}")
>         blob.upload_from_string(data, content_type="application/jsonl")
>
>         self._part_num += 1
>         return f"gs://{self._effective_bucket_name}/{file_name}"
> According to Gemini:
>
> 1. Connection Retrieval: The "Thin Worker" Shift
> In Airflow 2, your worker was "thick"—it had a direct connection to the
> metadata database. In Airflow 3, workers are "thin" and isolated for security
> and scalability. They communicate with an Internal API through the Task SDK.
>
> The Eager Hook Problem: The GoogleBaseHook source code shows that it attempts
> to fetch connection details immediately upon instantiation to populate its
> "extras".
>
> Initialization Timing: When you instantiated this in `__enter__`, the Task SDK
> hadn't yet fully "hydrated" the task environment. The worker tried to reach
> the API before the proxy was ready, resulting in the AirflowNotFoundException.
>
> The Lazy Solution: By moving the hook to a `@property`, you deferred the
> connection lookup until the save method was called. At that point, the task is
> in its "active" phase, and the Task SDK is fully synchronized with the Internal
> API, allowing the connection to be resolved instantly.
>
> Not sure if that makes any sense or if there's an issue with the task SDK.
It does make sense to me that this is related to the expected behavior of the
Task SDK in Airflow 3 - please read the AIP for more details.
As of Airflow 3, you cannot initialize connections directly in the DAG's code
(tasks no longer have access to the metadata database to fetch connection
details); connections are resolved only during task execution, through the Task
SDK interface.
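To make that concrete, here is a minimal sketch (assuming the Airflow 3 Task SDK
imports from airflow.sdk and an existing google_cloud_default connection; the
DAG and task names are made up):

    from datetime import datetime

    from airflow.sdk import dag, task
    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

    # Creating GoogleBaseHook here, at module/DAG-parse level, would fail in
    # Airflow 3: the DAG processor has no access to connection details.

    @dag(schedule=None, start_date=datetime(2025, 1, 1), catchup=False)
    def gcs_lazy_connection_example():

        @task
        def upload_batch():
            # Inside a running task, the Task SDK resolves the connection
            # through the execution API, so the hook can be created safely.
            hook = GoogleBaseHook(gcp_conn_id="google_cloud_default")
            # Fetching credentials is what actually triggers ADC / the lookup.
            credentials = hook.get_credentials()
            return hook.project_id

        upload_batch()

    gcs_lazy_connection_example()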
I'm converting this issue to a discussion.
GitHub link:
https://github.com/apache/airflow/discussions/61094#discussioncomment-15610211