Steexyz commented on issue #60535:
URL: https://github.com/apache/airflow/issues/60535#issuecomment-3800779814
Ok, some more insights. I was able to get it working.
See gcs_target_connector.py below; most of the changes amount to moving the Hook
initialisation out of the __enter__ method.
```python
from google.cloud import storage

from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

from connectors.base_connector import TargetConnector


class GCSTargetConnector(TargetConnector):
    """
    Target connector for writing data to Google Cloud Storage (GCS).

    Handles file naming, bucket/project resolution, and client setup.
    """

    def __init__(self, conn_id="google_cloud_default", bucket_name=None,
                 project_id=None, gcs_path=None, table_name=None,
                 file_prefix="stream"):
        """
        Initialize the GCS target connector.

        conn_id: Airflow connection ID for GCS.
        bucket_name: Name of the GCS bucket to write to.
        project_id: GCP project ID.
        gcs_path: Path in the bucket to write files.
        table_name: Optional table name for metadata.
        file_prefix: Prefix for output files.
        """
        super().__init__(conn_id)
        self.bucket_name = bucket_name
        self.project_id = project_id
        self.gcs_path = gcs_path
        self.table_name = table_name
        self.file_prefix = file_prefix
        self._part_num = 0
        self._hook = None
        self.client = None  # created lazily on the first save()

    @property
    def hook(self):
        if self._hook is None:
            # The Task SDK will resolve this only when first accessed
            self.log.info(f"Resolving connection: {self.connection_id}")
            self._hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
        return self._hook

    def __enter__(self):
        # 1. Initialize the hook.
        #    Note: 'gcp_conn_id' is the standard param for GoogleCloudBaseHook/GoogleBaseHook.
        # self.log.info("Before getting hook")
        # hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
        # self.log.info("After getting hook")

        # 2. Explicitly fetch credentials. This is where ADC is triggered.
        # credentials = hook.get_credentials()
        # self.log.info("After getting credentials")

        # 3. Resolve the project ID. For ADC, setting 'Project Id' in the
        #    Airflow Connection UI is still highly recommended.
        # self._effective_project_id = self.project_id or hook.project_id
        # if not self._effective_project_id:
        #     raise AirflowException(
        #         "GCP Project ID could not be determined from Connection or ADC."
        #     )
        # self._effective_bucket_name = self.bucket_name or self._effective_project_id

        # 4. Initialize the storage client.
        # self.log.info("Before initializing client")
        # self.client = storage.Client(
        #     project=self._effective_project_id,
        #     credentials=credentials,
        # )
        # self.log.info("After initializing client")

        self.log.info("Entering GCS context...")
        return self

    def save(self, log, data: bytes, context: dict):
        if not data:
            return None

        # Use the 'hook' property to ensure _hook is initialized lazily.
        hook_obj = self.hook

        # Initialize the storage client once, on the first batch.
        if self.client is None:
            self.log.info("Initializing GCS Storage Client for the first batch...")
            credentials = hook_obj.get_credentials()
            self._effective_project_id = self.project_id or hook_obj.project_id
            if not self._effective_project_id:
                raise AirflowException("GCP Project ID could not be determined.")
            self._effective_bucket_name = self.bucket_name or self._effective_project_id
            self.client = storage.Client(
                project=self._effective_project_id,
                credentials=credentials,
            )

        logical_date = context["logical_date"]
        formatted_date = logical_date.strftime("%Y%m%d%H%M%S%f")[:-3]
        file_name = (
            f"{self.gcs_path}/{self.table_name}/"
            f"{self.file_prefix}_{self.table_name}_{formatted_date}_{self._part_num}.json"
        )

        bucket = self.client.bucket(self._effective_bucket_name)
        blob = bucket.blob(file_name)
        log.info(f"Uploading to gs://{self._effective_bucket_name}/{file_name}")
        blob.upload_from_string(data, content_type="application/jsonl")

        self._part_num += 1
        return f"gs://{self._effective_bucket_name}/{file_name}"
```
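
For completeness, here is roughly how the connector gets used from a task. The DAG/task names, bucket, path, payload, and the module path connectors.gcs_target_connector below are made-up placeholders, not the actual pipeline:

```python
from airflow.sdk import dag, task

# Assumed module path for the connector shown above.
from connectors.gcs_target_connector import GCSTargetConnector


@dag(schedule=None)
def gcs_export_example():
    @task
    def export_batch(**context):
        connector = GCSTargetConnector(
            conn_id="google_cloud_default",
            bucket_name="my-example-bucket",  # placeholder
            gcs_path="exports",               # placeholder
            table_name="orders",              # placeholder
        )
        data = b'{"id": 1}\n{"id": 2}\n'  # placeholder JSONL payload
        with connector as target:
            # The connection is only resolved here, inside save(), via the
            # lazy 'hook' property, once the task is actually running.
            return target.save(target.log, data, context)

    export_batch()


gcs_export_example()
```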
According to Gemini:
---------------
**1. Connection Retrieval: The "Thin Worker" Shift**

In Airflow 2, your worker was "thick": it had a direct connection to the metadata database. In Airflow 3, workers are "thin" and isolated for security and scalability. They communicate with an Internal API through the Task SDK.

- **The Eager Hook Problem:** The `GoogleBaseHook` source code shows that it attempts to fetch connection details immediately upon instantiation to populate its "extras".
- **Initialization Timing:** When you instantiated this in `__enter__`, the Task SDK hadn't yet fully "hydrated" the task environment. The worker tried to reach the API before the proxy was ready, resulting in the `AirflowNotFoundException`.
- **The Lazy Solution:** By moving the hook to a `@property`, you deferred the connection lookup until the `save` method was called. At that point, the task is in its "active" phase, and the Task SDK is fully synchronized with the Internal API, allowing the connection to be resolved instantly.
---------------
Not sure if that makes any sense or if there's an issue with the task SDK.
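
Stripped down, the difference between the failing and working versions boils down to this pattern (class names are illustrative):

```python
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook


class EagerConnector:
    """Hook built up front: the connection lookup happens at construction
    time, before the Task SDK context is ready (this is what failed for me)."""

    def __init__(self, conn_id: str):
        self.hook = GoogleBaseHook(gcp_conn_id=conn_id)


class LazyConnector:
    """Hook built on first access: the connection lookup is deferred until
    the task is actually running and the Task SDK can resolve it."""

    def __init__(self, conn_id: str):
        self.conn_id = conn_id
        self._hook = None

    @property
    def hook(self) -> GoogleBaseHook:
        if self._hook is None:
            self._hook = GoogleBaseHook(gcp_conn_id=self.conn_id)
        return self._hook
```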