wenjin272 commented on code in PR #561:
URL: https://github.com/apache/flink-agents/pull/561#discussion_r2894060713
##########
python/flink_agents/api/tests/test_event.py:
##########
@@ -123,5 +123,72 @@ def test_event_with_mixed_serializable_types() -> None:
assert parsed["input"]["nested_row"]["inner"]["type"] == "Row"
-def test_input_event_ignore_row_unserializable() -> None: # noqa D103
+def test_input_event_ignore_row_unserializable_duplicate() -> None: # noqa
D103
Review Comment:
Why we need modify this method name.
##########
python/flink_agents/api/decorators.py:
##########
@@ -20,13 +20,16 @@
from flink_agents.api.events.event import Event
-def action(*listen_events: Type[Event]) -> Callable:
+def action(*listen_events: Type[Event] | str) -> Callable:
Review Comment:
I think we can just remove support for `Type[Event]`, and only support
declaring type-identifier strings in the action decorator.
Regarding type safety and ease of use, I will describe them in another
comment.
##########
python/flink_agents/api/runner_context.py:
##########
@@ -79,15 +81,51 @@ class RunnerContext(ABC):
This context provides access to event handling.
"""
- @abstractmethod
- def send_event(self, event: Event) -> None:
+ def send_event(
+ self,
+ event: Event | None = None,
+ *,
+ identifier: str | None = None,
+ **kwargs: Any,
+ ) -> None:
"""Send an event to the agent for processing.
+ Can be called in two ways:
+
+ 1. **Object form** — pass a pre-built :class:`Event` (or subclass)::
+
+ ctx.send_event(OutputEvent(output="hi"))
+
+ 2. **Shorthand form** — provide ``identifier`` and key-value attrs::
+
+ ctx.send_event(identifier="MyEvent", field1="test", field2=1)
Review Comment:
In #424, I described that I would like to support this shorthand form. But
after the offline discuss with @xintongsong, we consider this way to be
somewhat too casual.
I think we can keep support only for the first approach, having `send_event`
accept only pre-built event object. This ensures consistency of the
`send_event` API between Python and Java, and eliminates the need to introduce
a `_send_event` method.
##########
python/flink_agents/api/tests/test_event.py:
##########
@@ -123,5 +123,72 @@ def test_event_with_mixed_serializable_types() -> None:
assert parsed["input"]["nested_row"]["inner"]["type"] == "Row"
-def test_input_event_ignore_row_unserializable() -> None: # noqa D103
+def test_input_event_ignore_row_unserializable_duplicate() -> None: # noqa
D103
InputEvent(input=Row({"a": 1}))
+
+
+# ── Unified Event tests ──────────────────────────────────────────────────
+
+
+def test_unified_event_creation() -> None:
+ """Test creating a unified event with type and attributes."""
+ event = Event(type="MyEvent", attributes={"field1": "test", "field2": 42})
+ assert event.type == "MyEvent"
+ assert event.attributes == {"field1": "test", "field2": 42}
+ assert event.get_type() == "MyEvent"
+
+
+def test_unified_event_get_type_falls_back_to_class_name() -> None:
+ """Test that get_type() falls back to FQN class name for subclasses."""
+ event = InputEvent(input="hello")
+ assert event.type is None
+ assert event.get_type() == (
+ f"{InputEvent.__module__}.{InputEvent.__qualname__}"
+ )
+
+
+def test_unified_event_base_get_type_no_type_set() -> None:
+ """Test that get_type() returns FQN class name for base Event without
type."""
+ event = Event(a=1)
+ assert event.type is None
+ assert event.get_type() == f"{Event.__module__}.{Event.__qualname__}"
+
+
+def test_unified_event_get_attr_set_attr() -> None:
+ """Test get_attr and set_attr convenience methods."""
+ event = Event(type="TestEvent")
+ event.set_attr("key", "value")
+ assert event.get_attr("key") == "value"
+ assert event.get_attr("missing") is None
+
+
+def test_unified_event_from_json() -> None:
+ """Test deserializing a unified event from JSON."""
+ import json
+
+ data = {"type": "MyEvent", "attributes": {"x": 1}}
+ event = Event.from_json(json.dumps(data))
+ assert event.type == "MyEvent"
+ assert event.attributes == {"x": 1}
+
+
+def test_unified_event_from_json_missing_type() -> None:
+ """Test that from_json raises ValueError when type is missing."""
+ import json
+
+ with pytest.raises(ValueError, match="type"):
+ Event.from_json(json.dumps({"attributes": {}}))
+
+
+def test_unified_event_serialization_roundtrip() -> None:
+ """Test that unified events survive JSON serialization/deserialization."""
+ import json
+
+ original = Event(type="RoundTrip", attributes={"a": 1, "b": "two"})
Review Comment:
I think `Row` field should also be tested, for when user using Table api,
the `input` of `InputEvent` will be `Row` type.
##########
python/flink_agents/runtime/flink_runner_context.py:
##########
@@ -218,22 +218,45 @@ def set_long_term_memory(self, ltm:
InternalBaseLongTermMemory) -> None:
self.__ltm = ltm
@override
- def send_event(self, event: Event) -> None:
+ def _send_event(self, event: Event) -> None:
"""Send an event to the agent for processing.
+ Unified events (where ``event.type`` is set) are serialized as JSON and
+ sent via ``sendUnifiedEvent`` so that Java can reconstruct them without
+ cloudpickle. Subclassed events use the legacy cloudpickle path.
Review Comment:
In our plan, we will support cross-language usage of events and actions. To
support call actions in another language, we should always serialize events to
json, including unified events and subclassed events, so that another language
can reconstruct them. The cloudpickle path should be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]