shahar1 commented on issue #60535:
URL: https://github.com/apache/airflow/issues/60535#issuecomment-3801383485
> Ok, some more insights. I was able to get it working.
>
> See gcs_target_connector.py; most of the changes were to move the Hook initialisation out of the __enter__ method.
>
> from google.cloud import storage
> from airflow.exceptions import AirflowException
> from connectors.base_connector import TargetConnector
> from airflow.providers.google.cloud.hooks.gcs import GCSHook
> from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
>
> class GCSTargetConnector(TargetConnector):
>     """
>     Target connector for writing data to Google Cloud Storage (GCS).
>     Handles file naming, bucket/project resolution, and client setup.
>     """
>     def __init__(self, conn_id="google_cloud_default", bucket_name=None,
>                  project_id=None, gcs_path=None, table_name=None, file_prefix="stream"):
>         """
>         Initialize the GCS target connector.
>         conn_id: Airflow connection ID for GCS.
>         bucket_name: Name of the GCS bucket to write to.
>         project_id: GCP project ID.
>         gcs_path: Path in the bucket to write files.
>         table_name: Optional table name for metadata.
>         file_prefix: Prefix for output files.
>         """
>         super().__init__(conn_id)
>         self.bucket_name = bucket_name
>         self.project_id = project_id
>         self.gcs_path = gcs_path
>         self.table_name = table_name
>         self.file_prefix = file_prefix
>         self._part_num = 0
>         self._hook = None
>         self.client = None  # storage client is created lazily on the first save()
>
>     @property
>     def hook(self):
>         if self._hook is None:
>             # The Task SDK will resolve this only when first accessed
>             self.log.info(f"Resolving connection: {self.connection_id}")
>             self._hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
>         return self._hook
>
>     def __enter__(self):
>         # 1. Initialize the hook.
>         # Note: 'gcp_conn_id' is the standard param for GoogleCloudBaseHook/GoogleBaseHook
>         # self.log.info("Before getting hook")
>         # hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
>         # self.log.info("After getting hook")
>         # # 2. Explicitly fetch credentials.
>         # # This is where ADC is triggered.
>         # credentials = hook.get_credentials()
>         # self.log.info("After getting credentials")
>
>         # # 3. Resolve Project ID.
>         # # For ADC, 'Project Id' in the Airflow Connection UI is still highly recommended.
>         # self._effective_project_id = self.project_id or hook.project_id
>
>         # if not self._effective_project_id:
>         #     raise AirflowException("GCP Project ID could not be determined from Connection or ADC.")
>
>         # self._effective_bucket_name = self.bucket_name or self._effective_project_id
>
>         # self.log.info("Before initializing client")
>
>         # # 4. Initialize storage client
>         # self.client = storage.Client(
>         #     project=self._effective_project_id,
>         #     credentials=credentials
>         # )
>         # self.log.info("After initializing client")
>         self.log.info("Entering GCS context...")
>         return self
>
>     def save(self, log, data: bytes, context: dict):
>         if not data:
>             return None
>
>         # Use the property 'self.hook' to ensure _hook is initialized
>         hook_obj = self.hook
>
>         # Initialize the storage client once, on the first batch
>         if self.client is None:
>             self.log.info("Initializing GCS Storage Client for the first batch...")
>             credentials = hook_obj.get_credentials()
>             self._effective_project_id = self.project_id or hook_obj.project_id
>
>             if not self._effective_project_id:
>                 raise AirflowException("GCP Project ID could not be determined.")
>
>             self._effective_bucket_name = self.bucket_name or self._effective_project_id
>             self.client = storage.Client(
>                 project=self._effective_project_id,
>                 credentials=credentials
>             )
>
>         logical_date = context['logical_date']
>         formatted_date = logical_date.strftime("%Y%m%d%H%M%S%f")[:-3]
>
>         file_name = f"{self.gcs_path}/{self.table_name}/{self.file_prefix}_{self.table_name}_{formatted_date}_{self._part_num}.json"
>
>         bucket = self.client.bucket(self._effective_bucket_name)
>         blob = bucket.blob(file_name)
>
>         log.info(f"Uploading to gs://{self._effective_bucket_name}/{file_name}")
>         blob.upload_from_string(data, content_type="application/jsonl")
>
>         self._part_num += 1
>         return f"gs://{self._effective_bucket_name}/{file_name}"
> According to Gemini:
>
> 1. Connection Retrieval: The "Thin Worker" Shift
> In Airflow 2, your worker was "thick"—it had a direct connection to the
metadata database. In Airflow 3, workers are "thin" and isolated for security
and scalability. They communicate with an Internal API through the Task SDK.
>
> The Eager Hook Problem: The GoogleBaseHook source code shows that it
attempts to fetch connection details immediately upon instantiation to populate
its "extras".
>
> Initialization Timing: When you instantiated this in __enter__, the Task SDK hadn't yet fully "hydrated" the task environment. The worker tried to reach the API before the proxy was ready, resulting in the AirflowNotFoundException.
>
> The Lazy Solution: By moving the hook to a @property, you deferred the connection lookup until the save method was called. At that point, the task is in its "active" phase, and the Task SDK is fully synchronized with the Internal API, allowing the connection to be resolved instantly.
>
> Not sure if that makes any sense or if there's an issue with the task SDK.
It does make sense to me that this is related to the expected behavior of the Task SDK in Airflow 3 - please read the AIP for more details.
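To make that concrete, here is a minimal sketch of the lazy-hook pattern described above (MyConnector and its attributes are made-up names for illustration; only GoogleBaseHook comes from the provider package): the connection ID is stored at construction time, and the hook - and therefore the connection lookup through the Task SDK - is only created on first access, while the task is already running.

    # Minimal illustration of the lazy-hook pattern (hypothetical MyConnector class).
    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

    class MyConnector:
        def __init__(self, conn_id="google_cloud_default"):
            # Only remember the connection ID here; do not touch the connection yet.
            self.conn_id = conn_id
            self._hook = None

        @property
        def hook(self) -> GoogleBaseHook:
            # Instantiating GoogleBaseHook triggers the connection lookup, so defer it
            # to first access, which happens inside the running task (e.g. in save()).
            if self._hook is None:
                self._hook = GoogleBaseHook(gcp_conn_id=self.conn_id)
            return self._hook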
As of Airflow 3, you cannot initialize connections directly in the DAG's top-level code (it has no access to the metadata database to fetch connection details); connections are resolved only during task execution, through the Task SDK interface.
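As a rough sketch of what that means in practice (the DAG, task, and connection ID below are made up for illustration), the hook has to be constructed inside the task body, not at module level where the DAG file is parsed:

    # Sketch: hooks/connections must be created inside the running task, not at parse time.
    from airflow.decorators import dag, task
    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

    # Parse-time code has no metadata-database access in Airflow 3, so avoid this here:
    # hook = GoogleBaseHook(gcp_conn_id="google_cloud_default")

    @dag(schedule=None)
    def gcs_example():
        @task
        def upload():
            # Inside the task, the Task SDK resolves the connection on demand.
            hook = GoogleBaseHook(gcp_conn_id="google_cloud_default")
            credentials = hook.get_credentials()  # e.g. to build a google.cloud.storage.Client
            print(f"Resolved project: {hook.project_id}")

        upload()

    gcs_example()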
I'm converting this issue to a discussion.