This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 893e5232ea4dc1586bb8443e53285b150df0be62 Author: sxnan <[email protected]> AuthorDate: Fri Jan 16 18:11:12 2026 +0800 [log] Improve the event log content for PythonEvent --- .../e2e_tests_integration/event_log_test.py | 100 ------------------ .../python_event_logging_test.py | 63 +++++++---- .../flink_agents/runtime/flink_runner_context.py | 5 +- .../runtime/java/java_resource_wrapper.py | 2 +- python/flink_agents/runtime/python_java_utils.py | 20 +++- .../eventlog/EventLogRecordJsonDeserializer.java | 37 +++++-- .../eventlog/EventLogRecordJsonSerializer.java | 116 ++++++++++++++++++--- .../python/context/PythonRunnerContextImpl.java | 4 +- .../agents/runtime/python/event/PythonEvent.java | 26 ++--- .../runtime/python/utils/PythonActionExecutor.java | 4 +- .../eventlog/EventLogRecordJsonSerdeTest.java | 16 ++- .../runtime/eventlog/FileEventLoggerTest.java | 2 +- .../runtime/python/event/PythonEventTest.java | 34 +++--- 13 files changed, 231 insertions(+), 198 deletions(-) diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py deleted file mode 100644 index 275f5a73..00000000 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py +++ /dev/null @@ -1,100 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################# -import json -import os -import sysconfig -from pathlib import Path - -from pyflink.common import Configuration, Encoder, WatermarkStrategy -from pyflink.common.typeinfo import Types -from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment -from pyflink.datastream.connectors.file_system import ( - FileSource, - StreamFormat, - StreamingFileSink, -) - -from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.e2e_tests.e2e_tests_integration.flink_integration_agent import ( - DataStreamAgent, - ItemData, - MyKeySelector, -) - -current_dir = Path(__file__).parent -os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] - - -def test_event_log_base_dir_flink(tmp_path: Path) -> None: # noqa: D103 - config = Configuration() - config.set_string("state.backend.type", "rocksdb") - config.set_string("checkpointing.interval", "1s") - config.set_string("restart-strategy.type", "disable") - env = StreamExecutionEnvironment.get_execution_environment(config) - env.set_runtime_mode(RuntimeExecutionMode.STREAMING) - env.set_parallelism(1) - - input_datastream = env.from_source( - source=FileSource.for_record_stream_format( - StreamFormat.text_line_format(), f"file:///{current_dir}/../resources/input" - ).build(), - watermark_strategy=WatermarkStrategy.no_watermarks(), - source_name="event_log_test_source", - ) - - deserialize_datastream = input_datastream.map( - lambda x: ItemData.model_validate_json(x) - ) - - agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) - event_log_dir = tmp_path / "event_log" - agents_env.get_config().set_str("baseLogDir", str(event_log_dir)) - - output_datastream = ( - agents_env.from_datastream( - input=deserialize_datastream, key_selector=MyKeySelector() - ) - .apply(DataStreamAgent()) - .to_datastream() - ) - - result_dir = tmp_path / "results" - result_dir.mkdir(parents=True, exist_ok=True) - output_datastream.map(lambda x: x.model_dump_json(), Types.STRING()).add_sink( - StreamingFileSink.for_row_format( - base_path=str(result_dir.absolute()), - encoder=Encoder.simple_string_encoder(), - ).build() - ) - - agents_env.execute() - - event_logs = list(event_log_dir.glob("events-*.log")) - assert event_logs, "No event log files found in configured baseLogDir." - - first_log = event_logs[0] - record = None - with first_log.open("r", encoding="utf-8") as handle: - for line in handle: - if line.strip(): - record = json.loads(line) - break - - assert record is not None, "Event log file is empty." - assert "context" in record - assert "event" in record diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py index ee85aeb6..2db30804 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py @@ -16,7 +16,9 @@ # limitations under the License. ################################################################################# import json +import os import shutil +import sysconfig import tempfile from pathlib import Path @@ -37,6 +39,8 @@ from flink_agents.api.events.event import InputEvent, OutputEvent from flink_agents.api.execution_environment import AgentsExecutionEnvironment from flink_agents.api.runner_context import RunnerContext +os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] + class InputKeySelector(KeySelector): """Key selector for input data.""" @@ -60,9 +64,9 @@ class PythonEventLoggingAgent(Agent): ) -def test_python_event_logging() -> None: - """Test that PythonEvent can be logged with readable content.""" - # Check that log files were created in the default location +def test_python_event_logging(tmp_path: Path) -> None: + """Test event logs are written to configured directory with expected content.""" + event_log_dir = tmp_path / "event_log" default_log_dir = Path(tempfile.gettempdir()) / "flink-agents" shutil.rmtree(default_log_dir, ignore_errors=True) @@ -73,6 +77,7 @@ def test_python_event_logging() -> None: # Create agent environment agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + agents_env.get_config().set_str("baseLogDir", str(event_log_dir)) # Set up input source current_dir = Path(__file__).parent @@ -96,28 +101,42 @@ def test_python_event_logging() -> None: # Execute the job agents_env.execute() - # Also check our custom log directory - log_files = [] - if default_log_dir.exists(): - log_files.extend(default_log_dir.glob("events-*.log")) + # Check that log files were created in configured directory + log_files = list(event_log_dir.glob("events-*.log")) # At least one log file should exist assert len(log_files) > 0, ( - f"Event log files should be created in {default_log_dir}" + f"Event log files should be created in {event_log_dir}" ) - # Check that log files contain readable PythonEvent content - log_content = "" + # Check that log files contain structured event content + record = None + record_line = None + has_processed_review = False for log_file in log_files: - with log_file.open() as f: - log_content += f.read() - - print(log_content) - - # Verify log contains expected content - should have readable event data via - # eventString - assert "processed_review" in log_content, ( - "Log should contain processed event content from eventString" - ) - assert "eventString" in log_content, "Log should contain eventString field" - assert "eventType" in log_content, "Log should contain event type information" + with log_file.open(encoding="utf-8") as handle: + for line in handle: + if line.strip(): + record = json.loads(line) + record_line = line + event_payload = record.get("event", {}) + if "processed_review" in json.dumps(event_payload): + has_processed_review = True + break + if record is not None and has_processed_review: + break + + assert record is not None, "Event log file is empty." + assert record_line is not None, "Event log file is empty." + assert "timestamp" in record + assert "event" in record + assert "eventType" in record["event"] + assert has_processed_review, "Log should contain processed review content" + + event_type_idx = record_line.find('"eventType"') + id_idx = record_line.find('"id"') + attributes_idx = record_line.find('"attributes"') + assert event_type_idx != -1 + assert id_idx != -1 + assert attributes_idx != -1 + assert event_type_idx < id_idx < attributes_idx diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py index 994f095e..0e8664a0 100644 --- a/python/flink_agents/runtime/flink_runner_context.py +++ b/python/flink_agents/runtime/flink_runner_context.py @@ -44,6 +44,7 @@ from flink_agents.runtime.memory.internal_base_long_term_memory import ( from flink_agents.runtime.memory.vector_store_long_term_memory import ( VectorStoreLongTermMemory, ) +from flink_agents.runtime.python_java_utils import _build_event_log_string logger = logging.getLogger(__name__) @@ -220,9 +221,9 @@ class FlinkRunnerContext(RunnerContext): """ class_path = f"{event.__class__.__module__}.{event.__class__.__qualname__}" event_bytes = cloudpickle.dumps(event) - event_string = str(event) + event_json_str = _build_event_log_string(event, class_path) try: - self._j_runner_context.sendEvent(class_path, event_bytes, event_string) + self._j_runner_context.sendEvent(class_path, event_bytes, event_json_str) except Exception as e: err_msg = "Failed to send event " + class_path + " to runner context" raise RuntimeError(err_msg) from e diff --git a/python/flink_agents/runtime/java/java_resource_wrapper.py b/python/flink_agents/runtime/java/java_resource_wrapper.py index 56a101ca..0d1e8a46 100644 --- a/python/flink_agents/runtime/java/java_resource_wrapper.py +++ b/python/flink_agents/runtime/java/java_resource_wrapper.py @@ -17,7 +17,6 @@ ################################################################################# from typing import Any, List -from pemja import findClass from pydantic import Field from typing_extensions import override @@ -54,6 +53,7 @@ class JavaPrompt(Prompt): def format_messages( self, role: MessageRole = MessageRole.SYSTEM, **kwargs: str ) -> List[ChatMessage]: + from pemja import findClass j_MessageRole = findClass("org.apache.flink.agents.api.chat.messages.MessageRole") j_chat_messages = self.j_prompt.formatMessages(j_MessageRole.fromValue(role.value), kwargs) chatMessages = [ChatMessage(role=MessageRole(j_chat_message.getRole().getValue()), diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py index 509d0c51..dc320e7c 100644 --- a/python/flink_agents/runtime/python_java_utils.py +++ b/python/flink_agents/runtime/python_java_utils.py @@ -16,13 +16,13 @@ # limitations under the License. ################################################################################# import importlib +import json from typing import Any, Callable, Dict import cloudpickle -from pemja import findClass from flink_agents.api.chat_message import ChatMessage, MessageRole -from flink_agents.api.events.event import InputEvent +from flink_agents.api.events.event import Event, InputEvent from flink_agents.api.resource import Resource, ResourceType, get_resource_class from flink_agents.api.tools.tool import ToolMetadata from flink_agents.api.tools.utils import create_model_from_java_tool_schema_str @@ -46,14 +46,25 @@ def convert_to_python_object(bytesObject: bytes) -> Any: return cloudpickle.loads(bytesObject) +def _build_event_log_string(event: InputEvent | Event, event_type: str) -> str: + try: + payload = json.loads(event.model_dump_json()) + payload["eventType"] = event_type + payload.setdefault("attributes", {}) + return json.dumps(payload) + except Exception: + return str(event) + + def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, str]: """Wrap data to python input event and serialize. Returns: - A tuple of (serialized_event_bytes, event_string_representation) + A tuple of (serialized_event_bytes, event_json_str) """ event = InputEvent(input=cloudpickle.loads(bytesObject)) - return (cloudpickle.dumps(event), str(event)) + event_type = f"{event.__class__.__module__}.{event.__class__.__qualname__}" + return (cloudpickle.dumps(event), _build_event_log_string(event, event_type)) def get_output_from_output_event(bytesObject: bytes) -> Any: @@ -166,6 +177,7 @@ def from_java_chat_message(j_chat_message: Any) -> ChatMessage: def to_java_chat_message(chat_message: ChatMessage) -> Any: """Convert a chat message to a java chat message.""" + from pemja import findClass j_ChatMessage = findClass("org.apache.flink.agents.api.chat.messages.ChatMessage") j_chat_message = j_ChatMessage() diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java index f2b89d3a..a516edf5 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; @@ -51,24 +52,21 @@ public class EventLogRecordJsonDeserializer extends JsonDeserializer<EventLogRec ObjectMapper mapper = (ObjectMapper) parser.getCodec(); JsonNode rootNode = mapper.readTree(parser); - // Deserialize context - JsonNode contextNode = rootNode.get("context"); - if (contextNode == null) { - throw new IOException("Missing 'context' field in EventLogRecord JSON"); + // Deserialize timestamp + JsonNode timestampNode = rootNode.get("timestamp"); + if (timestampNode == null || !timestampNode.isTextual()) { + throw new IOException("Missing 'timestamp' field in EventLogRecord JSON"); } - EventContext eventContext = mapper.treeToValue(contextNode, EventContext.class); - if (eventContext == null) { - throw new IOException("Failed to deserialize EventContext"); - } - - // Deserialize event using eventType from context + // Deserialize event using eventType from event node JsonNode eventNode = rootNode.get("event"); if (eventNode == null) { throw new IOException("Missing 'event' field in EventLogRecord JSON"); } + String eventType = getEventType(eventNode); - Event event = deserializeEvent(mapper, eventNode, eventContext.getEventType()); + Event event = deserializeEvent(mapper, stripEventType(eventNode), eventType); + EventContext eventContext = new EventContext(eventType, timestampNode.asText()); return new EventLogRecord(eventContext, event); } @@ -102,4 +100,21 @@ public class EventLogRecordJsonDeserializer extends JsonDeserializer<EventLogRec String.format("Failed to deserialize event of type '%s'", eventType), e); } } + + private static String getEventType(JsonNode eventNode) throws IOException { + JsonNode eventTypeNode = eventNode.get("eventType"); + if (eventTypeNode == null || !eventTypeNode.isTextual()) { + throw new IOException("Missing 'eventType' field in event JSON"); + } + return eventTypeNode.asText(); + } + + private static JsonNode stripEventType(JsonNode eventNode) { + if (eventNode.isObject()) { + ObjectNode copy = ((ObjectNode) eventNode).deepCopy(); + copy.remove("eventType"); + return copy; + } + return eventNode; + } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java index 89a4e0f7..db7bebd7 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java @@ -19,10 +19,17 @@ package org.apache.flink.agents.runtime.eventlog; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.runtime.python.event.PythonEvent; import java.io.IOException; +import java.util.Iterator; +import java.util.Map; /** * Custom JSON serializer for {@link EventLogRecord}. @@ -31,7 +38,7 @@ import java.io.IOException; * for structured logging. The serialization includes: * * <ul> - * <li>EventContext with eventType and timestamp information + * <li>Top-level timestamp * <li>Event data serialized as a standard JSON object * </ul> * @@ -39,11 +46,9 @@ import java.io.IOException; * * <pre>{@code * { - * "context": { - * "eventType": "org.apache.flink.agents.api.InputEvent", - * "timestamp": "2024-01-15T10:30:00Z" - * }, + * "timestamp": "2024-01-15T10:30:00Z", * "event": { + * "eventType": "org.apache.flink.agents.api.InputEvent" * // Event-specific fields serialized normally * } * } @@ -55,16 +60,103 @@ public class EventLogRecordJsonSerializer extends JsonSerializer<EventLogRecord> public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvider serializers) throws IOException { - gen.writeStartObject(); + ObjectMapper mapper = (ObjectMapper) gen.getCodec(); + if (mapper == null) { + mapper = new ObjectMapper(); + } - // Serialize context - contains eventType and timestamp - gen.writeFieldName("context"); - serializers.defaultSerializeValue(record.getContext(), gen); + gen.writeStartObject(); + gen.writeStringField("timestamp", record.getContext().getTimestamp()); - // Serialize event - standard JSON serialization gen.writeFieldName("event"); - serializers.defaultSerializeValue(record.getEvent(), gen); - + JsonNode eventNode = buildEventNode(record.getEvent(), mapper); + if (!eventNode.isObject()) { + throw new IllegalStateException( + "Event log payload must be a JSON object, but was: " + eventNode.getNodeType()); + } + eventNode = reorderEventFields((ObjectNode) eventNode, record.getEvent(), mapper); + gen.writeTree(eventNode); gen.writeEndObject(); } + + private JsonNode buildEventNode(Event event, ObjectMapper mapper) { + if (event instanceof PythonEvent) { + return buildPythonEventNode((PythonEvent) event, mapper); + } + JsonNode eventNode = mapper.valueToTree(event); + if (eventNode.isObject()) { + ObjectNode objectNode = (ObjectNode) eventNode; + objectNode.put("eventType", event.getClass().getName()); + objectNode.remove("sourceTimestamp"); + } + return eventNode; + } + + private JsonNode buildPythonEventNode(PythonEvent event, ObjectMapper mapper) { + String eventJsonStr = event.getEventJsonStr(); + if (eventJsonStr != null) { + try { + JsonNode parsed = mapper.readTree(eventJsonStr); + if (parsed.isObject()) { + ObjectNode objectNode = (ObjectNode) parsed; + objectNode.remove("sourceTimestamp"); + return objectNode; + } + return parsed; + } catch (IOException ignored) { + // Fallback to raw eventJsonStr + } + } + ObjectNode fallback = mapper.createObjectNode(); + if (event.getEventType() != null) { + fallback.put("eventType", event.getEventType()); + } + fallback.put("id", event.getId().toString()); + fallback.put("rawEventJsonStr", eventJsonStr); + return fallback; + } + + private ObjectNode reorderEventFields(ObjectNode original, Event event, ObjectMapper mapper) { + ObjectNode ordered = mapper.createObjectNode(); + + JsonNode eventTypeNode = original.get("eventType"); + if (eventTypeNode != null) { + ordered.set("eventType", eventTypeNode); + } else if (event instanceof PythonEvent) { + String eventType = ((PythonEvent) event).getEventType(); + if (eventType != null) { + ordered.put("eventType", eventType); + } + } else { + ordered.put("eventType", event.getClass().getName()); + } + + JsonNode idNode = original.get("id"); + if (idNode != null) { + ordered.set("id", idNode); + } else if (event.getId() != null) { + ordered.put("id", event.getId().toString()); + } + + JsonNode attributesNode = original.get("attributes"); + if (attributesNode != null) { + ordered.set("attributes", attributesNode); + } else { + ordered.putObject("attributes"); + } + + Iterator<Map.Entry<String, JsonNode>> fields = original.fields(); + while (fields.hasNext()) { + Map.Entry<String, JsonNode> entry = fields.next(); + String fieldName = entry.getKey(); + if ("sourceTimestamp".equals(fieldName)) { + continue; + } + if (!ordered.has(fieldName)) { + ordered.set(fieldName, entry.getValue()); + } + } + + return ordered; + } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java index ddabf503..3d37b05a 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java @@ -46,8 +46,8 @@ public class PythonRunnerContextImpl extends RunnerContextImpl { super.sendEvent(event); } - public void sendEvent(String type, byte[] event, String eventString) { + public void sendEvent(String type, byte[] event, String eventJsonStr) { // this method will be invoked by PythonActionExecutor's python interpreter. - sendEvent(new PythonEvent(event, type, eventString)); + sendEvent(new PythonEvent(event, type, eventJsonStr)); } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java index 8f5248d1..2abe7cb6 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java @@ -33,19 +33,19 @@ import java.util.UUID; * An event generated by the framework, passing a Python event to the Java agent runner. * * <p>This class stores Python objects as serialized byte arrays for processing, and also maintains - * a human-readable string representation (eventString) for logging purposes. The eventString is - * generated on the Python side when the event is created. + * a JSON string representation (eventJsonStr) for logging purposes. The eventJsonStr is generated + * on the Python side when the event is created. */ public class PythonEvent extends Event { private final byte[] event; private final String eventType; - private final String eventString; + private final String eventJsonStr; - public PythonEvent(byte[] event, String eventType, String eventString) { + public PythonEvent(byte[] event, String eventType, String eventJsonStr) { super(); this.event = event; this.eventType = eventType; - this.eventString = eventString; + this.eventJsonStr = eventJsonStr; } @JsonCreator @@ -54,11 +54,11 @@ public class PythonEvent extends Event { @JsonProperty("attributes") Map<String, Object> attributes, @JsonProperty("event") byte[] event, @JsonProperty("eventType") String eventType, - @JsonProperty("eventString") String eventString) { + @JsonProperty("eventJsonStr") String eventJsonStr) { super(id, attributes); this.event = event; this.eventType = eventType; - this.eventString = eventString; + this.eventJsonStr = eventJsonStr; } @JsonIgnore // Don't serialize byte array in logs - used for processing only @@ -71,15 +71,15 @@ public class PythonEvent extends Event { } /** - * Returns the human-readable string representation of this event. + * Returns the JSON string representation of this event. * * <p>This string is generated on the Python side when the event is created and is primarily * used for logging purposes. * - * @return the string representation of the event, or null if not available + * @return the JSON string representation of the event, or null if not available */ - public String getEventString() { - return eventString; + public String getEventJsonStr() { + return eventJsonStr; } @Override @@ -92,11 +92,11 @@ public class PythonEvent extends Event { PythonEvent other = (PythonEvent) o; return Arrays.equals(event, other.event) && Objects.equals(this.eventType, other.eventType) - && Objects.equals(this.eventString, other.eventString); + && Objects.equals(this.eventJsonStr, other.eventJsonStr); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), Arrays.hashCode(event), eventType, eventString); + return Objects.hash(super.hashCode(), Arrays.hashCode(event), eventType, eventJsonStr); } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java index 55d38187..d47d8f58 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java @@ -146,8 +146,8 @@ public class PythonActionExecutor { checkState(result.getClass().isArray() && ((Object[]) result).length == 2); Object[] resultArray = (Object[]) result; byte[] eventBytes = (byte[]) resultArray[0]; - String eventString = (String) resultArray[1]; - return new PythonEvent(eventBytes, EventUtil.PYTHON_INPUT_EVENT_NAME, eventString); + String eventJsonStr = (String) resultArray[1]; + return new PythonEvent(eventBytes, EventUtil.PYTHON_INPUT_EVENT_NAME, eventJsonStr); } public Object getOutputFromOutputEvent(byte[] pythonOutputEvent) { diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerdeTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerdeTest.java index 6f695bd6..14a3aa87 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerdeTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerdeTest.java @@ -56,20 +56,16 @@ class EventLogRecordJsonSerdeTest { JsonNode jsonNode = objectMapper.readTree(json); // Verify structure - assertTrue(jsonNode.has("context")); + assertTrue(jsonNode.has("timestamp")); assertTrue(jsonNode.has("event")); - // Verify context - JsonNode contextNode = jsonNode.get("context"); - assertTrue(contextNode.has("eventType")); - assertTrue(contextNode.has("timestamp")); - assertEquals( - "org.apache.flink.agents.api.InputEvent", contextNode.get("eventType").asText()); - // Verify event JsonNode eventNode = jsonNode.get("event"); + assertTrue(eventNode.has("eventType")); assertTrue(eventNode.has("input")); assertEquals("test input data", eventNode.get("input").asText()); + assertEquals("org.apache.flink.agents.api.InputEvent", eventNode.get("eventType").asText()); + assertFalse(eventNode.has("sourceTimestamp")); } @Test @@ -86,7 +82,7 @@ class EventLogRecordJsonSerdeTest { JsonNode jsonNode = objectMapper.readTree(json); assertEquals( "org.apache.flink.agents.api.OutputEvent", - jsonNode.get("context").get("eventType").asText()); + jsonNode.get("event").get("eventType").asText()); assertEquals("test output data", jsonNode.get("event").get("output").asText()); } @@ -104,7 +100,7 @@ class EventLogRecordJsonSerdeTest { JsonNode jsonNode = objectMapper.readTree(json); assertEquals( "org.apache.flink.agents.runtime.eventlog.EventLogRecordJsonSerdeTest$CustomTestEvent", - jsonNode.get("context").get("eventType").asText()); + jsonNode.get("event").get("eventType").asText()); JsonNode eventNode = jsonNode.get("event"); assertEquals("custom data", eventNode.get("customData").asText()); diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java index 1a77ccf0..1cd4d0ca 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java @@ -178,7 +178,7 @@ class FileEventLoggerTest { JsonNode jsonNode = objectMapper.readTree(lines.get(0)); assertEquals( "org.apache.flink.agents.runtime.eventlog.FileEventLoggerTest$TestCustomEvent", - jsonNode.get("context").get("eventType").asText()); + jsonNode.get("event").get("eventType").asText()); JsonNode eventNode = jsonNode.get("event"); assertEquals("custom data", eventNode.get("customData").asText()); diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java index e3a21c57..597f1014 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java @@ -43,33 +43,36 @@ class PythonEventTest { } @Test - void testCreatePythonEventWithEventString() { + void testCreatePythonEventWithEventJsonStr() { // Given byte[] eventBytes = new byte[] {1, 2, 3, 4, 5}; String eventType = "flink_agents.api.events.event.InputEvent"; - String eventString = "InputEvent(input='test data')"; + String eventJsonStr = + "{\"eventType\":\"flink_agents.api.events.event.InputEvent\",\"input\":\"test data\"}"; // When - PythonEvent event = new PythonEvent(eventBytes, eventType, eventString); + PythonEvent event = new PythonEvent(eventBytes, eventType, eventJsonStr); // Then assertThat(event.getEvent()).isEqualTo(eventBytes); assertThat(event.getEventType()).isEqualTo(eventType); - assertThat(event.getEventString()).isEqualTo(eventString); + assertThat(event.getEventJsonStr()).isEqualTo(eventJsonStr); } @Test - void testJsonSerializationWithEventString() throws Exception { + void testJsonSerializationWithEventJsonStr() throws Exception { // Given UUID expectedId = UUID.randomUUID(); Map<String, Object> expectedAttributes = new HashMap<>(); expectedAttributes.put("testKey", "testValue"); byte[] eventBytes = "test_bytes".getBytes(); String eventType = "flink_agents.api.events.event.OutputEvent"; - String eventString = "OutputEvent(output={'key': 'value'})"; + String eventJsonStr = + "{\"eventType\":\"flink_agents.api.events.event.OutputEvent\",\"output\":{\"key\":\"value\"}}"; PythonEvent event = - new PythonEvent(expectedId, expectedAttributes, eventBytes, eventType, eventString); + new PythonEvent( + expectedId, expectedAttributes, eventBytes, eventType, eventJsonStr); // When String json = objectMapper.writeValueAsString(event); @@ -78,27 +81,28 @@ class PythonEventTest { JsonNode jsonNode = objectMapper.readTree(json); assertThat(jsonNode.has("id")).isTrue(); assertThat(jsonNode.has("eventType")).isTrue(); - assertThat(jsonNode.has("eventString")).isTrue(); + assertThat(jsonNode.has("eventJsonStr")).isTrue(); assertThat(jsonNode.has("attributes")).isTrue(); // event bytes should not be serialized assertThat(jsonNode.has("event")).isFalse(); assertThat(jsonNode.get("eventType").asText()).isEqualTo(eventType); - assertThat(jsonNode.get("eventString").asText()).isEqualTo(eventString); + assertThat(jsonNode.get("eventJsonStr").asText()).isEqualTo(eventJsonStr); assertThat(jsonNode.get("attributes").get("testKey").asText()).isEqualTo("testValue"); } @Test - void testEventLogRecordSerializationWithEventString() throws Exception { + void testEventLogRecordSerializationWithEventJsonStr() throws Exception { // Given - simulate how PythonEvent is used in EventLogger UUID eventId = UUID.randomUUID(); Map<String, Object> attributes = new HashMap<>(); attributes.put("source", "python"); byte[] eventBytes = "serialized_event".getBytes(); String eventType = "flink_agents.api.events.event.InputEvent"; - String eventString = "InputEvent(input={'key': 'value', 'count': 42})"; + String eventJsonStr = + "{\"eventType\":\"flink_agents.api.events.event.InputEvent\",\"input\":{\"key\":\"value\",\"count\":42}}"; PythonEvent pythonEvent = - new PythonEvent(eventId, attributes, eventBytes, eventType, eventString); + new PythonEvent(eventId, attributes, eventBytes, eventType, eventJsonStr); pythonEvent.setSourceTimestamp(1234567890L); EventContext context = new EventContext(pythonEvent); @@ -110,13 +114,7 @@ class PythonEventTest { // Then JsonNode jsonNode = objectMapper.readTree(json); - // Verify context contains PythonEvent type - assertThat(jsonNode.get("context").get("eventType").asText()) - .isEqualTo("org.apache.flink.agents.runtime.python.event.PythonEvent"); - - // Verify event contains human-readable eventString JsonNode eventNode = jsonNode.get("event"); - assertThat(eventNode.get("eventString").asText()).isEqualTo(eventString); assertThat(eventNode.get("eventType").asText()).isEqualTo(eventType); assertThat(eventNode.get("id").asText()).isEqualTo(eventId.toString()); // Byte array should not be in the log
