Copilot commented on code in PR #58329:
URL: https://github.com/apache/airflow/pull/58329#discussion_r2528460863


##########
task-sdk/src/airflow/sdk/io/path.py:
##########
@@ -408,15 +411,75 @@ def serialize(self) -> dict[str, Any]:
     def deserialize(cls, data: dict, version: int) -> ObjectStoragePath:
         if version > cls.__version__:
             raise ValueError(f"Cannot deserialize version {version} with 
version {cls.__version__}.")
-
         _kwargs = data.pop("kwargs")
         path = data.pop("path")
         conn_id = data.pop("conn_id", None)
-
         return ObjectStoragePath(path, conn_id=conn_id, **_kwargs)
 
     def __str__(self):
         conn_id = self.storage_options.get("conn_id")
         if self._protocol and conn_id:
             return f"{self._protocol}://{conn_id}@{self.path}"
         return super().__str__()
+
+    @classmethod
+    def __get_pydantic_core_schema__(
+        cls,
+        _source_type: Any,
+        _handler: GetCoreSchemaHandler,
+    ) -> core_schema.CoreSchema:

Review Comment:
   The `__get_pydantic_core_schema__` method lacks a docstring explaining its 
purpose, parameters, and the validation logic it implements. This is a public 
API method that integrates with Pydantic and should be documented for 
maintainability and clarity.
   
   Consider adding a docstring:
   ```python
   @classmethod
   def __get_pydantic_core_schema__(
       cls,
       _source_type: Any,
       _handler: GetCoreSchemaHandler,
   ) -> core_schema.CoreSchema:
       """
       Generate the Pydantic core schema for ObjectStoragePath validation and 
serialization.
       
       This method enables ObjectStoragePath to be used as a Pydantic field 
type with support for:
       - Validation from serialized dict representation
       - Validation from string paths
       - Validation of existing ObjectStoragePath instances
       - Serialization to dict format
       
       :param _source_type: The source type being validated (unused)
       :param _handler: Pydantic's schema handler (unused)
       :return: A CoreSchema defining validation and serialization behavior
       """
   ```
   ```suggestion
       ) -> core_schema.CoreSchema:
           """
           Generate the Pydantic core schema for ObjectStoragePath validation 
and serialization.
   
           This method enables ObjectStoragePath to be used as a Pydantic field 
type with support for:
           - Validation from serialized dict representation
           - Validation from string paths
           - Validation of existing ObjectStoragePath instances
           - Serialization to dict format
   
           :param _source_type: The source type being validated (unused)
           :param _handler: Pydantic's schema handler (unused)
           :return: A CoreSchema defining validation and serialization behavior
           """
   ```



##########
task-sdk/src/airflow/sdk/io/path.py:
##########
@@ -408,15 +411,75 @@ def serialize(self) -> dict[str, Any]:
     def deserialize(cls, data: dict, version: int) -> ObjectStoragePath:
         if version > cls.__version__:
             raise ValueError(f"Cannot deserialize version {version} with 
version {cls.__version__}.")
-
         _kwargs = data.pop("kwargs")
         path = data.pop("path")
         conn_id = data.pop("conn_id", None)
-
         return ObjectStoragePath(path, conn_id=conn_id, **_kwargs)
 
     def __str__(self):
         conn_id = self.storage_options.get("conn_id")
         if self._protocol and conn_id:
             return f"{self._protocol}://{conn_id}@{self.path}"
         return super().__str__()
+
+    @classmethod
+    def __get_pydantic_core_schema__(
+        cls,
+        _source_type: Any,
+        _handler: GetCoreSchemaHandler,
+    ) -> core_schema.CoreSchema:
+        def validate_from_typed_dict(value: dict) -> ObjectStoragePath:
+            result = cls.deserialize(value, version=cls.__version__)
+            return result
+
+        python_schema = core_schema.union_schema(
+            [
+                core_schema.chain_schema(
+                    [
+                        core_schema.typed_dict_schema(
+                            {
+                                "path": 
core_schema.typed_dict_field(core_schema.str_schema()),
+                                "conn_id": core_schema.typed_dict_field(
+                                    core_schema.union_schema(
+                                        [core_schema.str_schema(), 
core_schema.none_schema()]
+                                    ),
+                                    required=False,
+                                ),
+                                "kwargs": core_schema.typed_dict_field(
+                                    core_schema.dict_schema(), required=False
+                                ),
+                            },
+                        ),
+                        
core_schema.no_info_plain_validator_function(validate_from_typed_dict),
+                    ],
+                ),
+                core_schema.chain_schema(
+                    [
+                        core_schema.str_schema(),
+                        core_schema.no_info_plain_validator_function(lambda v: 
cls(v)),
+                    ],
+                ),
+            ],
+            ref=cls.__qualname__,
+        )
+
+        return core_schema.json_or_python_schema(
+            json_schema=python_schema,
+            python_schema=core_schema.union_schema(
+                [
+                    # check if it's an instance first before doing any further 
work
+                    python_schema,
+                    core_schema.is_instance_schema(cls),
+                ]
+            ),
+            
serialization=core_schema.plain_serializer_function_ser_schema(cls.serialize),
+        )
+
+    @classmethod
+    def __get_pydantic_json_schema__(
+        cls, _core_schema: core_schema.CoreSchema, handler: 
GetJsonSchemaHandler
+    ) -> JsonSchemaValue:

Review Comment:
   The `__get_pydantic_json_schema__` method lacks a docstring explaining its 
purpose and how it customizes the JSON schema generation. This is a public API 
method that should be documented.
   
   Consider adding a docstring:
   ```python
   @classmethod
   def __get_pydantic_json_schema__(
       cls, _core_schema: core_schema.CoreSchema, handler: GetJsonSchemaHandler
   ) -> JsonSchemaValue:
       """
       Generate the JSON schema representation for ObjectStoragePath.
       
       This method customizes the JSON schema output to include the class name 
as the title.
       
       :param _core_schema: The core schema to generate JSON schema from
       :param handler: The JSON schema handler
       :return: A JSON schema dictionary with the ObjectStoragePath title
       """
   ```
   ```suggestion
       ) -> JsonSchemaValue:
           """
           Generate the JSON schema representation for ObjectStoragePath.
   
           This method customizes the JSON schema output to include the class 
name as the title.
   
           :param _core_schema: The core schema to generate JSON schema from.
           :param handler: The JSON schema handler.
           :return: A JSON schema dictionary with the ObjectStoragePath title.
           """
   ```



##########
task-sdk/src/airflow/sdk/io/path.py:
##########
@@ -408,15 +411,75 @@ def serialize(self) -> dict[str, Any]:
     def deserialize(cls, data: dict, version: int) -> ObjectStoragePath:
         if version > cls.__version__:
             raise ValueError(f"Cannot deserialize version {version} with 
version {cls.__version__}.")
-
         _kwargs = data.pop("kwargs")
         path = data.pop("path")
         conn_id = data.pop("conn_id", None)
-
         return ObjectStoragePath(path, conn_id=conn_id, **_kwargs)
 
     def __str__(self):
         conn_id = self.storage_options.get("conn_id")
         if self._protocol and conn_id:
             return f"{self._protocol}://{conn_id}@{self.path}"
         return super().__str__()
+
+    @classmethod
+    def __get_pydantic_core_schema__(
+        cls,
+        _source_type: Any,
+        _handler: GetCoreSchemaHandler,
+    ) -> core_schema.CoreSchema:
+        def validate_from_typed_dict(value: dict) -> ObjectStoragePath:
+            result = cls.deserialize(value, version=cls.__version__)
+            return result

Review Comment:
   [nitpick] The `validate_from_typed_dict` function and its immediate return 
can be simplified. The intermediate `result` variable serves no purpose and can 
be removed for clarity.
   
   Simplify to:
   ```python
   def validate_from_typed_dict(value: dict) -> ObjectStoragePath:
       return cls.deserialize(value.copy(), version=cls.__version__)
   ```
   
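   Note: This also folds in the fix for the input-mutation issue raised in a 
separate comment on this function (`deserialize` calls `data.pop()` on the 
dictionary it is given), which is why `.copy()` is added.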
   ```suggestion
               return cls.deserialize(value.copy(), version=cls.__version__)
   ```



##########
task-sdk/src/airflow/sdk/io/path.py:
##########
@@ -408,15 +411,75 @@ def serialize(self) -> dict[str, Any]:
     def deserialize(cls, data: dict, version: int) -> ObjectStoragePath:
         if version > cls.__version__:
             raise ValueError(f"Cannot deserialize version {version} with 
version {cls.__version__}.")
-
         _kwargs = data.pop("kwargs")
         path = data.pop("path")
         conn_id = data.pop("conn_id", None)
-
         return ObjectStoragePath(path, conn_id=conn_id, **_kwargs)
 
     def __str__(self):
         conn_id = self.storage_options.get("conn_id")
         if self._protocol and conn_id:
             return f"{self._protocol}://{conn_id}@{self.path}"
         return super().__str__()
+
+    @classmethod
+    def __get_pydantic_core_schema__(
+        cls,
+        _source_type: Any,
+        _handler: GetCoreSchemaHandler,
+    ) -> core_schema.CoreSchema:
+        def validate_from_typed_dict(value: dict) -> ObjectStoragePath:
+            result = cls.deserialize(value, version=cls.__version__)
+            return result
+
+        python_schema = core_schema.union_schema(
+            [
+                core_schema.chain_schema(
+                    [
+                        core_schema.typed_dict_schema(
+                            {
+                                "path": 
core_schema.typed_dict_field(core_schema.str_schema()),
+                                "conn_id": core_schema.typed_dict_field(
+                                    core_schema.union_schema(
+                                        [core_schema.str_schema(), 
core_schema.none_schema()]
+                                    ),
+                                    required=False,
+                                ),
+                                "kwargs": core_schema.typed_dict_field(
+                                    core_schema.dict_schema(), required=False
+                                ),
+                            },
+                        ),
+                        
core_schema.no_info_plain_validator_function(validate_from_typed_dict),
+                    ],
+                ),
+                core_schema.chain_schema(
+                    [
+                        core_schema.str_schema(),
+                        core_schema.no_info_plain_validator_function(lambda v: 
cls(v)),
+                    ],
+                ),
+            ],
+            ref=cls.__qualname__,
+        )

Review Comment:
   The order of schemas in the `python_schema` union is incorrect. The union 
should check if the value is already an instance of the class first (using 
`is_instance_schema`) before attempting to deserialize from dict or string. 
This prevents unnecessary deserialization attempts when the value is already a 
valid `ObjectStoragePath` instance.
   
   The outer union at lines 468-474 lists `is_instance_schema` after 
`python_schema`, which contradicts its own inline comment ("check if it's an 
instance first"). Put the instance check first in the union at lines 435-464 
so that the ordering matches the stated intent. Consider reordering to:
   ```python
   python_schema = core_schema.union_schema(
       [
           core_schema.is_instance_schema(cls),
           core_schema.chain_schema([...]),  # typed_dict validation
           core_schema.chain_schema([...]),  # string validation
       ],
       ref=cls.__qualname__,
   )
   ```



##########
task-sdk/src/airflow/sdk/io/path.py:
##########
@@ -408,15 +411,75 @@ def serialize(self) -> dict[str, Any]:
     def deserialize(cls, data: dict, version: int) -> ObjectStoragePath:
         if version > cls.__version__:
             raise ValueError(f"Cannot deserialize version {version} with 
version {cls.__version__}.")
-
         _kwargs = data.pop("kwargs")
         path = data.pop("path")
         conn_id = data.pop("conn_id", None)
-
         return ObjectStoragePath(path, conn_id=conn_id, **_kwargs)
 
     def __str__(self):
         conn_id = self.storage_options.get("conn_id")
         if self._protocol and conn_id:
             return f"{self._protocol}://{conn_id}@{self.path}"
         return super().__str__()
+
+    @classmethod
+    def __get_pydantic_core_schema__(
+        cls,
+        _source_type: Any,
+        _handler: GetCoreSchemaHandler,
+    ) -> core_schema.CoreSchema:
+        def validate_from_typed_dict(value: dict) -> ObjectStoragePath:
+            result = cls.deserialize(value, version=cls.__version__)

Review Comment:
   The `validate_from_typed_dict` function passes the value directly to 
`cls.deserialize()`, which uses `data.pop()` to mutate the input dictionary. 
This can cause issues if Pydantic attempts to revalidate or if the dictionary 
is reused elsewhere. Consider either making a copy of the dictionary before 
passing it to `deserialize`, or modifying `deserialize` to not mutate its input.
   
   Example fix:
   ```python
   def validate_from_typed_dict(value: dict) -> ObjectStoragePath:
       result = cls.deserialize(value.copy(), version=cls.__version__)
       return result
   ```
   ```suggestion
               result = cls.deserialize(value.copy(), version=cls.__version__)
   ```



##########
task-sdk/tests/task_sdk/io/test_path.py:
##########
@@ -352,3 +353,58 @@ def get_fs_no_storage_options(_: str):
         assert get_fs("file")
         with pytest.raises(AttributeError):
             get_fs("file", storage_options={"foo": "bar"})
+
+
+class TestPydanticModel(BaseModel):
+    key: str
+    path: ObjectStoragePath
+
+    model_config = ConfigDict(from_attributes=True, 
arbitrary_types_allowed=True)
+
+
+class TestPydanticSerDe:
+    @pytest.fixture
+    def type_adapter(self) -> TypeAdapter[ObjectStoragePath]:
+        return TypeAdapter(ObjectStoragePath)
+
+    @pytest.mark.parametrize(
+        ("path", "kwargs"),
+        (
+            ("file:///tmp/foo", {}),
+            ("s3://conn_id@bucket/test.txt", {"aws_access_key": "admin", 
"aws_secret_key": "password"}),
+        ),
+    )
+    def test_pydantic_serde(self, path: str, kwargs: dict, type_adapter: 
TypeAdapter[ObjectStoragePath]):
+        path: ObjectStoragePath = ObjectStoragePath(path, **kwargs)
+        serialized = type_adapter.dump_python(path, mode="json")
+        assert serialized == path.serialize()
+
+    @pytest.mark.parametrize(
+        "serialized",
+        (
+            {"path": ".", "conn_id": None, "kwargs": {}},
+            {
+                "path": "s3://bucket/test.txt",
+                "conn_id": "conn_id",
+                "kwargs": {"aws_access_key": "admin", "aws_secret_key": 
"password"},
+            },
+            {"path": "file:///tmp/foo", "conn_id": None, "kwargs": {}},
+        ),
+    )
+    def test_pydantic_deserialize(self, serialized: dict | str, type_adapter: 
TypeAdapter[ObjectStoragePath]):

Review Comment:
   The type hint in the function signature is incorrect. The parameter 
`serialized` is annotated as `dict | str`, but the test cases only provide 
`dict` values. The `| str` part appears to be a copy-paste error from the 
`path` parameter in the previous test.
   
   The correct signature should be:
   ```python
   def test_pydantic_deserialize(self, serialized: dict, type_adapter: 
TypeAdapter[ObjectStoragePath]):
   ```
   ```suggestion
       def test_pydantic_deserialize(self, serialized: dict, type_adapter: 
TypeAdapter[ObjectStoragePath]):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to