Copilot commented on code in PR #64960:
URL: https://github.com/apache/airflow/pull/64960#discussion_r3066475885
##########
providers/databricks/tests/unit/databricks/triggers/test_databricks.py:
##########
@@ -118,6 +122,32 @@
],
}
+TRIGGER_INIT_CASES = [
+    pytest.param(
+        DatabricksExecutionTrigger,
+        {
+            "run_id": RUN_ID,
+            "databricks_conn_id": DEFAULT_CONN_ID,
+        },
+        id="execution_trigger",
+    ),
+    pytest.param(
+        DatabricksSQLStatementExecutionTrigger,
+        {
+            "statement_id": STATEMENT_ID,
+            "databricks_conn_id": DEFAULT_CONN_ID,
+            "end_time": time.time() + 60,
Review Comment:
Using `time.time()` in test data makes the case slightly non-deterministic
and harder to reason about, especially since `end_time` is not relevant to the
behavior under test (retry args validation). Consider replacing it with a fixed
constant (e.g., `end_time=1234567890.0`) to keep the test fully deterministic.
```suggestion
"end_time": 1234567890.0,
```
##########
providers/databricks/src/airflow/providers/databricks/utils/retry.py:
##########
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.sdk.serde import serialize as serde_serialize
+
+
+def validate_deferrable_databricks_retry_args(retry_args: dict[Any, Any] | None, *, owner: str) -> None:
+    """Validate retry args that need to cross the trigger serialization boundary."""
+    if retry_args is None:
+        return
+
+    try:
+        serde_serialize(retry_args)
+    except (AttributeError, RecursionError, TypeError) as err:
+        raise ValueError(
+            f"{owner} does not support non-serializable databricks_retry_args when deferrable=True. "
+            "Use JSON-serializable values, remove callable retry strategies, or disable deferrable mode."
+        ) from err
Review Comment:
`airflow.sdk.serde.serialize` can also fail with `ValueError`
(implementation-dependent), which currently bypasses the intended “clear
ValueError” message and may surface a less actionable error. Consider including
`ValueError` in the caught exceptions (or catching a broader serde-specific
base exception if available) and re-raising with the standardized message to
ensure consistent fail-fast behavior.
##########
providers/databricks/src/airflow/providers/databricks/utils/retry.py:
##########
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.sdk.serde import serialize as serde_serialize
+
+
+def validate_deferrable_databricks_retry_args(retry_args: dict[Any, Any] | None, *, owner: str) -> None:
Review Comment:
The signature `dict[Any, Any]` suggests arbitrary key types are acceptable,
but the function’s intent is “JSON/serde-serializable” retry configuration,
which typically implies string keys (JSON object keys). Consider narrowing the
type to `Mapping[str, Any] | None` (or `dict[str, Any] | None`) to better
document the expected API contract and help callers catch issues earlier via
typing.
```suggestion
from collections.abc import Mapping
from typing import Any

from airflow.sdk.serde import serialize as serde_serialize


def validate_deferrable_databricks_retry_args(
    retry_args: Mapping[str, Any] | None, *, owner: str
) -> None:
```
(Note: `Mapping` should come from `collections.abc` rather than `typing`, where it has been deprecated since Python 3.9.)
##########
providers/databricks/src/airflow/providers/databricks/utils/retry.py:
##########
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.sdk.serde import serialize as serde_serialize
+
+
+def validate_deferrable_databricks_retry_args(retry_args: dict[Any, Any] | None, *, owner: str) -> None:
+    """Validate retry args that need to cross the trigger serialization boundary."""
+    if retry_args is None:
+        return
+
+    try:
+        serde_serialize(retry_args)
+    except (AttributeError, RecursionError, TypeError) as err:
+        raise ValueError(
+            f"{owner} does not support non-serializable databricks_retry_args when deferrable=True. "
+            "Use JSON-serializable values, remove callable retry strategies, or disable deferrable mode."
+        ) from err
Review Comment:
The validation is invoked for trigger constructor argument `retry_args`, but
the error message only references `databricks_retry_args`, which can be
confusing when failures occur in trigger init paths. Consider updating the
message to mention both (`retry_args` / `databricks_retry_args`) or accepting a
`param_name` argument so the error can accurately name the failing parameter
depending on call site.
##########
providers/databricks/src/airflow/providers/databricks/triggers/databricks.py:
##########
@@ -53,6 +54,7 @@ def __init__(
caller: str = "DatabricksExecutionTrigger",
) -> None:
super().__init__()
+        validate_deferrable_databricks_retry_args(retry_args, owner=self.__class__.__name__)
Review Comment:
The trigger constructor already accepts a `caller` argument (likely used to
report the originating component), but the new validation uses
`self.__class__.__name__` for `owner`. If `caller` is meant to carry more
precise context (e.g., operator vs trigger), consider passing `owner=caller` to
keep error attribution consistent with existing patterns.
```suggestion
validate_deferrable_databricks_retry_args(retry_args, owner=caller)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]