jason810496 commented on code in PR #66610:
URL: https://github.com/apache/airflow/pull/66610#discussion_r3448184594


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py:
##########
@@ -201,3 +206,80 @@ def get_dag_deadline_alerts(
     alerts = session.scalars(alerts_select)
 
     return DeadlineAlertCollectionResponse(deadline_alerts=alerts, 
total_entries=total_entries)
+
+
+@deadlines_router.get(
+    "/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+    dependencies=[
+        Depends(
+            requires_access_dag(
+                method="GET",
+                access_entity=DagAccessEntity.TASK_LOGS,
+            )
+        ),
+    ],
+    response_model=TaskInstancesLogResponse,
+    response_model_exclude_unset=True,
+)
+def get_callback_logs(
+    dag_id: str,
+    dag_run_id: str,
+    callback_id: UUID,
+    session: SessionDep,
+) -> TaskInstancesLogResponse:
+    """
+    Get execution logs for a callback associated with a deadline.
+
+    Returns the logs produced during callback execution. These logs are 
uploaded
+    to remote storage (or written locally) by the callback supervisor after 
execution.
+    """
+    # Sanitize path components to prevent path traversal via URL parameters.
+    for param_name, param_value in [("dag_id", dag_id), ("dag_run_id", 
dag_run_id)]:
+        if os.sep in param_value or "\\" in param_value or ".." in param_value:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                f"Invalid characters in {param_name}",
+            )
+
+    # Verify the callback exists
+    callback = session.scalar(select(Callback).where(Callback.id == 
callback_id))
+    if callback is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Callback with id `{callback_id}` was not found",
+        )
+
+    # Verify the dag_run exists with matching dag_id
+    dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id))
+    if dag_run is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was 
not found",
+        )
+
+    # Verify the callback actually belongs to this dag_run (via the Deadline 
relationship)
+    deadline = session.scalar(
+        select(Deadline).where(Deadline.callback_id == callback_id, 
Deadline.dagrun_id == dag_run.id)
+    )
+    if deadline is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Callback `{callback_id}` is not associated with DagRun 
`{dag_run_id}` of Dag `{dag_id}`",
+        )
+
+    try:
+        log_stream = read_callback_log(
+            dag_id=dag_id,
+            run_id=dag_run_id,
+            callback_id=str(callback_id),
+        )
+    except ValueError:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid callback log 
path")
+
+    content = list(log_stream)
+    return TaskInstancesLogResponse.model_construct(content=content, 
continuation_token=None)

Review Comment:
   We should support JSONL "content type" header type that support streaming 
response like the Get TaskInstance logs route.
   
   The `list` copy and the pydantic model serialization will cause the OOM for 
API server when the log size increases.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py:
##########
@@ -201,3 +206,80 @@ def get_dag_deadline_alerts(
     alerts = session.scalars(alerts_select)
 
     return DeadlineAlertCollectionResponse(deadline_alerts=alerts, 
total_entries=total_entries)
+
+
+@deadlines_router.get(
+    "/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+    dependencies=[
+        Depends(
+            requires_access_dag(
+                method="GET",
+                access_entity=DagAccessEntity.TASK_LOGS,
+            )
+        ),
+    ],
+    response_model=TaskInstancesLogResponse,
+    response_model_exclude_unset=True,
+)
+def get_callback_logs(
+    dag_id: str,
+    dag_run_id: str,
+    callback_id: UUID,
+    session: SessionDep,
+) -> TaskInstancesLogResponse:
+    """
+    Get execution logs for a callback associated with a deadline.
+
+    Returns the logs produced during callback execution. These logs are 
uploaded
+    to remote storage (or written locally) by the callback supervisor after 
execution.
+    """
+    # Sanitize path components to prevent path traversal via URL parameters.
+    for param_name, param_value in [("dag_id", dag_id), ("dag_run_id", 
dag_run_id)]:
+        if os.sep in param_value or "\\" in param_value or ".." in param_value:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                f"Invalid characters in {param_name}",
+            )

Review Comment:
   Would it be better to move this to depends level so that all the injected 
parameters at route level are already valid.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py:
##########
@@ -201,3 +206,80 @@ def get_dag_deadline_alerts(
     alerts = session.scalars(alerts_select)
 
     return DeadlineAlertCollectionResponse(deadline_alerts=alerts, 
total_entries=total_entries)
+
+
+@deadlines_router.get(
+    "/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+    dependencies=[
+        Depends(
+            requires_access_dag(
+                method="GET",
+                access_entity=DagAccessEntity.TASK_LOGS,
+            )
+        ),
+    ],
+    response_model=TaskInstancesLogResponse,
+    response_model_exclude_unset=True,
+)
+def get_callback_logs(
+    dag_id: str,
+    dag_run_id: str,
+    callback_id: UUID,
+    session: SessionDep,
+) -> TaskInstancesLogResponse:
+    """
+    Get execution logs for a callback associated with a deadline.
+
+    Returns the logs produced during callback execution. These logs are 
uploaded
+    to remote storage (or written locally) by the callback supervisor after 
execution.
+    """
+    # Sanitize path components to prevent path traversal via URL parameters.
+    for param_name, param_value in [("dag_id", dag_id), ("dag_run_id", 
dag_run_id)]:
+        if os.sep in param_value or "\\" in param_value or ".." in param_value:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                f"Invalid characters in {param_name}",
+            )
+
+    # Verify the callback exists
+    callback = session.scalar(select(Callback).where(Callback.id == 
callback_id))
+    if callback is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Callback with id `{callback_id}` was not found",
+        )
+

Review Comment:
   The PR body claims "6/6 API unit tests pass (test_callback_logs.py)", but no 
such file is in the diff. 



##########
airflow-core/src/airflow/utils/log/callback_log_reader.py:
##########
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Reader for callback execution logs stored in remote or local storage."""
+
+from __future__ import annotations
+
+import logging
+import os
+from collections.abc import Generator
+from contextlib import suppress
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.utils.log.file_task_handler import (
+    StructuredLogMessage,
+    _interleave_logs,
+    _stream_lines_by_chunk,
+)
+
+if TYPE_CHECKING:
+    from airflow._shared.logging.remote import LogSourceInfo, RawLogStream
+
+logger = logging.getLogger(__name__)
+
+
+def _get_callback_log_relative_paths(dag_id: str, run_id: str, callback_id: 
str) -> list[str]:
+    """
+    Construct the relative log paths for a callback execution.
+
+    Returns paths for both executor callbacks (sync) and triggerer callbacks 
(async).
+    The executor path matches the format used in ExecuteCallback.make():
+        executor_callbacks/{dag_id}/{run_id}/{callback_id}
+    The triggerer path matches what TriggerLoggingFactory writes for callback 
triggers:
+        triggerer_callbacks/{dag_id}/{run_id}/{callback_id}
+    """
+    return [
+        f"executor_callbacks/{dag_id}/{run_id}/{callback_id}",
+        f"triggerer_callbacks/{dag_id}/{run_id}/{callback_id}",
+    ]
+
+
+def read_callback_log(
+    dag_id: str,
+    run_id: str,
+    callback_id: str,
+) -> Generator[StructuredLogMessage, None, None]:
+    """
+    Read callback logs from remote and/or local storage.
+
+    Tries both executor_callbacks and triggerer_callbacks paths.
+    For each path, tries remote storage first (if configured), then falls back 
to local filesystem.
+    Returns a generator of StructuredLogMessage objects suitable for the API 
response.
+
+    :param dag_id: The Dag ID associated with the callback.
+    :param run_id: The Dag run ID associated with the callback.
+    :param callback_id: The unique callback identifier.
+    :return: Generator of StructuredLogMessage objects.
+    """
+    relative_paths = _get_callback_log_relative_paths(dag_id, run_id, 
callback_id)
+
+    sources: LogSourceInfo = []
+    remote_logs: list[RawLogStream] = []
+    local_logs: list[RawLogStream] = []
+
+    for relative_path in relative_paths:
+        # Try remote storage first
+        with suppress(Exception):
+            remote_sources, remote_log_streams = 
_read_callback_remote_logs(relative_path)
+            if remote_log_streams:
+                sources.extend(remote_sources)
+                remote_logs.extend(remote_log_streams)
+
+        # Try local filesystem
+        if not remote_logs:
+            local_sources, local_log_streams = 
_read_callback_local_logs(relative_path)
+            if local_log_streams:
+                sources.extend(local_sources)
+                local_logs.extend(local_log_streams)
+
+        # If we found logs at this path, no need to check the next path
+        if remote_logs or local_logs:
+            break
+
+    if not remote_logs and not local_logs:
+        yield StructuredLogMessage(event="No callback logs found.", 
timestamp=None)
+        return
+
+    # Emit source information header
+    yield StructuredLogMessage(event="::group::Log message source details", 
sources=sources)  # type: ignore[call-arg]
+    yield StructuredLogMessage(event="::endgroup::")
+
+    # Interleave and yield all log streams
+    log_stream = _interleave_logs(*remote_logs, *local_logs)
+    yield from log_stream
+
+
+def _read_callback_remote_logs(
+    relative_path: str,
+) -> tuple[list[str], list[RawLogStream]]:

Review Comment:
   There's an existing `StreamingLogResponse` type annotation alias for 
`tuple[list[str], list[RawLogStream]]`.
   
   ```suggestion
   def _read_callback_remote_logs(
       relative_path: str,
   )  -> StreamingLogResponse:
   ```
   
   
https://github.com/apache/airflow/blob/db9fa4289fe0d01e764b377c5c7f13d247af8218/airflow-core/src/airflow/utils/log/file_task_handler.py#L967-L968



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -428,8 +428,35 @@ def upload_to_remote(self):
             # Never actually called, nothing to do
             return
 
+        if self.ti is None:
+            # Callback triggers have no task instance — upload using the path 
directly.
+            self._upload_callback_log_to_remote()
+            return
+
         upload_to_remote(self.bound_logger, self.ti)
 
+    def _upload_callback_log_to_remote(self):
+        """Upload callback trigger logs to remote storage without a task 
instance."""
+        from airflow.sdk.log import load_remote_log_handler, 
relative_path_from_logger

Review Comment:
   `sdk/log.upload_to_remote(logger, ti=None)` already defaults and accepts 
`ti=None`, and `RemoteLogIO.upload(self, path, ti=None)` 
(`_shared/logging/remote.py:59`) accepts it too. 
   
   `_upload_callback_log_to_remote`'s body is a near-verbatim copy of 
`upload_to_remote` minus a `try/except` warning — and the caller at 
`triggerer_job_runner.py:555-559` *already* wraps `factory.upload_to_remote()` 
in a try/except that logs. So both the `if self.ti is None:` dispatch and the 
whole helper can go; just call `upload_to_remote(self.bound_logger, self.ti)` 
unconditionally. That removes ~20 lines and the `# type: ignore[arg-type]`.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py:
##########
@@ -201,3 +206,80 @@ def get_dag_deadline_alerts(
     alerts = session.scalars(alerts_select)
 
     return DeadlineAlertCollectionResponse(deadline_alerts=alerts, 
total_entries=total_entries)
+
+
+@deadlines_router.get(
+    "/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+    dependencies=[
+        Depends(
+            requires_access_dag(
+                method="GET",
+                access_entity=DagAccessEntity.TASK_LOGS,
+            )
+        ),
+    ],
+    response_model=TaskInstancesLogResponse,
+    response_model_exclude_unset=True,
+)
+def get_callback_logs(
+    dag_id: str,
+    dag_run_id: str,
+    callback_id: UUID,
+    session: SessionDep,
+) -> TaskInstancesLogResponse:
+    """
+    Get execution logs for a callback associated with a deadline.
+
+    Returns the logs produced during callback execution. These logs are 
uploaded
+    to remote storage (or written locally) by the callback supervisor after 
execution.
+    """
+    # Sanitize path components to prevent path traversal via URL parameters.
+    for param_name, param_value in [("dag_id", dag_id), ("dag_run_id", 
dag_run_id)]:
+        if os.sep in param_value or "\\" in param_value or ".." in param_value:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                f"Invalid characters in {param_name}",
+            )
+
+    # Verify the callback exists
+    callback = session.scalar(select(Callback).where(Callback.id == 
callback_id))

Review Comment:
   Additionally, IIUC, for these select statement we just need to make sure 
these entities (DagRun, Deadline as well) are "existed". No need to select all 
the columns.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py:
##########
@@ -201,3 +206,80 @@ def get_dag_deadline_alerts(
     alerts = session.scalars(alerts_select)
 
     return DeadlineAlertCollectionResponse(deadline_alerts=alerts, 
total_entries=total_entries)
+
+
+@deadlines_router.get(
+    "/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+    dependencies=[
+        Depends(
+            requires_access_dag(
+                method="GET",
+                access_entity=DagAccessEntity.TASK_LOGS,
+            )
+        ),
+    ],
+    response_model=TaskInstancesLogResponse,
+    response_model_exclude_unset=True,
+)
+def get_callback_logs(
+    dag_id: str,
+    dag_run_id: str,
+    callback_id: UUID,
+    session: SessionDep,
+) -> TaskInstancesLogResponse:
+    """
+    Get execution logs for a callback associated with a deadline.
+
+    Returns the logs produced during callback execution. These logs are 
uploaded
+    to remote storage (or written locally) by the callback supervisor after 
execution.
+    """
+    # Sanitize path components to prevent path traversal via URL parameters.
+    for param_name, param_value in [("dag_id", dag_id), ("dag_run_id", 
dag_run_id)]:
+        if os.sep in param_value or "\\" in param_value or ".." in param_value:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                f"Invalid characters in {param_name}",
+            )
+
+    # Verify the callback exists
+    callback = session.scalar(select(Callback).where(Callback.id == 
callback_id))
+    if callback is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Callback with id `{callback_id}` was not found",
+        )
+
+    # Verify the dag_run exists with matching dag_id
+    dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id))
+    if dag_run is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was 
not found",
+        )
+
+    # Verify the callback actually belongs to this dag_run (via the Deadline 
relationship)
+    deadline = session.scalar(
+        select(Deadline).where(Deadline.callback_id == callback_id, 
Deadline.dagrun_id == dag_run.id)
+    )
+    if deadline is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Callback `{callback_id}` is not associated with DagRun 
`{dag_run_id}` of Dag `{dag_id}`",
+        )
+
+    try:
+        log_stream = read_callback_log(
+            dag_id=dag_id,
+            run_id=dag_run_id,
+            callback_id=str(callback_id),
+        )
+    except ValueError:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid callback log 
path")

Review Comment:
   The `read_callback_log` is a generator, so the `ValueError` exception here 
is a dead code that will never fired.



##########
airflow-core/src/airflow/utils/log/callback_log_reader.py:
##########
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Reader for callback execution logs stored in remote or local storage."""
+
+from __future__ import annotations
+
+import logging
+import os
+from collections.abc import Generator
+from contextlib import suppress
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.utils.log.file_task_handler import (
+    StructuredLogMessage,
+    _interleave_logs,
+    _stream_lines_by_chunk,
+)
+
+if TYPE_CHECKING:
+    from airflow._shared.logging.remote import LogSourceInfo, RawLogStream
+
+logger = logging.getLogger(__name__)
+
+
+def _get_callback_log_relative_paths(dag_id: str, run_id: str, callback_id: 
str) -> list[str]:
+    """
+    Construct the relative log paths for a callback execution.
+
+    Returns paths for both executor callbacks (sync) and triggerer callbacks 
(async).
+    The executor path matches the format used in ExecuteCallback.make():
+        executor_callbacks/{dag_id}/{run_id}/{callback_id}
+    The triggerer path matches what TriggerLoggingFactory writes for callback 
triggers:
+        triggerer_callbacks/{dag_id}/{run_id}/{callback_id}
+    """
+    return [
+        f"executor_callbacks/{dag_id}/{run_id}/{callback_id}",
+        f"triggerer_callbacks/{dag_id}/{run_id}/{callback_id}",
+    ]
+
+
+def read_callback_log(
+    dag_id: str,
+    run_id: str,
+    callback_id: str,
+) -> Generator[StructuredLogMessage, None, None]:
+    """
+    Read callback logs from remote and/or local storage.
+
+    Tries both executor_callbacks and triggerer_callbacks paths.
+    For each path, tries remote storage first (if configured), then falls back 
to local filesystem.
+    Returns a generator of StructuredLogMessage objects suitable for the API 
response.
+
+    :param dag_id: The Dag ID associated with the callback.
+    :param run_id: The Dag run ID associated with the callback.
+    :param callback_id: The unique callback identifier.
+    :return: Generator of StructuredLogMessage objects.
+    """
+    relative_paths = _get_callback_log_relative_paths(dag_id, run_id, 
callback_id)
+
+    sources: LogSourceInfo = []
+    remote_logs: list[RawLogStream] = []
+    local_logs: list[RawLogStream] = []
+
+    for relative_path in relative_paths:
+        # Try remote storage first
+        with suppress(Exception):
+            remote_sources, remote_log_streams = 
_read_callback_remote_logs(relative_path)
+            if remote_log_streams:
+                sources.extend(remote_sources)
+                remote_logs.extend(remote_log_streams)
+
+        # Try local filesystem
+        if not remote_logs:
+            local_sources, local_log_streams = 
_read_callback_local_logs(relative_path)
+            if local_log_streams:
+                sources.extend(local_sources)
+                local_logs.extend(local_log_streams)
+
+        # If we found logs at this path, no need to check the next path
+        if remote_logs or local_logs:
+            break
+
+    if not remote_logs and not local_logs:
+        yield StructuredLogMessage(event="No callback logs found.", 
timestamp=None)
+        return
+
+    # Emit source information header
+    yield StructuredLogMessage(event="::group::Log message source details", 
sources=sources)  # type: ignore[call-arg]
+    yield StructuredLogMessage(event="::endgroup::")
+
+    # Interleave and yield all log streams
+    log_stream = _interleave_logs(*remote_logs, *local_logs)
+    yield from log_stream
+
+
+def _read_callback_remote_logs(
+    relative_path: str,
+) -> tuple[list[str], list[RawLogStream]]:
+    """Read callback logs from the configured remote log storage."""
+    from airflow.logging_config import get_remote_task_log
+
+    remote_io = get_remote_task_log()
+    if remote_io is None:
+        return [], []
+
+    # RemoteLogIO.read() takes (relative_path, ti) -- for S3 the ti is not 
used,
+    # for CloudWatch it uses ti.end_date (with getattr fallback to None).
+    # We pass None since callbacks don't have a TaskInstance.
+    if stream_method := getattr(remote_io, "stream", None):
+        sources, logs = stream_method(relative_path, None)
+        return sources, logs or []
+
+    sources, logs = remote_io.read(relative_path, None)  # type: 
ignore[arg-type]
+    if not logs:
+        return sources, []
+
+    # Convert legacy string logs to stream format
+    from airflow.utils.log.file_task_handler import _get_compatible_log_stream
+
+    return sources, [_get_compatible_log_stream(logs)]
+
+
+def _validate_path_component(component: str) -> str:
+    """Validate and return a path component, raising ValueError if unsafe."""
+    import re
+
+    if component in (".", "..") or not re.fullmatch(r"[A-Za-z0-9._:+\-~@]+", 
component):
+        raise ValueError(f"Invalid path component: {component!r}")
+    return component
+
+
+def _read_callback_local_logs(
+    relative_path: str,
+) -> tuple[list[str], list[RawLogStream]]:

Review Comment:
   Ditto.



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py:
##########
@@ -37,6 +37,8 @@ class DeadlineResponse(BaseModel):
     dag_run_id: str = Field(validation_alias=AliasPath("dagrun", "run_id"))
     alert_id: UUID | None = Field(validation_alias="deadline_alert_id", 
default=None)
     alert_name: str | None = 
Field(validation_alias=AliasPath("deadline_alert", "name"), default=None)
+    callback_id: UUID | None = Field(validation_alias="callback_id", 
default=None)
+    callback_state: str | None = Field(validation_alias=AliasPath("callback", 
"state"), default=None)

Review Comment:
   Should we annotate as `CallbackState` instead of `str`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to