shahar1 commented on code in PR #65198:
URL: https://github.com/apache/airflow/pull/65198#discussion_r3223550467
##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -56,6 +68,204 @@
["https://www.googleapis.com/auth/logging.read",
"https://www.googleapis.com/auth/logging.write"]
)
+LABEL_TASK_ID = "task_id"
+LABEL_DAG_ID = "dag_id"
+LABEL_LOGICAL_DATE = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
+LABEL_TRY_NUMBER = "try_number"
+
+
+@attrs.define(kw_only=True)
+class StackdriverRemoteLogIO(LoggingMixin):
+ """Remote log IO that streams logs to and reads from Google Cloud Stackdriver Logging."""
+
+ base_log_folder: Path = attrs.field(converter=Path)
+ delete_local_copy: bool = True
+
+ gcp_key_path: str | None = None
+ scopes: Collection[str] | None = _DEFAULT_SCOPESS
+ gcp_log_name: str = DEFAULT_LOGGER_NAME
+ transport_type: type[Transport] = BackgroundThreadTransport
+ resource: Resource = _GLOBAL_RESOURCE
+ labels: dict[str, str] | None = None
+
+ @cached_property
+ def credentials_and_project(self) -> tuple[Credentials, str]:
+ credentials, project = get_credentials_and_project_id(
+ key_path=self.gcp_key_path, scopes=self.scopes, disable_logging=True
+ )
+ return credentials, project
+
+ @cached_property
+ def _client(self) -> gcp_logging.Client:
+ """The Cloud Library API client."""
+ credentials, project = self.credentials_and_project
+ return gcp_logging.Client(
+ credentials=credentials,
+ project=project,
+ client_info=CLIENT_INFO,
+ )
+
+ @cached_property
+ def _logging_service_client(self) -> LoggingServiceV2Client:
+ """The Cloud logging service v2 client."""
+ credentials, _ = self.credentials_and_project
+ return LoggingServiceV2Client(
+ credentials=credentials,
+ client_info=CLIENT_INFO,
+ )
+
+ @cached_property
+ def transport(self) -> Transport:
+ """Object responsible for sending data to Stackdriver."""
+ return self.transport_type(self._client, self.gcp_log_name)
+
+ @cached_property
+ def processors(self) -> tuple[structlog.typing.Processor, ...]:
+ import structlog.stdlib
+
+ from airflow.sdk.log import relative_path_from_logger
+
+ log_record_factory = getLogRecordFactory()
+ _transport = self.transport
+
+ def proc(
+ logger: structlog.typing.WrappedLogger,
+ method_name: str,
+ event: structlog.typing.EventDict,
+ ):
+ if not logger or not relative_path_from_logger(logger):
+ return event
+
+ name = event.get("logger_name") or event.get("logger", "")
+ level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO)
+ msg = copy.copy(event)
+ created = None
+ if ts := msg.pop("timestamp", None):
+ with contextlib.suppress(Exception):
+ created = datetime.fromisoformat(ts)
+ record = log_record_factory(
+ name,
+ level,
+ pathname="",
+ lineno=0,
+ msg=msg,
+ args=(),
+ exc_info=None,
+ func=None,
+ sinfo=None,
+ )
+ if created is not None:
+ ct = created.timestamp()
+ record.created = ct
+ record.msecs = int((ct - int(ct)) * 1000) + 0.0
+ _transport.send(
+ record, str(msg.get("event", "")), resource=self.resource, labels=self.labels or {}
+ )
Review Comment:
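Bug: Here you send only the static `self.labels` (without the TI-derived
ones), but in lines 188-193 you filter by `task_id`/`dag_id`/`try_number` —
so reads won't return anything. The TI-derived labels need to be merged into
the `labels` passed to `_transport.send` here, so that the labels written
match the labels the read path filters on.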
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]