This is an automated email from the ASF dual-hosted git repository.
zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c333631534b [chore](ci) Record Codex review traces in Litefuse (#64319)
c333631534b is described below
commit c333631534b08b0e874d4bc863c0cade8bf40a80
Author: zclllyybb <[email protected]>
AuthorDate: Tue Jun 9 20:30:29 2026 +0800
[chore](ci) Record Codex review traces in Litefuse (#64319)
Run the AI review workflow through Codex, persist the review
prompt/output even when review fails, and emit Codex JSONL steps to
Litefuse using the public ingestion API.
The runner now uses a Codex-native auth.json stored at
oss://doris-community-ci/codex/auth.json. It downloads that file
directly into CODEX_HOME and syncs the refreshed Codex auth.json back to
the same object, without converting to or from the old opencode auth
format.
Codex execution uses workspace-write sandboxing with
approval_policy=never instead of dangerous bypass mode. The workflow
does not add any writable directory outside the checked-out repository,
while keeping network access available for GitHub review operations.
The Litefuse emitter creates a trace, root review span, turn generation,
and per-step observations with non-empty input/output. Agent messages
use the event window since the previous agent message as input,
preserving command/tool/reasoning context as a structured list.
Readback verification uses Litefuse public observation APIs to assert
trace I/O, step observation counts, missing I/O, and agent-message
context structure without logging prompt text or command output.
---
.github/scripts/emit_litefuse_otel_io.py | 829 +++++++++++++++++++++++++++
.github/workflows/opencode-review-runner.yml | 159 +++--
2 files changed, 952 insertions(+), 36 deletions(-)
diff --git a/.github/scripts/emit_litefuse_otel_io.py b/.github/scripts/emit_litefuse_otel_io.py
new file mode 100644
index 00000000000..d8710342d58
--- /dev/null
+++ b/.github/scripts/emit_litefuse_otel_io.py
@@ -0,0 +1,829 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import base64
+from datetime import datetime, timedelta, timezone
+import json
+import os
+import secrets
+import time
+import urllib.parse
+import urllib.request
+
+
+def read_text(path, max_chars, tail=False, optional=False):
+    """Read a UTF-8 text file and clip the result with ``truncate_text``.
+
+    Undecodable bytes are replaced instead of raising. When ``optional`` is
+    true a missing file yields ``""``; otherwise ``FileNotFoundError``
+    propagates. ``max_chars`` and ``tail`` are passed on to ``truncate_text``.
+    """
+    try:
+        with open(path, "r", encoding="utf-8", errors="replace") as source:
+            contents = source.read()
+    except FileNotFoundError:
+        if not optional:
+            raise
+        return ""
+    return truncate_text(contents, max_chars, tail)
+
+
+def truncate_text(data, max_chars, tail=False):
+    """Clip ``data`` to ``max_chars`` characters and mark the cut.
+
+    A non-positive ``max_chars`` disables clipping. With ``tail`` the last
+    ``max_chars`` characters are kept behind a leading marker line; otherwise
+    the first ``max_chars`` characters are kept and the marker is appended.
+    """
+    fits = max_chars <= 0 or len(data) <= max_chars
+    if fits:
+        return data
+    if not tail:
+        return data[:max_chars] + f"\n[truncated to first {max_chars} chars]"
+    return f"[truncated to last {max_chars} chars]\n" + data[-max_chars:]
+
+
+def truncate_json(value, max_chars):
+    """Return ``value`` unless its JSON encoding is longer than ``max_chars``.
+
+    The size is measured on the sorted-key, non-ASCII-preserving encoding.
+    Oversized values are replaced by ``{"truncated_json": <clipped text>}``;
+    a non-positive ``max_chars`` disables the limit.
+    """
+    encoded = json.dumps(value, ensure_ascii=False, sort_keys=True)
+    within_limit = max_chars <= 0 or len(encoded) <= max_chars
+    if within_limit:
+        return value
+    clipped = truncate_text(encoded, max_chars)
+    return {"truncated_json": clipped}
+
+
+def json_attr(value):
+    """Encode ``value`` as JSON text with sorted keys, keeping non-ASCII as is."""
+    encoded = json.dumps(value, sort_keys=True, ensure_ascii=False)
+    return encoded
+
+
+def load_jsonl(path):
+    """Load a Codex JSONL event stream as a list of dict events.
+
+    Blank lines are skipped. Every returned event is a dict tagged with the
+    1-based ``_line_number`` it came from. A line that is not valid JSON, or
+    that is valid JSON but not an object, is represented by a synthetic
+    ``{"type": "error", ...}`` event so that one bad line cannot abort the
+    load or break callers that use ``event.get(...)``.
+    """
+    events = []
+    with open(path, "r", encoding="utf-8", errors="replace") as handle:
+        for line_number, line in enumerate(handle, start=1):
+            stripped = line.strip()
+            if not stripped:
+                continue
+            try:
+                event = json.loads(stripped)
+            except json.JSONDecodeError as exc:
+                events.append(
+                    {
+                        "type": "error",
+                        "message": f"invalid jsonl line {line_number}: {exc}",
+                        "_line_number": line_number,
+                    }
+                )
+                continue
+            if not isinstance(event, dict):
+                # Lists and scalars parse fine but cannot be tagged or queried
+                # with .get(); record them as errors instead of crashing.
+                events.append(
+                    {
+                        "type": "error",
+                        "message": (
+                            f"non-object jsonl line {line_number}: "
+                            f"{type(event).__name__}"
+                        ),
+                        "_line_number": line_number,
+                    }
+                )
+                continue
+            event["_line_number"] = line_number
+            events.append(event)
+    return events
+
+
+def find_final_message(events, output_text):
+    """Return the review's final message text.
+
+    ``output_text`` wins when it contains anything but whitespace. Otherwise
+    the events are scanned from the end for the last completed
+    ``agent_message`` item with non-blank string text. Events whose ``item``
+    is not a dict, or whose ``text`` is not a string, are skipped rather than
+    raising. Returns ``""`` when nothing qualifies.
+    """
+    if output_text.strip():
+        return output_text
+    for event in reversed(events):
+        if event.get("type") != "item.completed":
+            continue
+        item = event.get("item")
+        if not isinstance(item, dict) or item.get("type") != "agent_message":
+            continue
+        text = item.get("text")
+        if isinstance(text, str) and text.strip():
+            return text
+    return ""
+
+
+def latest_turn_result(events):
+    """Report how the most recent turn ended.
+
+    Scans from the end and returns ``("completed", usage)`` or
+    ``("failed", error)`` for the first ``turn.completed`` / ``turn.failed``
+    event found, substituting ``{}`` for a missing or empty payload. Returns
+    ``("unknown", {})`` when neither event is present.
+    """
+    for event in reversed(events):
+        kind = event.get("type")
+        if kind == "turn.failed":
+            return "failed", event.get("error") or {}
+        if kind == "turn.completed":
+            return "completed", event.get("usage") or {}
+    return "unknown", {}
+
+
+def event_context_payload(event, max_json_chars):
+    """Summarise one JSONL event for an agent message's context window.
+
+    Events without a non-empty dict ``item`` are returned with their whole
+    body (minus ``_line_number``) under ``payload``. Item events get the item
+    id, type and status plus type-specific fields, each clipped to
+    ``max_json_chars``; keys whose value is ``None`` or ``""`` are dropped.
+    """
+    raw_item = event.get("item")
+    item = raw_item if isinstance(raw_item, dict) else None
+    payload = {
+        "event_type": event.get("type"),
+        "line_number": event.get("_line_number"),
+    }
+    if not item:
+        body = {}
+        for key, value in event.items():
+            if key != "_line_number":
+                body[key] = value
+        payload["payload"] = truncate_json(body, max_json_chars)
+        return payload
+
+    item_type = item.get("type", "unknown")
+    payload["item_id"] = item.get("id", "")
+    payload["item_type"] = item_type
+    payload["status"] = item.get("status")
+
+    if item_type == "agent_message":
+        payload["text"] = truncate_text(item.get("text") or "", max_json_chars)
+    elif item_type == "command_execution":
+        payload["command"] = item.get("command") or ""
+        payload["exit_code"] = item.get("exit_code")
+        payload["aggregated_output"] = truncate_text(
+            item.get("aggregated_output") or "", max_json_chars, tail=True
+        )
+    elif item_type == "mcp_tool_call":
+        payload["server"] = item.get("server")
+        payload["tool"] = item.get("tool")
+        for field in ("arguments", "result", "error"):
+            payload[field] = truncate_json(item.get(field), max_json_chars)
+    elif item_type == "collab_tool_call":
+        payload["tool"] = item.get("tool")
+        payload["prompt"] = truncate_text(item.get("prompt") or "", max_json_chars)
+        payload["sender_thread_id"] = item.get("sender_thread_id")
+        payload["receiver_thread_ids"] = item.get("receiver_thread_ids") or []
+        payload["agents_states"] = truncate_json(
+            item.get("agents_states"), max_json_chars
+        )
+    else:
+        payload["item"] = truncate_json(item, max_json_chars)
+
+    compact = {}
+    for key, value in payload.items():
+        if value not in (None, ""):
+            compact[key] = value
+    return compact
+
+
+def build_agent_message_inputs(events, turn_input, max_json_chars):
+    """Build the structured input for every completed agent message.
+
+    Returns a dict keyed by the agent-message event's ``_line_number``. Each
+    value records the context window (``from``/``to`` ids and event count),
+    the summarised events seen since the previous agent message, and either
+    the previous agent message (id and clipped text) or, for the first
+    message, the clipped ``turn_input``.
+
+    A message without an ``id`` is identified as ``line:<line number>``. The
+    same resolved id is used for this window's ``to`` and for the next
+    window's ``from`` and ``previous_agent_message.id``, so consecutive
+    windows always chain.
+    """
+    inputs = {}
+    previous_message = None
+    previous_id = None
+    window = []
+    for event in events:
+        item = event.get("item") if isinstance(event.get("item"), dict) else {}
+        is_agent_message = (
+            event.get("type") == "item.completed"
+            and item.get("type") == "agent_message"
+        )
+        if not is_agent_message:
+            window.append(event)
+            continue
+
+        current_id = item.get("id") or f"line:{event.get('_line_number', 0)}"
+        input_payload = {
+            "item_type": "agent_message",
+            "context_window": {
+                "from": previous_id if previous_message is not None else "turn_start",
+                "to": current_id,
+                "event_count": len(window),
+            },
+            "events_since_previous_agent_message": [
+                event_context_payload(context_event, max_json_chars)
+                for context_event in window
+            ],
+        }
+        if previous_message is not None:
+            input_payload["previous_agent_message"] = {
+                "id": previous_id,
+                "text": truncate_text(
+                    previous_message.get("text") or "", max_json_chars
+                ),
+            }
+        else:
+            input_payload["turn_input"] = truncate_text(turn_input, max_json_chars)
+        inputs[event.get("_line_number")] = input_payload
+
+        # The next window starts after this message.
+        previous_message = item
+        previous_id = current_id
+        window = []
+    return inputs
+
+
+def observation_shape(item, max_json_chars, agent_message_input=None):
+    """Describe a completed Codex item as the parts of one observation.
+
+    Returns a dict with ``name``, ``type`` (``"generation"`` for agent
+    messages, ``"span"`` for everything else), ``input``, ``output`` and
+    ``metadata``. Free text and nested JSON are clipped to ``max_json_chars``.
+    ``agent_message_input`` replaces the placeholder input of an agent
+    message when it is provided and non-empty.
+    """
+    kind = item.get("type", "unknown")
+    status = item.get("status")
+
+    def shape(name, input_value, output_value, metadata, observation_type="span"):
+        return {
+            "name": name,
+            "type": observation_type,
+            "input": input_value,
+            "output": output_value,
+            "metadata": metadata,
+        }
+
+    def clipped_text(field):
+        return truncate_text(item.get(field) or "", max_json_chars)
+
+    def clipped_json(field):
+        return truncate_json(item.get(field), max_json_chars)
+
+    if kind == "command_execution":
+        command = item.get("command") or ""
+        return shape(
+            "codex.command",
+            {"command": command},
+            {
+                "status": status,
+                "exit_code": item.get("exit_code"),
+                "aggregated_output": truncate_text(
+                    item.get("aggregated_output") or "", max_json_chars, tail=True
+                ),
+            },
+            {"command": command[:240], "status": status, "step_kind": "tool"},
+        )
+
+    if kind == "mcp_tool_call":
+        return shape(
+            f"codex.mcp.{item.get('server', 'unknown')}.{item.get('tool', 'unknown')}",
+            {
+                "server": item.get("server"),
+                "tool": item.get("tool"),
+                "arguments": clipped_json("arguments"),
+            },
+            {
+                "status": status,
+                "result": clipped_json("result"),
+                "error": clipped_json("error"),
+            },
+            {
+                "server": item.get("server"),
+                "tool": item.get("tool"),
+                "status": status,
+                "step_kind": "tool",
+            },
+        )
+
+    if kind == "collab_tool_call":
+        return shape(
+            f"codex.collab.{item.get('tool', 'unknown')}",
+            {
+                "tool": item.get("tool"),
+                "prompt": clipped_text("prompt"),
+                "sender_thread_id": item.get("sender_thread_id"),
+            },
+            {
+                "status": status,
+                "receiver_thread_ids": item.get("receiver_thread_ids") or [],
+                "agents_states": clipped_json("agents_states"),
+            },
+            {"tool": item.get("tool"), "status": status, "step_kind": "tool"},
+        )
+
+    if kind == "web_search":
+        return shape(
+            "codex.web_search",
+            {"query": item.get("query"), "action": item.get("action")},
+            {"status": "completed", "query": item.get("query")},
+            {"query": (item.get("query") or "")[:240], "step_kind": "tool"},
+        )
+
+    if kind == "file_change":
+        return shape(
+            "codex.file_change",
+            {"changes": item.get("changes") or []},
+            {"status": status, "changes": item.get("changes") or []},
+            {"status": status},
+        )
+
+    if kind == "todo_list":
+        return shape(
+            "codex.todo_list",
+            {"item_type": kind},
+            {"items": item.get("items") or []},
+            {"item_count": len(item.get("items") or [])},
+        )
+
+    if kind == "reasoning":
+        return shape(
+            "codex.reasoning",
+            {"item_type": kind},
+            {"text": clipped_text("text")},
+            {},
+        )
+
+    if kind == "agent_message":
+        return shape(
+            "codex.agent_message",
+            agent_message_input or {"item_type": kind},
+            {"text": clipped_text("text")},
+            {},
+            observation_type="generation",
+        )
+
+    if kind == "error":
+        return shape(
+            "codex.error",
+            {"item_type": kind},
+            {"message": item.get("message") or ""},
+            {},
+        )
+
+    # Unrecognised item types are passed through as a clipped JSON span.
+    return shape(
+        f"codex.{kind}",
+        {"item_type": kind},
+        truncate_json(item, max_json_chars),
+        {"item_type": kind},
+    )
+
+
+def iso_from_ns(ns):
+    """Format nanoseconds since the Unix epoch as a UTC ISO-8601 string.
+
+    The result has millisecond precision and a ``Z`` suffix, for example
+    ``1970-01-01T00:00:01.500Z``.
+    """
+    seconds = ns / 1_000_000_000
+    moment = datetime.fromtimestamp(seconds, timezone.utc)
+    text = moment.isoformat(timespec="milliseconds")
+    return text.replace("+00:00", "Z")
+
+
+def ingestion_event(event_type, timestamp, body):
+    """Wrap ``body`` in an ingestion batch entry with a fresh random id.
+
+    The id is 32 hexadecimal characters from ``secrets.token_hex(16)``.
+    """
+    envelope = {"id": secrets.token_hex(16)}
+    envelope["timestamp"] = timestamp
+    envelope["type"] = event_type
+    envelope["body"] = body
+    return envelope
+
+
+def build_ingestion_payload(args, input_text, output_text, events):
+    """Assemble the ingestion batch for one Codex review run.
+
+    The batch contains, in order: a ``trace-create`` event, a root
+    ``codex.review`` span, a ``codex.turn`` generation, and one span or
+    generation per completed item in ``events``.
+
+    All timestamps are synthetic: they are derived from the clock at call
+    time plus a per-observation millisecond offset, not read from the events.
+
+    Returns ``(trace_id, {"batch": [...]}, observation_count)`` where the
+    count is the batch length minus the trace-create event.
+    """
+    now = time.time_ns()
+    trace_id = secrets.token_hex(16)
+    root_observation_id = secrets.token_hex(16)
+    turn_status, turn_payload = latest_turn_result(events)
+    final_message = find_final_message(events, output_text)
+    # Fall back to a JSON status stub so the trace output is never empty.
+    trace_output = final_message or json_attr({"turn_status": turn_status})
+
+    trace_metadata = {
+        "repository": args.repository,
+        "workflow": args.workflow,
+        "run_id": args.run_id,
+        "pr_number": args.pr_number,
+        "head_sha": args.head_sha,
+        "base_sha": args.base_sha,
+        "model_reasoning_effort": args.reasoning_effort,
+        "codex_jsonl": True,
+    }
+    # Drop metadata entries that were not supplied on the command line.
+    trace_metadata = {
+        key: value for key, value in trace_metadata.items() if value not in (None, "")
+    }
+
+    completed_items = [
+        event
+        for event in events
+        if event.get("type") == "item.completed" and isinstance(event.get("item"), dict)
+    ]
+    agent_message_inputs = build_agent_message_inputs(
+        events, input_text, args.max_json_chars
+    )
+    # NOTE(review): the +1 presumably accounts for the codex.turn generation.
+    child_count = len(completed_items) + 1
+    # The root span ends (child_count + 2) ms after it starts; each child
+    # below starts at its own whole-millisecond offset from `now`.
+    root_end = now + (child_count + 2) * 1_000_000
+    timestamp = iso_from_ns(now)
+
+    batch = [
+        ingestion_event(
+            "trace-create",
+            timestamp,
+            {
+                "id": trace_id,
+                "timestamp": timestamp,
+                "name": args.trace_name,
+                "input": input_text,
+                "output": trace_output,
+                "sessionId": args.session_id,
+                "environment": args.environment,
+                "metadata": trace_metadata,
+                "tags": ["doris-ai-review", "codex-jsonl"],
+            },
+        ),
+        ingestion_event(
+            "span-create",
+            timestamp,
+            {
+                "id": root_observation_id,
+                "traceId": trace_id,
+                "name": "codex.review",
+                "startTime": iso_from_ns(now),
+                "endTime": iso_from_ns(root_end),
+                "input": {"prompt": input_text},
+                "output": {"final_message": trace_output, "turn_status": turn_status},
+                "environment": args.environment,
+                "metadata": {
+                    **trace_metadata,
+                    "codex_event_count": len(events),
+                    "codex_completed_item_count": len(completed_items),
+                },
+            },
+        ),
+    ]
+
+    # The turn generation occupies the 1 ms - 2 ms slot after `now`.
+    turn_body = {
+        "id": secrets.token_hex(16),
+        "traceId": trace_id,
+        "parentObservationId": root_observation_id,
+        "name": "codex.turn",
+        "startTime": iso_from_ns(now + 1_000_000),
+        "endTime": iso_from_ns(now + 2_000_000),
+        "model": args.model,
+        "input": {"prompt": input_text},
+        "output": {"status": turn_status, "final_message": trace_output},
+        "environment": args.environment,
+        "metadata": {**trace_metadata, "item_type": "turn"},
+        "level": "ERROR" if turn_status == "failed" else "DEFAULT",
+    }
+    if turn_status == "completed":
+        # For a completed turn, turn_payload is the event's usage dict.
+        turn_body["usageDetails"] = {
+            "input": int(turn_payload.get("input_tokens") or 0),
+            "output": int(turn_payload.get("output_tokens") or 0),
+            "cache_read_input_tokens": int(turn_payload.get("cached_input_tokens") or 0),
+            "reasoning_output_tokens": int(
+                turn_payload.get("reasoning_output_tokens") or 0
+            ),
+        }
+    else:
+        # Failed turns carry the error payload; unknown turns carry "{}".
+        turn_body["statusMessage"] = json_attr(turn_payload)
+
+    batch.append(ingestion_event("generation-create", iso_from_ns(now + 1_000_000), turn_body))
+
+    # Item observations start at offset 2 ms, one per millisecond, each
+    # lasting 0.75 ms, all parented to the root span.
+    for offset, event in enumerate(completed_items, start=2):
+        item = event["item"]
+        shape = observation_shape(
+            item, args.max_json_chars, agent_message_inputs.get(event.get("_line_number"))
+        )
+        item_type = item.get("type", "unknown")
+        item_status = item.get("status", "completed")
+        body = {
+            "id": secrets.token_hex(16),
+            "traceId": trace_id,
+            "parentObservationId": root_observation_id,
+            "name": shape["name"],
+            "startTime": iso_from_ns(now + offset * 1_000_000),
+            "endTime": iso_from_ns(now + offset * 1_000_000 + 750_000),
+            "input": shape["input"],
+            "output": shape["output"],
+            "environment": args.environment,
+            "level": "ERROR" if item_status in ("failed", "declined") else "DEFAULT",
+            "metadata": {
+                **trace_metadata,
+                "item_id": item.get("id", ""),
+                "item_type": item_type,
+                "event_line": event.get("_line_number", 0),
+            },
+        }
+        # Merge the shape's own metadata, skipping empty values.
+        for key, value in shape.get("metadata", {}).items():
+            if value not in (None, ""):
+                body["metadata"][key] = value
+        event_type = "generation-create" if shape["type"] == "generation" else "span-create"
+        if shape["type"] == "generation":
+            body["model"] = args.model
+        batch.append(ingestion_event(event_type, body["startTime"], body))
+
+    # Everything in the batch except the trace-create event is an observation.
+    return trace_id, {"batch": batch}, len(batch) - 1
+
+
+def post_payload(endpoint, public_key, secret_key, payload):
+    """POST the ingestion payload with HTTP Basic auth and summarise the reply.
+
+    Returns ``{"status": <HTTP status>, "success_count": <n>}`` where the
+    count is the length of the reply's ``successes`` list (0 when the reply
+    is not a JSON object). Raises ``RuntimeError`` when the reply carries a
+    non-empty ``errors`` field; network and JSON decoding errors propagate.
+    """
+    credentials = f"{public_key}:{secret_key}".encode()
+    token = base64.b64encode(credentials).decode()
+    headers = {
+        "Content-Type": "application/json",
+        "Authorization": f"Basic {token}",
+    }
+    request = urllib.request.Request(
+        endpoint,
+        data=json.dumps(payload).encode(),
+        headers=headers,
+        method="POST",
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        raw = response.read().decode()
+        status = response.status
+
+    detail = json.loads(raw) if raw else {}
+    is_object = isinstance(detail, dict)
+    errors = detail.get("errors") if is_object else None
+    if errors:
+        raise RuntimeError(f"Litefuse ingestion returned errors: {json_attr(errors)}")
+    success_count = len(detail.get("successes") or []) if is_object else 0
+    return {"status": status, "success_count": success_count}
+
+
+def fetch_trace(base_url, public_key, secret_key, trace_id):
+    """GET ``/api/public/traces/<trace_id>`` and return the decoded JSON body.
+
+    Uses HTTP Basic auth and a 30 second timeout; HTTP, network and JSON
+    errors propagate to the caller.
+    """
+    credentials = f"{public_key}:{secret_key}".encode()
+    token = base64.b64encode(credentials).decode()
+    url = f"{base_url.rstrip('/')}/api/public/traces/{trace_id}"
+    request = urllib.request.Request(
+        url, headers={"Authorization": f"Basic {token}"}, method="GET"
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        raw = response.read().decode()
+    return json.loads(raw)
+
+
+def fetch_observations_v2(base_url, public_key, secret_key, trace_id):
+    """GET the trace's observations from ``/api/public/v2/observations``.
+
+    The query covers start times from one hour ago to five minutes ahead of
+    the current UTC time, requests the core, basic, io, trace_context, model
+    and usage field groups, and asks for at most 100 rows. Returns the
+    decoded JSON body; HTTP, network and JSON errors propagate.
+    """
+    credentials = f"{public_key}:{secret_key}".encode()
+    token = base64.b64encode(credentials).decode()
+    now = datetime.now(timezone.utc)
+    window_start = (now - timedelta(hours=1)).isoformat().replace("+00:00", "Z")
+    window_end = (now + timedelta(minutes=5)).isoformat().replace("+00:00", "Z")
+    query = {
+        "traceId": trace_id,
+        "fromStartTime": window_start,
+        "toStartTime": window_end,
+        "fields": "core,basic,io,trace_context,model,usage",
+        "limit": "100",
+    }
+    params = urllib.parse.urlencode(query)
+    url = f"{base_url.rstrip('/')}/api/public/v2/observations?{params}"
+    request = urllib.request.Request(
+        url, headers={"Authorization": f"Basic {token}"}, method="GET"
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        raw = response.read().decode()
+    return json.loads(raw)
+
+
+def fetch_observations_legacy(base_url, public_key, secret_key, trace_id):
+    """GET page 1 (up to 100 rows) of ``/api/public/observations`` for a trace.
+
+    Returns the decoded JSON body; HTTP, network and JSON errors propagate.
+    """
+    credentials = f"{public_key}:{secret_key}".encode()
+    token = base64.b64encode(credentials).decode()
+    query = {"traceId": trace_id, "limit": "100", "page": "1"}
+    params = urllib.parse.urlencode(query)
+    url = f"{base_url.rstrip('/')}/api/public/observations?{params}"
+    request = urllib.request.Request(
+        url, headers={"Authorization": f"Basic {token}"}, method="GET"
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        raw = response.read().decode()
+    return json.loads(raw)
+
+
+def observation_rows_from_v2(payload):
+    """Return the ``data`` list of an observations response, or ``[]``.
+
+    Anything other than a dict whose ``data`` value is a list yields ``[]``.
+    """
+    if not isinstance(payload, dict):
+        return []
+    rows = payload.get("data")
+    return rows if isinstance(rows, list) else []
+
+
+def observation_io_object(observation, field):
+    """Return an observation's ``input``/``output`` field as a dict.
+
+    A dict is returned as is. A string is parsed as JSON and returned only
+    when it decodes to a dict. Every other case, including undecodable
+    strings, yields ``{}``.
+    """
+    value = observation.get(field)
+    if isinstance(value, str):
+        try:
+            value = json.loads(value)
+        except json.JSONDecodeError:
+            return {}
+    return value if isinstance(value, dict) else {}
+
+
+def context_event_types(events_value):
+    """List the type label of each context event read back from an input.
+
+    For a list, every dict entry contributes its ``item_type`` or, failing
+    that, its ``event_type``; non-dict entries are skipped. A dict holding
+    ``truncated_json`` yields ``["truncated_json"]``. Anything else yields
+    ``[]``.
+    """
+    if isinstance(events_value, dict):
+        return ["truncated_json"] if "truncated_json" in events_value else []
+    if not isinstance(events_value, list):
+        return []
+    labels = []
+    for entry in events_value:
+        if isinstance(entry, dict):
+            labels.append(entry.get("item_type") or entry.get("event_type"))
+    return labels
+
+
+def context_events_readback_ok(input_object):
+    """Check that an agent message's context events read back plausibly.
+
+    A list under ``events_since_previous_agent_message`` always passes.
+    Otherwise the value must be ``None``, ``""`` or ``{}`` and the window's
+    declared ``event_count`` must be 0 (a missing or unparsable count is
+    treated as 0).
+    """
+    window = input_object.get("context_window")
+    declared = 0
+    if isinstance(window, dict):
+        try:
+            declared = int(window.get("event_count") or 0)
+        except (TypeError, ValueError):
+            declared = 0
+    recorded = input_object.get("events_since_previous_agent_message")
+    if isinstance(recorded, list):
+        return True
+    if declared != 0:
+        return False
+    return recorded in (None, "", {})
+
+
+def verify_trace(args, public_key, secret_key, trace_id):
+    """Poll Litefuse until the ingested trace reads back with complete I/O.
+
+    Each attempt fetches the trace, then its observations: the legacy
+    endpoint first, the v2 endpoint if that raises, and finally the
+    observations embedded in the trace response. It then checks trace I/O,
+    the observation counts against ``args.min_observations`` and
+    ``args.min_step_observations``, that no observation lacks input or
+    output, and the structure of every ``codex.agent_message`` input.
+
+    The summary holds only booleans, counts, key names, event types and
+    observation names. Returns the summary of the first passing attempt.
+    Raises ``RuntimeError`` with the last summary when
+    ``args.verify_attempts`` attempts all fail.
+    """
+    # Diagnostic-only keys that are not part of the success result.
+    legacy_only_keys = (
+        "legacy_trace_input",
+        "legacy_trace_output",
+        "legacy_trace_error",
+    )
+    last_diagnostic = {}
+    for attempt in range(args.verify_attempts):
+        if attempt:
+            # Wait between attempts only; sleeping after the final failed
+            # attempt would just delay the error.
+            time.sleep(args.verify_sleep_seconds)
+
+        legacy_trace_error = ""
+        try:
+            legacy_detail = fetch_trace(args.base_url, public_key, secret_key, trace_id)
+        except Exception as exc:
+            legacy_detail = {}
+            legacy_trace_error = type(exc).__name__
+        if not isinstance(legacy_detail, dict):
+            legacy_detail = {}
+
+        try:
+            observations = observation_rows_from_v2(
+                fetch_observations_legacy(
+                    args.base_url, public_key, secret_key, trace_id
+                )
+            )
+            read_source = "legacy_observations"
+        except Exception as exc:
+            try:
+                observations = observation_rows_from_v2(
+                    fetch_observations_v2(
+                        args.base_url, public_key, secret_key, trace_id
+                    )
+                )
+                read_source = "v2_observations"
+            except Exception:
+                # Report the legacy endpoint's failure, not the v2 one.
+                observations = legacy_detail.get("observations") or []
+                read_source = f"legacy_trace_fallback:{type(exc).__name__}"
+        # Keep only JSON-object rows so the .get() checks below cannot crash.
+        if not isinstance(observations, list):
+            observations = []
+        observations = [row for row in observations if isinstance(row, dict)]
+
+        observations_missing_io = [
+            observation
+            for observation in observations
+            if not (observation.get("input") and observation.get("output"))
+        ]
+        step_observations = [
+            observation
+            for observation in observations
+            if observation.get("name") not in ("codex.review", "codex.turn")
+        ]
+        agent_message_observations = [
+            observation
+            for observation in observations
+            if observation.get("name") == "codex.agent_message"
+        ]
+        agent_message_input_objects = [
+            observation_io_object(observation, "input")
+            for observation in agent_message_observations
+        ]
+        agent_message_input_keys = sorted(
+            {
+                key
+                for input_object in agent_message_input_objects
+                for key in input_object.keys()
+            }
+        )
+        agent_message_all_have_context_window = all(
+            bool(input_object.get("context_window"))
+            for input_object in agent_message_input_objects
+        )
+        agent_message_all_have_context_events = all(
+            context_events_readback_ok(input_object)
+            for input_object in agent_message_input_objects
+        )
+        agent_message_with_previous_count = sum(
+            1
+            for input_object in agent_message_input_objects
+            if bool(input_object.get("previous_agent_message"))
+        )
+        agent_message_with_turn_input_count = sum(
+            1
+            for input_object in agent_message_input_objects
+            if bool(input_object.get("turn_input"))
+        )
+        agent_message_context_event_counts = [
+            (input_object.get("context_window") or {}).get("event_count", 0)
+            for input_object in agent_message_input_objects[:20]
+            if isinstance(input_object.get("context_window"), dict)
+        ]
+        agent_message_event_type_samples = [
+            context_event_types(
+                input_object.get("events_since_previous_agent_message")
+            )[:8]
+            for input_object in agent_message_input_objects[:5]
+        ]
+        # Exactly one message carries the turn input; all others carry the
+        # previous agent message.
+        agent_message_structure_ok = (
+            agent_message_all_have_context_window
+            and agent_message_all_have_context_events
+            and agent_message_with_turn_input_count == 1
+            and (
+                agent_message_with_previous_count
+                == max(len(agent_message_observations) - 1, 0)
+            )
+        )
+        root_observation = next(
+            (
+                observation
+                for observation in observations
+                if observation.get("name") == "codex.review"
+            ),
+            {},
+        )
+        # Fall back to the root span when the trace itself has no I/O.
+        trace_input = legacy_detail.get("input") or root_observation.get("input")
+        trace_output = legacy_detail.get("output") or root_observation.get("output")
+        last_diagnostic = {
+            "read_source": read_source,
+            "legacy_trace_input": bool(legacy_detail.get("input")),
+            "legacy_trace_output": bool(legacy_detail.get("output")),
+            "legacy_trace_error": legacy_trace_error,
+            "trace_input": bool(trace_input),
+            "trace_output": bool(trace_output),
+            "observation_count": len(observations),
+            "step_observation_count": len(step_observations),
+            "agent_message_count": len(agent_message_observations),
+            "agent_message_input_keys": agent_message_input_keys,
+            "agent_message_all_have_context_window": agent_message_all_have_context_window,
+            "agent_message_all_have_context_events": agent_message_all_have_context_events,
+            "agent_message_with_previous_count": agent_message_with_previous_count,
+            "agent_message_with_turn_input_count": agent_message_with_turn_input_count,
+            "agent_message_context_event_counts": agent_message_context_event_counts,
+            "agent_message_event_type_samples": agent_message_event_type_samples,
+            "observations_missing_io": [
+                observation.get("name") for observation in observations_missing_io[:20]
+            ],
+            "observation_names": [
+                observation.get("name") for observation in observations[:20]
+            ],
+        }
+        ok = all(
+            [
+                trace_input,
+                trace_output,
+                len(observations) >= args.min_observations,
+                len(step_observations) >= args.min_step_observations,
+                not observations_missing_io,
+                not agent_message_observations or agent_message_structure_ok,
+            ]
+        )
+        if ok:
+            # On success trace_input/trace_output are True and
+            # observations_missing_io is empty, so the diagnostic minus the
+            # legacy-only keys is the success result.
+            return {
+                key: value
+                for key, value in last_diagnostic.items()
+                if key not in legacy_only_keys
+            }
+    raise RuntimeError(
+        "Litefuse trace "
+        f"{trace_id} did not expose multi-step I/O in time; "
+        f"last_diagnostic={json.dumps(last_diagnostic, sort_keys=True)}"
+    )
+
+
+def parse_args():
+    """Parse the emitter's command-line options from ``sys.argv``.
+
+    ``--input-file``, ``--events-file`` and ``--session-id`` are required;
+    every other option has a default.
+    """
+    option_specs = [
+        ("--base-url", {"default": "https://litefuse.cloud"}),
+        ("--endpoint", {"default": None}),
+        ("--input-file", {"required": True}),
+        ("--events-file", {"required": True}),
+        ("--output-file", {"default": ""}),
+        ("--trace-name", {"default": "doris-ai-review"}),
+        ("--session-id", {"required": True}),
+        ("--repository", {"default": ""}),
+        ("--workflow", {"default": ""}),
+        ("--run-id", {"default": ""}),
+        ("--pr-number", {"default": ""}),
+        ("--head-sha", {"default": ""}),
+        ("--base-sha", {"default": ""}),
+        ("--model", {"default": "gpt-5.5"}),
+        ("--reasoning-effort", {"default": ""}),
+        ("--environment", {"default": "github-actions"}),
+        ("--max-input-chars", {"type": int, "default": 200_000}),
+        ("--max-output-chars", {"type": int, "default": 200_000}),
+        ("--max-json-chars", {"type": int, "default": 40_000}),
+        ("--verify", {"action": "store_true"}),
+        ("--dry-run", {"action": "store_true"}),
+        ("--verify-attempts", {"type": int, "default": 24}),
+        ("--verify-sleep-seconds", {"type": int, "default": 5}),
+        ("--min-observations", {"type": int, "default": 3}),
+        ("--min-step-observations", {"type": int, "default": 1}),
+    ]
+    parser = argparse.ArgumentParser()
+    for flag, options in option_specs:
+        parser.add_argument(flag, **options)
+    return parser.parse_args()
+
+
+def main():
+    """Emit one review run to Litefuse and print a JSON summary to stdout.
+
+    Reads the prompt, the optional final-output file and the JSONL events,
+    builds the ingestion batch, and then either prints a dry-run summary or
+    posts the batch using ``LANGFUSE_PUBLIC_KEY`` / ``LANGFUSE_SECRET_KEY``
+    from the environment. With ``--verify`` the trace is read back; on a
+    verification failure the summary is printed with the error and the
+    exception is re-raised.
+    """
+    args = parse_args()
+    endpoint = args.endpoint
+    if not endpoint:
+        endpoint = f"{args.base_url.rstrip('/')}/api/public/ingestion"
+
+    input_text = read_text(args.input_file, args.max_input_chars)
+    output_text = ""
+    if args.output_file:
+        output_text = read_text(
+            args.output_file, args.max_output_chars, tail=True, optional=True
+        )
+    events = load_jsonl(args.events_file)
+    trace_id, payload, observation_count = build_ingestion_payload(
+        args, input_text, output_text, events
+    )
+
+    result = {
+        "dry_run": args.dry_run,
+        "event_count": len(events),
+        "observation_count": observation_count,
+        "trace_id": trace_id,
+        "session_id": args.session_id,
+        "trace_name": args.trace_name,
+        "model": args.model,
+        "reasoning_effort": args.reasoning_effort,
+    }
+
+    def report():
+        print(json.dumps(result, sort_keys=True))
+
+    if args.dry_run:
+        batch = payload["batch"]
+        result["batch_count"] = len(batch)
+        result["event_types"] = [entry["type"] for entry in batch[:10]]
+        report()
+        return
+
+    public_key = os.environ["LANGFUSE_PUBLIC_KEY"]
+    secret_key = os.environ["LANGFUSE_SECRET_KEY"]
+    result["status"] = post_payload(endpoint, public_key, secret_key, payload)
+
+    if args.verify:
+        try:
+            result["verified"] = verify_trace(args, public_key, secret_key, trace_id)
+        except Exception as exc:
+            # Print what is known before the failure propagates.
+            result["verification_error"] = str(exc)
+            report()
+            raise
+    report()
+
+
+# Run the emitter only when executed as a script, not when imported.
+if __name__ == "__main__":
+    main()
diff --git a/.github/workflows/opencode-review-runner.yml b/.github/workflows/opencode-review-runner.yml
index 3d80e110fa9..2dd75445c9e 100644
--- a/.github/workflows/opencode-review-runner.yml
+++ b/.github/workflows/opencode-review-runner.yml
@@ -32,16 +32,16 @@ jobs:
with:
ref: ${{ inputs.head_sha }}
- - name: Install runner utilities
+ - name: Install ripgrep
run: |
sudo apt-get update
- sudo apt-get install -y ripgrep unzip
+ sudo apt-get install -y ripgrep
- - name: Install OpenCode
+ - name: Install Codex
run: |
for attempt in 1 2 3; do
- if curl -fsSL https://opencode.ai/install | bash; then
- echo "$HOME/.opencode/bin" >> $GITHUB_PATH
+ if npm install -g @openai/codex; then
+ codex --version
exit 0
fi
echo "Install attempt $attempt failed, retrying in 10s..."
@@ -58,18 +58,42 @@ jobs:
unzip -q "$tmp_dir/ossutil.zip" -d "$tmp_dir"
sudo install -m 0755 "$tmp_dir/ossutil-v1.7.19-linux-amd64/ossutil"
/usr/local/bin/ossutil
- - name: Configure OpenCode auth
- id: configure-auth
+ - name: Configure Codex auth
+ run: |
+ install -m 700 -d "$RUNNER_TEMP/codex-home"
+ printf 'CODEX_HOME=%s\n' "$RUNNER_TEMP/codex-home" >> "$GITHUB_ENV"
+
+ ossutil -i "$OSS_AK" -k "$OSS_SK" -e "$OSS_ENDPOINT" cp -f
"$OSS_CODEX_AUTH_OBJECT" "$RUNNER_TEMP/codex-home/auth.json"
+ chmod 600 "$RUNNER_TEMP/codex-home/auth.json"
+ test -s "$RUNNER_TEMP/codex-home/auth.json"
+ jq -e '
+ .auth_mode == "chatgpt"
+ and (.tokens.access_token | type == "string" and length > 0)
+ and (.tokens.refresh_token | type == "string" and length > 0)
+ ' "$RUNNER_TEMP/codex-home/auth.json" >/dev/null
+
+ cat > "$RUNNER_TEMP/codex-home/config.toml" <<EOF
+ cli_auth_credentials_store = "file"
+ approval_policy = "never"
+
+ [shell_environment_policy]
+ inherit = "all"
+
+ [sandbox_workspace_write]
+ network_access = true
+
+ [otel]
+ environment = "github-actions"
+ exporter = "none"
+ metrics_exporter = "none"
+ trace_exporter = "none"
+ EOF
+ chmod 600 "$RUNNER_TEMP/codex-home/config.toml"
env:
OSS_AK: ${{ secrets.OSS_AK }}
OSS_SK: ${{ secrets.OSS_SK }}
OSS_ENDPOINT: oss-cn-hongkong.aliyuncs.com
- OSS_AUTH_OBJECT: oss://doris-community-ci/auth.json
- run: |
- mkdir -p ~/.local/share/opencode
- ossutil -i "$OSS_AK" -k "$OSS_SK" -e "$OSS_ENDPOINT" cp -f
"$OSS_AUTH_OBJECT" ~/.local/share/opencode/auth.json
- chmod 600 ~/.local/share/opencode/auth.json
- test -s ~/.local/share/opencode/auth.json
+ OSS_CODEX_AUTH_OBJECT: oss://doris-community-ci/codex/auth.json
- name: Prepare review context directory
run: |
@@ -204,24 +228,38 @@ jobs:
continue-on-error: true
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ REASONING_EFFORT: xhigh
run: |
PROMPT=$(cat "$REVIEW_CONTEXT_DIR/review_prompt.txt")
set +e
- opencode run "$PROMPT" -m "openai/gpt-5.5" 2>&1 | tee
"$REVIEW_CONTEXT_DIR/opencode-review.log"
- status=${PIPESTATUS[0]}
+ codex exec "$PROMPT" \
+ --cd "$GITHUB_WORKSPACE" \
+ --model "gpt-5.5" \
+ --config "model_reasoning_effort=\"${REASONING_EFFORT}\"" \
+ --sandbox workspace-write \
+ --color never \
+ --json \
+ --output-last-message
"$REVIEW_CONTEXT_DIR/codex-final-message.txt" \
+ > "$REVIEW_CONTEXT_DIR/codex-events.jsonl" \
+ 2> >(tee "$REVIEW_CONTEXT_DIR/codex-stderr.log" >&2)
+ status=$?
set -e
- last_log_line=$(
- awk 'NF { line = $0 } END { print line }'
"$REVIEW_CONTEXT_DIR/opencode-review.log" \
- | perl -pe 's/\e\[[0-9;?]*[ -\/]*[@-~]//g'
- )
-
failure_reason=""
- if printf '%s\n' "$last_log_line" | rg -q -i '^Error:|SSE read timed
out'; then
- failure_reason="$last_log_line"
- elif [ "$status" -ne 0 ]; then
- failure_reason="OpenCode exited with status $status"
+ if [ "$status" -ne 0 ]; then
+ if [ -s "$REVIEW_CONTEXT_DIR/codex-events.jsonl" ]; then
+ failure_reason="$(jq -r 'select(.type == "turn.failed") |
.error.message // empty' "$REVIEW_CONTEXT_DIR/codex-events.jsonl" | tail -n 1)"
+ if [ -z "$failure_reason" ]; then
+ failure_reason="$(jq -r 'select(.type == "error") | .message // .error.message // empty' "$REVIEW_CONTEXT_DIR/codex-events.jsonl" | tail -n 1)"
+ fi
+ fi
+ if [ -z "$failure_reason" ] && [ -s "$REVIEW_CONTEXT_DIR/codex-stderr.log" ]; then
+ failure_reason="$(awk 'NF { line = $0 } END { print line }' "$REVIEW_CONTEXT_DIR/codex-stderr.log")"
+ fi
+ if [ -z "$failure_reason" ]; then
+ failure_reason="Codex exited with status $status"
+ fi
fi
if [ -n "$failure_reason" ]; then
@@ -233,23 +271,72 @@ jobs:
exit 1
fi
- - name: Persist OpenCode auth
- if: ${{ always() && steps.configure-auth.outcome == 'success' }}
+ - name: Record review I/O to Litefuse
+ if: ${{ always() }}
env:
- OSS_AK: ${{ secrets.OSS_AK }}
- OSS_SK: ${{ secrets.OSS_SK }}
- OSS_ENDPOINT: oss-cn-hongkong.aliyuncs.com
- OSS_AUTH_OBJECT: oss://doris-community-ci/auth.json
+ GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ LANGFUSE_PUBLIC_KEY: ${{ secrets.LANGFUSE_PK }}
+ LANGFUSE_SECRET_KEY: ${{ secrets.LANGFUSE_SK }}
+ REPO: ${{ github.repository }}
+ PR_NUMBER: ${{ inputs.pr_number }}
+ HEAD_SHA: ${{ inputs.head_sha }}
+ BASE_SHA: ${{ inputs.base_sha }}
+ HELPER_REF: ${{ github.workflow_sha || github.sha }}
+ REASONING_EFFORT: xhigh
run: |
- if [ ! -s ~/.local/share/opencode/auth.json ]; then
- echo "::warning::OpenCode auth file is missing or empty; skip OSS auth persistence."
+ if [ ! -s "$REVIEW_CONTEXT_DIR/review_prompt.txt" ] || [ ! -s "$REVIEW_CONTEXT_DIR/codex-events.jsonl" ]; then
+ echo "Review prompt or Codex JSONL event stream is missing; skipping Litefuse I/O recording."
exit 0
fi
- if ! ossutil -i "$OSS_AK" -k "$OSS_SK" -e "$OSS_ENDPOINT" cp -f ~/.local/share/opencode/auth.json "$OSS_AUTH_OBJECT"; then
- echo "::warning::Failed to persist OpenCode auth to OSS; continue because review already finished."
+ helper="$RUNNER_TEMP/emit_litefuse_otel_io.py"
+ gh api \
+ -H "Accept: application/vnd.github.raw" \
+ "repos/${REPO}/contents/.github/scripts/emit_litefuse_otel_io.py?ref=${HELPER_REF}" \
+ > "$helper"
+ chmod 700 "$helper"
+
+ python3 "$helper" \
+ --input-file "$REVIEW_CONTEXT_DIR/review_prompt.txt" \
+ --events-file "$REVIEW_CONTEXT_DIR/codex-events.jsonl" \
+ --output-file "$REVIEW_CONTEXT_DIR/codex-final-message.txt" \
+ --trace-name "doris-ai-review" \
+ --session-id "$GITHUB_RUN_ID" \
+ --repository "$REPO" \
+ --workflow "$GITHUB_WORKFLOW" \
+ --run-id "$GITHUB_RUN_ID" \
+ --pr-number "$PR_NUMBER" \
+ --head-sha "$HEAD_SHA" \
+ --base-sha "$BASE_SHA" \
+ --model "gpt-5.5" \
+ --reasoning-effort "$REASONING_EFFORT" \
+ --environment "github-actions" \
+ --min-observations 4 \
+ --min-step-observations 2 \
+ --verify-attempts 24 \
+ --verify-sleep-seconds 5 \
+ --verify
+
+ - name: Sync Codex auth back to OSS
+ if: ${{ always() }}
+ run: |
+ if [ ! -s "$CODEX_HOME/auth.json" ]; then
+ echo "No Codex auth file found; skipping OSS auth sync."
+ exit 0
fi
+ jq -e '
+ .auth_mode == "chatgpt"
+ and (.tokens.access_token | type == "string" and length > 0)
+ and (.tokens.refresh_token | type == "string" and length > 0)
+ ' "$CODEX_HOME/auth.json" >/dev/null
+ ossutil -i "$OSS_AK" -k "$OSS_SK" -e "$OSS_ENDPOINT" cp -f "$CODEX_HOME/auth.json" "$OSS_CODEX_AUTH_OBJECT"
+ env:
+ OSS_AK: ${{ secrets.OSS_AK }}
+ OSS_SK: ${{ secrets.OSS_SK }}
+ OSS_ENDPOINT: oss-cn-hongkong.aliyuncs.com
+ OSS_CODEX_AUTH_OBJECT: oss://doris-community-ci/codex/auth.json
+
- name: Comment PR on review failure
if: ${{ always() && steps.review.outcome != 'success' }}
env:
@@ -260,7 +347,7 @@ jobs:
run: |
error_msg="${REVIEW_FAILURE_REASON:-Review step was $REVIEW_OUTCOME (possibly timeout or cancelled)}"
gh pr comment "${{ inputs.pr_number }}" --body "$(cat <<EOF
- OpenCode automated review failed and did not complete.
+ Codex automated review failed and did not complete.
Error: ${error_msg}
Workflow run: ${RUN_URL}
@@ -276,5 +363,5 @@ jobs:
REVIEW_OUTCOME: ${{ steps.review.outcome }}
run: |
error_msg="${REVIEW_FAILURE_REASON:-Review step was $REVIEW_OUTCOME (possibly timeout or cancelled)}"
- echo "OpenCode automated review failed: ${error_msg}"
+ echo "Codex automated review failed: ${error_msg}"
exit 1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]