Steexyz commented on issue #60535:
URL: https://github.com/apache/airflow/issues/60535#issuecomment-3800779814

   Ok, some more insights. I was able to get it working.
   
   See gcs_target_connector.py below; most of the change was moving the Hook
initialisation out of the __enter__ method.
   
   ```python
   from google.cloud import storage
   from airflow.exceptions import AirflowException
   from connectors.base_connector import TargetConnector
   from airflow.providers.google.cloud.hooks.gcs import GCSHook
   from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
   
   class GCSTargetConnector(TargetConnector):
       """
       Target connector for writing data to Google Cloud Storage (GCS).
       Handles file naming, bucket/project resolution, and client setup.
       """
       def __init__(self, conn_id="google_cloud_default", bucket_name=None,
                    project_id=None, gcs_path=None, table_name=None,
                    file_prefix="stream"):
           """
           Initialize the GCS target connector.
           conn_id: Airflow connection ID for GCS.
           bucket_name: Name of the GCS bucket to write to.
           project_id: GCP project ID.
           gcs_path: Path in the bucket to write files.
           table_name: Optional table name for metadata.
           file_prefix: Prefix for output files.
           """
           super().__init__(conn_id)
           self.bucket_name = bucket_name
           self.project_id = project_id
           self.gcs_path = gcs_path
           self.table_name = table_name
           self.file_prefix = file_prefix
           self._part_num = 0
           self._hook = None
           self.client = None  # the storage client is created lazily in save()
       
       @property
       def hook(self):
           if self._hook is None:
               # The Task SDK will resolve this only when first accessed
               self.log.info(f"Resolving connection: {self.connection_id}")
               self._hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
           return self._hook
   
       def __enter__(self):
           # 1. Initialize the hook.
           # Note: 'gcp_conn_id' is the standard param for GoogleCloudBaseHook/GoogleBaseHook
           # self.log.info("Before getting hook")
           # hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
           # self.log.info("After getting hook")
           # # 2. Explicitly fetch credentials.
           # # This is where ADC is triggered.
           # credentials = hook.get_credentials()
           # self.log.info("After getting credentials")

           # # 3. Resolve Project ID.
           # # For ADC, 'Project Id' in the Airflow Connection UI is still highly recommended.
           # self._effective_project_id = self.project_id or hook.project_id

           # if not self._effective_project_id:
           #     raise AirflowException("GCP Project ID could not be determined from Connection or ADC.")

           # self._effective_bucket_name = self.bucket_name or self._effective_project_id

           # self.log.info("Before initializing client")

           # # 4. Initialize storage client
           # self.client = storage.Client(
           #     project=self._effective_project_id,
           #     credentials=credentials
           # )
           # self.log.info("After initializing client")
           self.log.info("Entering GCS context...")
           return self
   
       def save(self, log, data: bytes, context: dict):
           if not data:
               return None

           # Use the property 'self.hook' to ensure _hook is initialized
           hook_obj = self.hook

           # Initialize the storage client once, on the first batch
           if self.client is None:
               self.log.info("Initializing GCS Storage Client for the first batch...")
               credentials = hook_obj.get_credentials()
               self._effective_project_id = self.project_id or hook_obj.project_id

               if not self._effective_project_id:
                   raise AirflowException("GCP Project ID could not be determined.")

               self._effective_bucket_name = self.bucket_name or self._effective_project_id
               self.client = storage.Client(
                   project=self._effective_project_id,
                   credentials=credentials,
               )

           logical_date = context['logical_date']
           formatted_date = logical_date.strftime("%Y%m%d%H%M%S%f")[:-3]

           file_name = f"{self.gcs_path}/{self.table_name}/{self.file_prefix}_{self.table_name}_{formatted_date}_{self._part_num}.json"

           bucket = self.client.bucket(self._effective_bucket_name)
           blob = bucket.blob(file_name)

           log.info(f"Uploading to gs://{self._effective_bucket_name}/{file_name}")
           blob.upload_from_string(data, content_type="application/jsonl")

           self._part_num += 1
           return f"gs://{self._effective_bucket_name}/{file_name}"
   ``` 
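
   For reference, here is a hypothetical usage sketch. The batches and the
context dict below are illustrative stand-ins, not part of the connector; in a
real task the context comes from Airflow.

   ```python
   from datetime import datetime, timezone

   connector = GCSTargetConnector(
       conn_id="google_cloud_default",
       bucket_name="my-bucket",      # assumption: this bucket already exists
       gcs_path="raw/landing",
       table_name="orders",
   )

   # Normally this comes from the Airflow task context; a datetime stands in here.
   context = {"logical_date": datetime.now(timezone.utc)}

   with connector as target:
       for batch in (b'{"id": 1}\n', b'{"id": 2}\n'):  # stand-in batches
           uri = target.save(target.log, batch, context)
           if uri:
               target.log.info("Wrote %s", uri)
   ```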
   
   According to Gemini:
   
   ---------------
   
   1. Connection Retrieval: The "Thin Worker" Shift
   In Airflow 2, your worker was "thick"—it had a direct connection to the 
metadata database. In Airflow 3, workers are "thin" and isolated for security 
and scalability. They communicate with an Internal API through the Task SDK.
   
   The Eager Hook Problem: The GoogleBaseHook source code shows that it 
attempts to fetch connection details immediately upon instantiation to populate 
its "extras".
   
   Initialization Timing: When you instantiated this in __enter__, the Task SDK 
hadn't yet fully "hydrated" the task environment. The worker tried to reach the 
API before the proxy was ready, resulting in the AirflowNotFoundException.
   
   The Lazy Solution: By moving the hook to a @property, you deferred the 
connection lookup until the save method was called. At that point, the task is 
in its "active" phase, and the Task SDK is fully synchronized with the Internal 
API, allowing the connection to be resolved instantly.
   
   ---------------
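
   Distilled to the essentials, the before/after looks like this (a minimal
sketch of the pattern described above; the class names are mine):

   ```python
   from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

   class EagerConnector:
       def __enter__(self):
           # Instantiating the hook here resolves the connection at
           # __enter__ time -- too early under the Airflow 3 Task SDK,
           # and what surfaced as AirflowNotFoundException.
           self.hook = GoogleBaseHook(gcp_conn_id="google_cloud_default")
           return self

   class LazyConnector:
       _hook = None

       @property
       def hook(self):
           # The connection is resolved on first access, i.e. inside
           # save(), once the task is in its "active" phase.
           if self._hook is None:
               self._hook = GoogleBaseHook(gcp_conn_id="google_cloud_default")
           return self._hook

       def __enter__(self):
           return self  # no connection lookup happens here
   ```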
   
   Not sure if that makes any sense or if there's an issue with the Task SDK.
   

