ntjohnson1 commented on code in PR #1541:
URL: 
https://github.com/apache/datafusion-python/pull/1541#discussion_r3248013289


##########
examples/datafusion-ffi-example/python/tests/_test_logical_extension_codec.py:
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datafusion import LogicalPlan, SessionContext
+from datafusion_ffi_example import MyLogicalExtensionCodec
+
+
+def _setup_session_with_codec() -> tuple[SessionContext, 
MyLogicalExtensionCodec]:
+    """Build a session with the user-supplied logical extension codec
+    installed. Tests use a FROM-less query so plan serialization does
+    not pull in `try_encode_table_provider`, which the default codec
+    leaves unimplemented."""
+    base = SessionContext()
+    codec = MyLogicalExtensionCodec()
+    ctx = base.with_logical_extension_codec(codec)
+    return ctx, codec
+
+
+def test_ffi_logical_codec_install_and_export():
+    """Installing a user FFI codec replaces the session's logical
+    codec; the capsule getter on the session re-exports it."""
+    ctx, _codec = _setup_session_with_codec()
+    capsule = ctx.__datafusion_logical_extension_codec__()
+    assert capsule is not None
+
+
+def test_ffi_logical_codec_consulted_on_udf_encode():
+    """Serializing through ctx.logical_codec() routes try_encode_udf to
+    the user-installed FFI codec.
+
+    Verifies the dispatch chain
+    `PyLogicalPlan.to_bytes -> session.logical_codec ->
+    PythonLogicalCodec -> FFI_LogicalExtensionCodec -> user impl`
+    is wired correctly. The user codec's atomic counter increments
+    after a serialization pass, proving every hop forwards.
+
+    Does not test any Python-UDF-specific dispatch — PythonLogicalCodec
+    currently delegates all UDF encoding to its inner codec
+    unconditionally. Python-vs-other branching lands when in-band
+    scalar UDF encoding is added.
+    """
+    ctx, codec = _setup_session_with_codec()
+    df = ctx.sql("SELECT abs(-1) AS x")
+    plan = df.logical_plan()
+
+    before = codec.encode_udf_calls()
+    _ = plan.to_bytes(ctx)
+    after = codec.encode_udf_calls()
+
+    assert after > before, (
+        f"Expected user FFI codec encode_udf to fire, before={before} 
after={after}"
+    )
+
+
+def test_ffi_logical_codec_roundtrip():
+    """A plan referencing an FFI-imported UDF round-trips through the
+    user-supplied logical codec (encode via codec, decode resolves from
+    registry — `try_decode_udf` is only consulted when the UDF is not
+    in the registry, which is the codec-inlined case)."""
+    ctx, _codec = _setup_session_with_codec()
+    df = ctx.sql("SELECT abs(-1) AS x")
+    blob = df.logical_plan().to_bytes(ctx)
+
+    restored = LogicalPlan.from_bytes(ctx, blob)
+    assert restored is not None

Review Comment:
   Should we check something stronger than `is not None`? Can we just check that 
`restored == df.logical_plan()`?



##########
python/datafusion/context.py:
##########
@@ -1750,4 +1750,22 @@ def with_logical_extension_codec(self, codec: Any) -> 
SessionContext:
         This only supports codecs that have been implemented using the
         FFI interface.
         """
-        return self.ctx.with_logical_extension_codec(codec)
+        new_internal = self.ctx.with_logical_extension_codec(codec)
+        new = SessionContext.__new__(SessionContext)
+        new.ctx = new_internal
+        return new
+
+    def __datafusion_physical_extension_codec__(self) -> Any:

Review Comment:
   Hmm, the existing code here already uses `Any`; otherwise I'd ask whether we had more 
precise types. It's fairly internal, so probably fine.



##########
examples/datafusion-ffi-example/python/tests/_test_physical_extension_codec.py:
##########
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pyarrow as pa
+from datafusion import ExecutionPlan, SessionContext
+from datafusion_ffi_example import MyPhysicalExtensionCodec
+
+
+def _setup_session_with_codec() -> tuple[SessionContext, 
MyPhysicalExtensionCodec]:
+    base = SessionContext()
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([-1, -2, -3])],
+        names=["a"],
+    )
+    base.register_record_batches("t", [[batch]])
+    codec = MyPhysicalExtensionCodec()
+    ctx = base.with_physical_extension_codec(codec)
+    return ctx, codec
+
+
+def test_ffi_physical_codec_install_and_export():
+    ctx, _codec = _setup_session_with_codec()
+    capsule = ctx.__datafusion_physical_extension_codec__()
+    assert capsule is not None
+
+
+def test_ffi_physical_codec_consulted_on_udf_encode():
+    """Serializing through ctx.physical_codec() routes try_encode_udf to
+    the user-installed FFI codec.
+
+    Mirror of the logical-side dispatch test: verifies
+    `PyExecutionPlan.to_bytes -> session.physical_codec ->
+    PythonPhysicalCodec -> FFI_PhysicalExtensionCodec -> user impl`
+    forwards correctly. Does not test Python-UDF-specific dispatch —
+    PythonPhysicalCodec currently delegates all UDF encoding to its
+    inner codec unconditionally.
+    """
+    ctx, codec = _setup_session_with_codec()
+    df = ctx.sql("SELECT abs(a) AS x FROM t")
+    plan = df.execution_plan()
+
+    before = codec.encode_udf_calls()
+    _ = plan.to_bytes(ctx)
+    after = codec.encode_udf_calls()
+
+    assert after > before, (
+        f"Expected user FFI codec encode_udf to fire, before={before} 
after={after}"
+    )
+
+
+def test_ffi_physical_codec_roundtrip():
+    """A plan referencing an FFI-imported UDF round-trips via the
+    user-supplied physical codec. On decode, the receiver resolves the
+    UDF from the function registry; `try_decode_udf` only fires when a
+    codec inlines the UDF body, which the counting codec does not."""
+    ctx, _codec = _setup_session_with_codec()
+    df = ctx.sql("SELECT abs(a) AS x FROM t")
+    blob = df.execution_plan().to_bytes(ctx)
+
+    restored = ExecutionPlan.from_bytes(ctx, blob)
+    assert restored is not None

Review Comment:
   Same round-trip note as on the logical codec test: can we assert something stronger than `is not None` here?



##########
examples/datafusion-ffi-example/python/tests/_test_logical_extension_codec.py:
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datafusion import LogicalPlan, SessionContext
+from datafusion_ffi_example import MyLogicalExtensionCodec
+
+
+def _setup_session_with_codec() -> tuple[SessionContext, 
MyLogicalExtensionCodec]:
+    """Build a session with the user-supplied logical extension codec
+    installed. Tests use a FROM-less query so plan serialization does
+    not pull in `try_encode_table_provider`, which the default codec
+    leaves unimplemented."""
+    base = SessionContext()
+    codec = MyLogicalExtensionCodec()
+    ctx = base.with_logical_extension_codec(codec)
+    return ctx, codec
+
+
+def test_ffi_logical_codec_install_and_export():
+    """Installing a user FFI codec replaces the session's logical
+    codec; the capsule getter on the session re-exports it."""
+    ctx, _codec = _setup_session_with_codec()
+    capsule = ctx.__datafusion_logical_extension_codec__()
+    assert capsule is not None
+
+
+def test_ffi_logical_codec_consulted_on_udf_encode():
+    """Serializing through ctx.logical_codec() routes try_encode_udf to
+    the user-installed FFI codec.
+
+    Verifies the dispatch chain
+    `PyLogicalPlan.to_bytes -> session.logical_codec ->
+    PythonLogicalCodec -> FFI_LogicalExtensionCodec -> user impl`
+    is wired correctly. The user codec's atomic counter increments
+    after a serialization pass, proving every hop forwards.
+
+    Does not test any Python-UDF-specific dispatch — PythonLogicalCodec
+    currently delegates all UDF encoding to its inner codec
+    unconditionally. Python-vs-other branching lands when in-band
+    scalar UDF encoding is added.
+    """
+    ctx, codec = _setup_session_with_codec()
+    df = ctx.sql("SELECT abs(-1) AS x")
+    plan = df.logical_plan()
+
+    before = codec.encode_udf_calls()
+    _ = plan.to_bytes(ctx)
+    after = codec.encode_udf_calls()
+
+    assert after > before, (
+        f"Expected user FFI codec encode_udf to fire, before={before} 
after={after}"
+    )
+
+
+def test_ffi_logical_codec_roundtrip():
+    """A plan referencing an FFI-imported UDF round-trips through the
+    user-supplied logical codec (encode via codec, decode resolves from
+    registry — `try_decode_udf` is only consulted when the UDF is not
+    in the registry, which is the codec-inlined case)."""
+    ctx, _codec = _setup_session_with_codec()
+    df = ctx.sql("SELECT abs(-1) AS x")
+    blob = df.logical_plan().to_bytes(ctx)
+
+    restored = LogicalPlan.from_bytes(ctx, blob)
+    assert restored is not None

Review Comment:
   Ah, I see we do a more complete check on the Python side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to