This is an automated email from the ASF dual-hosted git repository.

zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c333631534b [chore](ci) Record Codex review traces in Litefuse (#64319)
c333631534b is described below

commit c333631534b08b0e874d4bc863c0cade8bf40a80
Author: zclllyybb <[email protected]>
AuthorDate: Tue Jun 9 20:30:29 2026 +0800

    [chore](ci) Record Codex review traces in Litefuse (#64319)
    
    Run the AI review workflow through Codex, persist the review
    prompt/output even when review fails, and emit Codex JSONL steps to
    Litefuse using the public ingestion API.
    
    The runner now uses a Codex-native auth.json stored at
    oss://doris-community-ci/codex/auth.json. It downloads that file
    directly into CODEX_HOME and syncs the refreshed Codex auth.json back to
    the same object, without converting to or from the old opencode auth
    format.
    
    Codex execution uses workspace-write sandboxing with
    approval_policy=never instead of dangerous bypass mode. The workflow
    does not add any writable directory outside the checked-out repository,
    while keeping network access available for GitHub review operations.
    
    The Litefuse emitter creates a trace, root review span, turn generation,
    and per-step observations with non-empty input/output. Agent messages
    use the event window since the previous agent message as input,
    preserving command/tool/reasoning context as a structured list.
    
    Readback verification uses Litefuse public observation APIs to assert
    trace I/O, step observation counts, missing I/O, and agent-message
    context structure without logging prompt text or command output.
---
 .github/scripts/emit_litefuse_otel_io.py     | 829 +++++++++++++++++++++++++++
 .github/workflows/opencode-review-runner.yml | 159 +++--
 2 files changed, 952 insertions(+), 36 deletions(-)

diff --git a/.github/scripts/emit_litefuse_otel_io.py 
b/.github/scripts/emit_litefuse_otel_io.py
new file mode 100644
index 00000000000..d8710342d58
--- /dev/null
+++ b/.github/scripts/emit_litefuse_otel_io.py
@@ -0,0 +1,829 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import base64
+from datetime import datetime, timedelta, timezone
+import json
+import os
+import secrets
+import time
+import urllib.parse
+import urllib.request
+
+
+def read_text(path, max_chars, tail=False, optional=False):
+    try:
+        with open(path, "r", encoding="utf-8", errors="replace") as handle:
+            data = handle.read()
+    except FileNotFoundError:
+        if optional:
+            return ""
+        raise
+    return truncate_text(data, max_chars, tail)
+
+
+def truncate_text(data, max_chars, tail=False):
+    if max_chars <= 0 or len(data) <= max_chars:
+        return data
+    if tail:
+        return f"[truncated to last {max_chars} chars]\n" + data[-max_chars:]
+    return data[:max_chars] + f"\n[truncated to first {max_chars} chars]"
+
+
+def truncate_json(value, max_chars):
+    text = json.dumps(value, ensure_ascii=False, sort_keys=True)
+    if max_chars <= 0 or len(text) <= max_chars:
+        return value
+    return {"truncated_json": truncate_text(text, max_chars)}
+
+
+def json_attr(value):
+    return json.dumps(value, ensure_ascii=False, sort_keys=True)
+
+
+def load_jsonl(path):
+    events = []
+    with open(path, "r", encoding="utf-8", errors="replace") as handle:
+        for line_number, line in enumerate(handle, start=1):
+            stripped = line.strip()
+            if not stripped:
+                continue
+            try:
+                event = json.loads(stripped)
+            except json.JSONDecodeError as exc:
+                events.append(
+                    {
+                        "type": "error",
+                        "message": f"invalid jsonl line {line_number}: {exc}",
+                    }
+                )
+                continue
+            event["_line_number"] = line_number
+            events.append(event)
+    return events
+
+
+def find_final_message(events, output_text):
+    if output_text.strip():
+        return output_text
+    for event in reversed(events):
+        item = event.get("item") or {}
+        if event.get("type") == "item.completed" and item.get("type") == 
"agent_message":
+            text = item.get("text") or ""
+            if text.strip():
+                return text
+    return ""
+
+
+def latest_turn_result(events):
+    for event in reversed(events):
+        if event.get("type") == "turn.completed":
+            return "completed", event.get("usage") or {}
+        if event.get("type") == "turn.failed":
+            return "failed", event.get("error") or {}
+    return "unknown", {}
+
+
+def event_context_payload(event, max_json_chars):
+    item = event.get("item") if isinstance(event.get("item"), dict) else None
+    payload = {
+        "event_type": event.get("type"),
+        "line_number": event.get("_line_number"),
+    }
+    if not item:
+        event_payload = {
+            key: value for key, value in event.items() if key != "_line_number"
+        }
+        payload["payload"] = truncate_json(event_payload, max_json_chars)
+        return payload
+
+    item_type = item.get("type", "unknown")
+    payload.update(
+        {
+            "item_id": item.get("id", ""),
+            "item_type": item_type,
+            "status": item.get("status"),
+        }
+    )
+
+    if item_type == "command_execution":
+        payload.update(
+            {
+                "command": item.get("command") or "",
+                "exit_code": item.get("exit_code"),
+                "aggregated_output": truncate_text(
+                    item.get("aggregated_output") or "", max_json_chars, 
tail=True
+                ),
+            }
+        )
+    elif item_type == "mcp_tool_call":
+        payload.update(
+            {
+                "server": item.get("server"),
+                "tool": item.get("tool"),
+                "arguments": truncate_json(item.get("arguments"), 
max_json_chars),
+                "result": truncate_json(item.get("result"), max_json_chars),
+                "error": truncate_json(item.get("error"), max_json_chars),
+            }
+        )
+    elif item_type == "collab_tool_call":
+        payload.update(
+            {
+                "tool": item.get("tool"),
+                "prompt": truncate_text(item.get("prompt") or "", 
max_json_chars),
+                "sender_thread_id": item.get("sender_thread_id"),
+                "receiver_thread_ids": item.get("receiver_thread_ids") or [],
+                "agents_states": truncate_json(item.get("agents_states"), 
max_json_chars),
+            }
+        )
+    elif item_type == "agent_message":
+        payload["text"] = truncate_text(item.get("text") or "", max_json_chars)
+    else:
+        payload["item"] = truncate_json(item, max_json_chars)
+
+    return {key: value for key, value in payload.items() if value not in 
(None, "")}
+
+
+def build_agent_message_inputs(events, turn_input, max_json_chars):
+    inputs = {}
+    previous_agent_message = None
+    context_events = []
+    for event in events:
+        item = event.get("item") if isinstance(event.get("item"), dict) else {}
+        is_agent_message = (
+            event.get("type") == "item.completed"
+            and item.get("type") == "agent_message"
+        )
+        if is_agent_message:
+            current_id = item.get("id") or f"line:{event.get('_line_number', 
0)}"
+            input_payload = {
+                "item_type": "agent_message",
+                "context_window": {
+                    "from": (
+                        previous_agent_message.get("id")
+                        if previous_agent_message
+                        else "turn_start"
+                    ),
+                    "to": current_id,
+                    "event_count": len(context_events),
+                },
+                "events_since_previous_agent_message": [
+                    event_context_payload(context_event, max_json_chars)
+                    for context_event in context_events
+                ],
+            }
+            if previous_agent_message:
+                input_payload["previous_agent_message"] = {
+                    "id": previous_agent_message.get("id", ""),
+                    "text": truncate_text(
+                        previous_agent_message.get("text") or "", 
max_json_chars
+                    ),
+                }
+            else:
+                input_payload["turn_input"] = truncate_text(turn_input, 
max_json_chars)
+            inputs[event.get("_line_number")] = input_payload
+            previous_agent_message = item
+            context_events = []
+        else:
+            context_events.append(event)
+    return inputs
+
+
+def observation_shape(item, max_json_chars, agent_message_input=None):
+    item_type = item.get("type", "unknown")
+    status = item.get("status")
+
+    if item_type == "command_execution":
+        command = item.get("command") or ""
+        return {
+            "name": "codex.command",
+            "type": "span",
+            "input": {"command": command},
+            "output": {
+                "status": status,
+                "exit_code": item.get("exit_code"),
+                "aggregated_output": truncate_text(
+                    item.get("aggregated_output") or "", max_json_chars, 
tail=True
+                ),
+            },
+            "metadata": {"command": command[:240], "status": status, 
"step_kind": "tool"},
+        }
+
+    if item_type == "mcp_tool_call":
+        return {
+            "name": f"codex.mcp.{item.get('server', 
'unknown')}.{item.get('tool', 'unknown')}",
+            "type": "span",
+            "input": {
+                "server": item.get("server"),
+                "tool": item.get("tool"),
+                "arguments": truncate_json(item.get("arguments"), 
max_json_chars),
+            },
+            "output": {
+                "status": status,
+                "result": truncate_json(item.get("result"), max_json_chars),
+                "error": truncate_json(item.get("error"), max_json_chars),
+            },
+            "metadata": {
+                "server": item.get("server"),
+                "tool": item.get("tool"),
+                "status": status,
+                "step_kind": "tool",
+            },
+        }
+
+    if item_type == "collab_tool_call":
+        return {
+            "name": f"codex.collab.{item.get('tool', 'unknown')}",
+            "type": "span",
+            "input": {
+                "tool": item.get("tool"),
+                "prompt": truncate_text(item.get("prompt") or "", 
max_json_chars),
+                "sender_thread_id": item.get("sender_thread_id"),
+            },
+            "output": {
+                "status": status,
+                "receiver_thread_ids": item.get("receiver_thread_ids") or [],
+                "agents_states": truncate_json(item.get("agents_states"), 
max_json_chars),
+            },
+            "metadata": {"tool": item.get("tool"), "status": status, 
"step_kind": "tool"},
+        }
+
+    if item_type == "web_search":
+        return {
+            "name": "codex.web_search",
+            "type": "span",
+            "input": {"query": item.get("query"), "action": 
item.get("action")},
+            "output": {"status": "completed", "query": item.get("query")},
+            "metadata": {"query": (item.get("query") or "")[:240], 
"step_kind": "tool"},
+        }
+
+    if item_type == "file_change":
+        return {
+            "name": "codex.file_change",
+            "type": "span",
+            "input": {"changes": item.get("changes") or []},
+            "output": {"status": status, "changes": item.get("changes") or []},
+            "metadata": {"status": status},
+        }
+
+    if item_type == "todo_list":
+        return {
+            "name": "codex.todo_list",
+            "type": "span",
+            "input": {"item_type": item_type},
+            "output": {"items": item.get("items") or []},
+            "metadata": {"item_count": len(item.get("items") or [])},
+        }
+
+    if item_type == "reasoning":
+        return {
+            "name": "codex.reasoning",
+            "type": "span",
+            "input": {"item_type": item_type},
+            "output": {"text": truncate_text(item.get("text") or "", 
max_json_chars)},
+            "metadata": {},
+        }
+
+    if item_type == "agent_message":
+        return {
+            "name": "codex.agent_message",
+            "type": "generation",
+            "input": agent_message_input or {"item_type": item_type},
+            "output": {"text": truncate_text(item.get("text") or "", 
max_json_chars)},
+            "metadata": {},
+        }
+
+    if item_type == "error":
+        return {
+            "name": "codex.error",
+            "type": "span",
+            "input": {"item_type": item_type},
+            "output": {"message": item.get("message") or ""},
+            "metadata": {},
+        }
+
+    return {
+        "name": f"codex.{item_type}",
+        "type": "span",
+        "input": {"item_type": item_type},
+        "output": truncate_json(item, max_json_chars),
+        "metadata": {"item_type": item_type},
+    }
+
+
+def iso_from_ns(ns):
+    return (
+        datetime.fromtimestamp(ns / 1_000_000_000, timezone.utc)
+        .isoformat(timespec="milliseconds")
+        .replace("+00:00", "Z")
+    )
+
+
+def ingestion_event(event_type, timestamp, body):
+    return {
+        "id": secrets.token_hex(16),
+        "timestamp": timestamp,
+        "type": event_type,
+        "body": body,
+    }
+
+
+def build_ingestion_payload(args, input_text, output_text, events):
+    now = time.time_ns()
+    trace_id = secrets.token_hex(16)
+    root_observation_id = secrets.token_hex(16)
+    turn_status, turn_payload = latest_turn_result(events)
+    final_message = find_final_message(events, output_text)
+    trace_output = final_message or json_attr({"turn_status": turn_status})
+
+    trace_metadata = {
+        "repository": args.repository,
+        "workflow": args.workflow,
+        "run_id": args.run_id,
+        "pr_number": args.pr_number,
+        "head_sha": args.head_sha,
+        "base_sha": args.base_sha,
+        "model_reasoning_effort": args.reasoning_effort,
+        "codex_jsonl": True,
+    }
+    trace_metadata = {
+        key: value for key, value in trace_metadata.items() if value not in 
(None, "")
+    }
+
+    completed_items = [
+        event
+        for event in events
+        if event.get("type") == "item.completed" and 
isinstance(event.get("item"), dict)
+    ]
+    agent_message_inputs = build_agent_message_inputs(
+        events, input_text, args.max_json_chars
+    )
+    child_count = len(completed_items) + 1
+    root_end = now + (child_count + 2) * 1_000_000
+    timestamp = iso_from_ns(now)
+
+    batch = [
+        ingestion_event(
+            "trace-create",
+            timestamp,
+            {
+                "id": trace_id,
+                "timestamp": timestamp,
+                "name": args.trace_name,
+                "input": input_text,
+                "output": trace_output,
+                "sessionId": args.session_id,
+                "environment": args.environment,
+                "metadata": trace_metadata,
+                "tags": ["doris-ai-review", "codex-jsonl"],
+            },
+        ),
+        ingestion_event(
+            "span-create",
+            timestamp,
+            {
+                "id": root_observation_id,
+                "traceId": trace_id,
+                "name": "codex.review",
+                "startTime": iso_from_ns(now),
+                "endTime": iso_from_ns(root_end),
+                "input": {"prompt": input_text},
+                "output": {"final_message": trace_output, "turn_status": 
turn_status},
+                "environment": args.environment,
+                "metadata": {
+                    **trace_metadata,
+                    "codex_event_count": len(events),
+                    "codex_completed_item_count": len(completed_items),
+                },
+            },
+        ),
+    ]
+
+    turn_body = {
+        "id": secrets.token_hex(16),
+        "traceId": trace_id,
+        "parentObservationId": root_observation_id,
+        "name": "codex.turn",
+        "startTime": iso_from_ns(now + 1_000_000),
+        "endTime": iso_from_ns(now + 2_000_000),
+        "model": args.model,
+        "input": {"prompt": input_text},
+        "output": {"status": turn_status, "final_message": trace_output},
+        "environment": args.environment,
+        "metadata": {**trace_metadata, "item_type": "turn"},
+        "level": "ERROR" if turn_status == "failed" else "DEFAULT",
+    }
+    if turn_status == "completed":
+        turn_body["usageDetails"] = {
+            "input": int(turn_payload.get("input_tokens") or 0),
+            "output": int(turn_payload.get("output_tokens") or 0),
+            "cache_read_input_tokens": 
int(turn_payload.get("cached_input_tokens") or 0),
+            "reasoning_output_tokens": int(
+                turn_payload.get("reasoning_output_tokens") or 0
+            ),
+        }
+    else:
+        turn_body["statusMessage"] = json_attr(turn_payload)
+
+    batch.append(ingestion_event("generation-create", iso_from_ns(now + 
1_000_000), turn_body))
+
+    for offset, event in enumerate(completed_items, start=2):
+        item = event["item"]
+        shape = observation_shape(
+            item, args.max_json_chars, 
agent_message_inputs.get(event.get("_line_number"))
+        )
+        item_type = item.get("type", "unknown")
+        item_status = item.get("status", "completed")
+        body = {
+            "id": secrets.token_hex(16),
+            "traceId": trace_id,
+            "parentObservationId": root_observation_id,
+            "name": shape["name"],
+            "startTime": iso_from_ns(now + offset * 1_000_000),
+            "endTime": iso_from_ns(now + offset * 1_000_000 + 750_000),
+            "input": shape["input"],
+            "output": shape["output"],
+            "environment": args.environment,
+            "level": "ERROR" if item_status in ("failed", "declined") else 
"DEFAULT",
+            "metadata": {
+                **trace_metadata,
+                "item_id": item.get("id", ""),
+                "item_type": item_type,
+                "event_line": event.get("_line_number", 0),
+            },
+        }
+        for key, value in shape.get("metadata", {}).items():
+            if value not in (None, ""):
+                body["metadata"][key] = value
+        event_type = "generation-create" if shape["type"] == "generation" else 
"span-create"
+        if shape["type"] == "generation":
+            body["model"] = args.model
+        batch.append(ingestion_event(event_type, body["startTime"], body))
+
+    return trace_id, {"batch": batch}, len(batch) - 1
+
+
+def post_payload(endpoint, public_key, secret_key, payload):
+    auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
+    request = urllib.request.Request(
+        endpoint,
+        data=json.dumps(payload).encode(),
+        headers={
+            "Content-Type": "application/json",
+            "Authorization": f"Basic {auth}",
+        },
+        method="POST",
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        body = response.read().decode()
+        detail = json.loads(body) if body else {}
+        errors = detail.get("errors") if isinstance(detail, dict) else None
+        if errors:
+            raise RuntimeError(f"Litefuse ingestion returned errors: 
{json_attr(errors)}")
+        return {
+            "status": response.status,
+            "success_count": len(detail.get("successes") or [])
+            if isinstance(detail, dict)
+            else 0,
+        }
+
+
+def fetch_trace(base_url, public_key, secret_key, trace_id):
+    auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
+    request = urllib.request.Request(
+        f"{base_url.rstrip('/')}/api/public/traces/{trace_id}",
+        headers={"Authorization": f"Basic {auth}"},
+        method="GET",
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        return json.loads(response.read().decode())
+
+
+def fetch_observations_v2(base_url, public_key, secret_key, trace_id):
+    auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
+    now = datetime.now(timezone.utc)
+    params = urllib.parse.urlencode(
+        {
+            "traceId": trace_id,
+            "fromStartTime": (now - 
timedelta(hours=1)).isoformat().replace("+00:00", "Z"),
+            "toStartTime": (now + 
timedelta(minutes=5)).isoformat().replace("+00:00", "Z"),
+            "fields": "core,basic,io,trace_context,model,usage",
+            "limit": "100",
+        }
+    )
+    request = urllib.request.Request(
+        f"{base_url.rstrip('/')}/api/public/v2/observations?{params}",
+        headers={"Authorization": f"Basic {auth}"},
+        method="GET",
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        return json.loads(response.read().decode())
+
+
+def fetch_observations_legacy(base_url, public_key, secret_key, trace_id):
+    auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
+    params = urllib.parse.urlencode({"traceId": trace_id, "limit": "100", 
"page": "1"})
+    request = urllib.request.Request(
+        f"{base_url.rstrip('/')}/api/public/observations?{params}",
+        headers={"Authorization": f"Basic {auth}"},
+        method="GET",
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        return json.loads(response.read().decode())
+
+
+def observation_rows_from_v2(payload):
+    rows = payload.get("data") if isinstance(payload, dict) else None
+    if isinstance(rows, list):
+        return rows
+    return []
+
+
+def observation_io_object(observation, field):
+    value = observation.get(field)
+    if isinstance(value, dict):
+        return value
+    if isinstance(value, str):
+        try:
+            parsed = json.loads(value)
+        except json.JSONDecodeError:
+            return {}
+        if isinstance(parsed, dict):
+            return parsed
+    return {}
+
+
+def context_event_types(events_value):
+    if isinstance(events_value, list):
+        return [
+            event.get("item_type") or event.get("event_type")
+            for event in events_value
+            if isinstance(event, dict)
+        ]
+    if isinstance(events_value, dict) and "truncated_json" in events_value:
+        return ["truncated_json"]
+    return []
+
+
+def context_events_readback_ok(input_object):
+    context_window = input_object.get("context_window")
+    event_count = 0
+    if isinstance(context_window, dict):
+        try:
+            event_count = int(context_window.get("event_count") or 0)
+        except (TypeError, ValueError):
+            event_count = 0
+    events_value = input_object.get("events_since_previous_agent_message")
+    if isinstance(events_value, list):
+        return True
+    return event_count == 0 and events_value in (None, "", {})
+
+
+def verify_trace(args, public_key, secret_key, trace_id):
+    last_diagnostic = {}
+    for _ in range(args.verify_attempts):
+        legacy_trace_error = ""
+        try:
+            legacy_detail = fetch_trace(args.base_url, public_key, secret_key, 
trace_id)
+        except Exception as exc:
+            legacy_detail = {}
+            legacy_trace_error = type(exc).__name__
+        try:
+            observations_payload = fetch_observations_legacy(
+                args.base_url, public_key, secret_key, trace_id
+            )
+            observations = observation_rows_from_v2(observations_payload)
+            read_source = "legacy_observations"
+        except Exception as exc:
+            try:
+                observations_payload = fetch_observations_v2(
+                    args.base_url, public_key, secret_key, trace_id
+                )
+                observations = observation_rows_from_v2(observations_payload)
+                read_source = "v2_observations"
+            except Exception:
+                observations = legacy_detail.get("observations") or []
+                read_source = f"legacy_trace_fallback:{type(exc).__name__}"
+        observations_missing_io = [
+            observation
+            for observation in observations
+            if not (observation.get("input") and observation.get("output"))
+        ]
+        step_observations = [
+            observation
+            for observation in observations
+            if observation.get("name") not in ("codex.review", "codex.turn")
+        ]
+        agent_message_observations = [
+            observation
+            for observation in observations
+            if observation.get("name") == "codex.agent_message"
+        ]
+        agent_message_input_objects = [
+            observation_io_object(observation, "input")
+            for observation in agent_message_observations
+        ]
+        agent_message_input_keys = sorted(
+            {
+                key
+                for input_object in agent_message_input_objects
+                for key in input_object.keys()
+            }
+        )
+        agent_message_all_have_context_window = all(
+            bool(input_object.get("context_window"))
+            for input_object in agent_message_input_objects
+        )
+        agent_message_all_have_context_events = all(
+            context_events_readback_ok(input_object)
+            for input_object in agent_message_input_objects
+        )
+        agent_message_with_previous_count = sum(
+            1
+            for input_object in agent_message_input_objects
+            if bool(input_object.get("previous_agent_message"))
+        )
+        agent_message_with_turn_input_count = sum(
+            1
+            for input_object in agent_message_input_objects
+            if bool(input_object.get("turn_input"))
+        )
+        agent_message_context_event_counts = [
+            (input_object.get("context_window") or {}).get("event_count", 0)
+            for input_object in agent_message_input_objects[:20]
+            if isinstance(input_object.get("context_window"), dict)
+        ]
+        agent_message_event_type_samples = [
+            context_event_types(
+                input_object.get("events_since_previous_agent_message")
+            )[:8]
+            for input_object in agent_message_input_objects[:5]
+        ]
+        agent_message_structure_ok = (
+            agent_message_all_have_context_window
+            and agent_message_all_have_context_events
+            and agent_message_with_turn_input_count == 1
+            and (
+                agent_message_with_previous_count
+                == max(len(agent_message_observations) - 1, 0)
+            )
+        )
+        root_observation = next(
+            (observation for observation in observations if 
observation.get("name") == "codex.review"),
+            {},
+        )
+        trace_input = legacy_detail.get("input") or 
root_observation.get("input")
+        trace_output = legacy_detail.get("output") or 
root_observation.get("output")
+        last_diagnostic = {
+            "read_source": read_source,
+            "legacy_trace_input": bool(legacy_detail.get("input")),
+            "legacy_trace_output": bool(legacy_detail.get("output")),
+            "legacy_trace_error": legacy_trace_error,
+            "trace_input": bool(trace_input),
+            "trace_output": bool(trace_output),
+            "observation_count": len(observations),
+            "step_observation_count": len(step_observations),
+            "agent_message_count": len(agent_message_observations),
+            "agent_message_input_keys": agent_message_input_keys,
+            "agent_message_all_have_context_window": 
agent_message_all_have_context_window,
+            "agent_message_all_have_context_events": 
agent_message_all_have_context_events,
+            "agent_message_with_previous_count": 
agent_message_with_previous_count,
+            "agent_message_with_turn_input_count": 
agent_message_with_turn_input_count,
+            "agent_message_context_event_counts": 
agent_message_context_event_counts,
+            "agent_message_event_type_samples": 
agent_message_event_type_samples,
+            "observations_missing_io": [
+                observation.get("name") for observation in 
observations_missing_io[:20]
+            ],
+            "observation_names": [observation.get("name") for observation in 
observations[:20]],
+        }
+        ok = all(
+            [
+                trace_input,
+                trace_output,
+                len(observations) >= args.min_observations,
+                len(step_observations) >= args.min_step_observations,
+                not observations_missing_io,
+                not agent_message_observations or agent_message_structure_ok,
+            ]
+        )
+        if ok:
+            return {
+                "trace_input": True,
+                "trace_output": True,
+                "read_source": read_source,
+                "observation_count": len(observations),
+                "step_observation_count": len(step_observations),
+                "agent_message_count": len(agent_message_observations),
+                "agent_message_input_keys": agent_message_input_keys,
+                "agent_message_all_have_context_window": 
agent_message_all_have_context_window,
+                "agent_message_all_have_context_events": 
agent_message_all_have_context_events,
+                "agent_message_with_previous_count": 
agent_message_with_previous_count,
+                "agent_message_with_turn_input_count": 
agent_message_with_turn_input_count,
+                "agent_message_context_event_counts": 
agent_message_context_event_counts,
+                "agent_message_event_type_samples": 
agent_message_event_type_samples,
+                "observations_missing_io": [],
+                "observation_names": [
+                    observation.get("name") for observation in 
observations[:20]
+                ],
+            }
+        time.sleep(args.verify_sleep_seconds)
+    raise RuntimeError(
+        "Litefuse trace "
+        f"{trace_id} did not expose multi-step I/O in time; "
+        f"last_diagnostic={json.dumps(last_diagnostic, sort_keys=True)}"
+    )
+
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--base-url", default="https://litefuse.cloud";)
+    parser.add_argument("--endpoint", default=None)
+    parser.add_argument("--input-file", required=True)
+    parser.add_argument("--events-file", required=True)
+    parser.add_argument("--output-file", default="")
+    parser.add_argument("--trace-name", default="doris-ai-review")
+    parser.add_argument("--session-id", required=True)
+    parser.add_argument("--repository", default="")
+    parser.add_argument("--workflow", default="")
+    parser.add_argument("--run-id", default="")
+    parser.add_argument("--pr-number", default="")
+    parser.add_argument("--head-sha", default="")
+    parser.add_argument("--base-sha", default="")
+    parser.add_argument("--model", default="gpt-5.5")
+    parser.add_argument("--reasoning-effort", default="")
+    parser.add_argument("--environment", default="github-actions")
+    parser.add_argument("--max-input-chars", type=int, default=200_000)
+    parser.add_argument("--max-output-chars", type=int, default=200_000)
+    parser.add_argument("--max-json-chars", type=int, default=40_000)
+    parser.add_argument("--verify", action="store_true")
+    parser.add_argument("--dry-run", action="store_true")
+    parser.add_argument("--verify-attempts", type=int, default=24)
+    parser.add_argument("--verify-sleep-seconds", type=int, default=5)
+    parser.add_argument("--min-observations", type=int, default=3)
+    parser.add_argument("--min-step-observations", type=int, default=1)
+    return parser.parse_args()
+
+
+def main():
+    args = parse_args()
+    endpoint = args.endpoint or 
f"{args.base_url.rstrip('/')}/api/public/ingestion"
+
+    input_text = read_text(args.input_file, args.max_input_chars)
+    output_text = (
+        read_text(args.output_file, args.max_output_chars, tail=True, 
optional=True)
+        if args.output_file
+        else ""
+    )
+    events = load_jsonl(args.events_file)
+    trace_id, payload, observation_count = build_ingestion_payload(
+        args, input_text, output_text, events
+    )
+
+    result = {
+        "dry_run": args.dry_run,
+        "event_count": len(events),
+        "observation_count": observation_count,
+        "trace_id": trace_id,
+        "session_id": args.session_id,
+        "trace_name": args.trace_name,
+        "model": args.model,
+        "reasoning_effort": args.reasoning_effort,
+    }
+    if args.dry_run:
+        result["batch_count"] = len(payload["batch"])
+        result["event_types"] = [event["type"] for event in 
payload["batch"][:10]]
+        print(json.dumps(result, sort_keys=True))
+        return
+
+    public_key = os.environ["LANGFUSE_PUBLIC_KEY"]
+    secret_key = os.environ["LANGFUSE_SECRET_KEY"]
+    result["status"] = post_payload(endpoint, public_key, secret_key, payload)
+
+    if args.verify:
+        try:
+            result["verified"] = verify_trace(args, public_key, secret_key, 
trace_id)
+        except Exception as exc:
+            result["verification_error"] = str(exc)
+            print(json.dumps(result, sort_keys=True))
+            raise
+    print(json.dumps(result, sort_keys=True))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/.github/workflows/opencode-review-runner.yml 
b/.github/workflows/opencode-review-runner.yml
index 3d80e110fa9..2dd75445c9e 100644
--- a/.github/workflows/opencode-review-runner.yml
+++ b/.github/workflows/opencode-review-runner.yml
@@ -32,16 +32,16 @@ jobs:
         with:
           ref: ${{ inputs.head_sha }}
 
-      - name: Install runner utilities
+      - name: Install ripgrep
         run: |
           sudo apt-get update
-          sudo apt-get install -y ripgrep unzip
+          sudo apt-get install -y ripgrep
 
-      - name: Install OpenCode
+      - name: Install Codex
         run: |
           for attempt in 1 2 3; do
-            if curl -fsSL https://opencode.ai/install | bash; then
-              echo "$HOME/.opencode/bin" >> $GITHUB_PATH
+            if npm install -g @openai/codex; then
+              codex --version
               exit 0
             fi
             echo "Install attempt $attempt failed, retrying in 10s..."
@@ -58,18 +58,42 @@ jobs:
           unzip -q "$tmp_dir/ossutil.zip" -d "$tmp_dir"
           sudo install -m 0755 "$tmp_dir/ossutil-v1.7.19-linux-amd64/ossutil" 
/usr/local/bin/ossutil
 
-      - name: Configure OpenCode auth
-        id: configure-auth
+      - name: Configure Codex auth
+        run: |
+          install -m 700 -d "$RUNNER_TEMP/codex-home"
+          printf 'CODEX_HOME=%s\n' "$RUNNER_TEMP/codex-home" >> "$GITHUB_ENV"
+
+          ossutil -i "$OSS_AK" -k "$OSS_SK" -e "$OSS_ENDPOINT" cp -f 
"$OSS_CODEX_AUTH_OBJECT" "$RUNNER_TEMP/codex-home/auth.json"
+          chmod 600 "$RUNNER_TEMP/codex-home/auth.json"
+          test -s "$RUNNER_TEMP/codex-home/auth.json"
+          jq -e '
+            .auth_mode == "chatgpt"
+            and (.tokens.access_token | type == "string" and length > 0)
+            and (.tokens.refresh_token | type == "string" and length > 0)
+          ' "$RUNNER_TEMP/codex-home/auth.json" >/dev/null
+
+          cat > "$RUNNER_TEMP/codex-home/config.toml" <<EOF
+          cli_auth_credentials_store = "file"
+          approval_policy = "never"
+
+          [shell_environment_policy]
+          inherit = "all"
+
+          [sandbox_workspace_write]
+          network_access = true
+
+          [otel]
+          environment = "github-actions"
+          exporter = "none"
+          metrics_exporter = "none"
+          trace_exporter = "none"
+          EOF
+          chmod 600 "$RUNNER_TEMP/codex-home/config.toml"
         env:
           OSS_AK: ${{ secrets.OSS_AK }}
           OSS_SK: ${{ secrets.OSS_SK }}
           OSS_ENDPOINT: oss-cn-hongkong.aliyuncs.com
-          OSS_AUTH_OBJECT: oss://doris-community-ci/auth.json
-        run: |
-          mkdir -p ~/.local/share/opencode
-          ossutil -i "$OSS_AK" -k "$OSS_SK" -e "$OSS_ENDPOINT" cp -f 
"$OSS_AUTH_OBJECT" ~/.local/share/opencode/auth.json
-          chmod 600 ~/.local/share/opencode/auth.json
-          test -s ~/.local/share/opencode/auth.json
+          OSS_CODEX_AUTH_OBJECT: oss://doris-community-ci/codex/auth.json
 
       - name: Prepare review context directory
         run: |
@@ -204,24 +228,38 @@ jobs:
         continue-on-error: true
         env:
           GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          REASONING_EFFORT: xhigh
         run: |
           PROMPT=$(cat "$REVIEW_CONTEXT_DIR/review_prompt.txt")
 
           set +e
-          opencode run "$PROMPT" -m "openai/gpt-5.5" 2>&1 | tee 
"$REVIEW_CONTEXT_DIR/opencode-review.log"
-          status=${PIPESTATUS[0]}
+          codex exec "$PROMPT" \
+            --cd "$GITHUB_WORKSPACE" \
+            --model "gpt-5.5" \
+            --config "model_reasoning_effort=\"${REASONING_EFFORT}\"" \
+            --sandbox workspace-write \
+            --color never \
+            --json \
+            --output-last-message 
"$REVIEW_CONTEXT_DIR/codex-final-message.txt" \
+            > "$REVIEW_CONTEXT_DIR/codex-events.jsonl" \
+            2> >(tee "$REVIEW_CONTEXT_DIR/codex-stderr.log" >&2)
+          status=$?
           set -e
 
-          last_log_line=$(
-            awk 'NF { line = $0 } END { print line }' 
"$REVIEW_CONTEXT_DIR/opencode-review.log" \
-              | perl -pe 's/\e\[[0-9;?]*[ -\/]*[@-~]//g'
-          )
-
           failure_reason=""
-          if printf '%s\n' "$last_log_line" | rg -q -i '^Error:|SSE read timed 
out'; then
-            failure_reason="$last_log_line"
-          elif [ "$status" -ne 0 ]; then
-            failure_reason="OpenCode exited with status $status"
+          if [ "$status" -ne 0 ]; then
+            if [ -s "$REVIEW_CONTEXT_DIR/codex-events.jsonl" ]; then
+              failure_reason="$(jq -r 'select(.type == "turn.failed") | 
.error.message // empty' "$REVIEW_CONTEXT_DIR/codex-events.jsonl" | tail -n 1)"
+              if [ -z "$failure_reason" ]; then
+                failure_reason="$(jq -r 'select(.type == "error") | .message 
// .error.message // empty' "$REVIEW_CONTEXT_DIR/codex-events.jsonl" | tail -n 
1)"
+              fi
+            fi
+            if [ -z "$failure_reason" ] && [ -s 
"$REVIEW_CONTEXT_DIR/codex-stderr.log" ]; then
+              failure_reason="$(awk 'NF { line = $0 } END { print line }' 
"$REVIEW_CONTEXT_DIR/codex-stderr.log")"
+            fi
+            if [ -z "$failure_reason" ]; then
+              failure_reason="Codex exited with status $status"
+            fi
           fi
 
           if [ -n "$failure_reason" ]; then
@@ -233,23 +271,72 @@ jobs:
             exit 1
           fi
 
-      - name: Persist OpenCode auth
-        if: ${{ always() && steps.configure-auth.outcome == 'success' }}
+      - name: Record review I/O to Litefuse
+        if: ${{ always() }}
         env:
-          OSS_AK: ${{ secrets.OSS_AK }}
-          OSS_SK: ${{ secrets.OSS_SK }}
-          OSS_ENDPOINT: oss-cn-hongkong.aliyuncs.com
-          OSS_AUTH_OBJECT: oss://doris-community-ci/auth.json
+          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          LANGFUSE_PUBLIC_KEY: ${{ secrets.LANGFUSE_PK }}
+          LANGFUSE_SECRET_KEY: ${{ secrets.LANGFUSE_SK }}
+          REPO: ${{ github.repository }}
+          PR_NUMBER: ${{ inputs.pr_number }}
+          HEAD_SHA: ${{ inputs.head_sha }}
+          BASE_SHA: ${{ inputs.base_sha }}
+          HELPER_REF: ${{ github.workflow_sha || github.sha }}
+          REASONING_EFFORT: xhigh
         run: |
-          if [ ! -s ~/.local/share/opencode/auth.json ]; then
-            echo "::warning::OpenCode auth file is missing or empty; skip OSS 
auth persistence."
+          if [ ! -s "$REVIEW_CONTEXT_DIR/review_prompt.txt" ] || [ ! -s 
"$REVIEW_CONTEXT_DIR/codex-events.jsonl" ]; then
+            echo "Review prompt or Codex JSONL event stream is missing; 
skipping Litefuse I/O recording."
             exit 0
           fi
 
-          if ! ossutil -i "$OSS_AK" -k "$OSS_SK" -e "$OSS_ENDPOINT" cp -f 
~/.local/share/opencode/auth.json "$OSS_AUTH_OBJECT"; then
-            echo "::warning::Failed to persist OpenCode auth to OSS; continue 
because review already finished."
+          helper="$RUNNER_TEMP/emit_litefuse_otel_io.py"
+          gh api \
+            -H "Accept: application/vnd.github.raw" \
+            
"repos/${REPO}/contents/.github/scripts/emit_litefuse_otel_io.py?ref=${HELPER_REF}"
 \
+            > "$helper"
+          chmod 700 "$helper"
+
+          python3 "$helper" \
+            --input-file "$REVIEW_CONTEXT_DIR/review_prompt.txt" \
+            --events-file "$REVIEW_CONTEXT_DIR/codex-events.jsonl" \
+            --output-file "$REVIEW_CONTEXT_DIR/codex-final-message.txt" \
+            --trace-name "doris-ai-review" \
+            --session-id "$GITHUB_RUN_ID" \
+            --repository "$REPO" \
+            --workflow "$GITHUB_WORKFLOW" \
+            --run-id "$GITHUB_RUN_ID" \
+            --pr-number "$PR_NUMBER" \
+            --head-sha "$HEAD_SHA" \
+            --base-sha "$BASE_SHA" \
+            --model "gpt-5.5" \
+            --reasoning-effort "$REASONING_EFFORT" \
+            --environment "github-actions" \
+            --min-observations 4 \
+            --min-step-observations 2 \
+            --verify-attempts 24 \
+            --verify-sleep-seconds 5 \
+            --verify
+
+      - name: Sync Codex auth back to OSS
+        if: ${{ always() }}
+        run: |
+          if [ ! -s "$CODEX_HOME/auth.json" ]; then
+            echo "No Codex auth file found; skipping OSS auth sync."
+            exit 0
           fi
 
+          jq -e '
+            .auth_mode == "chatgpt"
+            and (.tokens.access_token | type == "string" and length > 0)
+            and (.tokens.refresh_token | type == "string" and length > 0)
+          ' "$CODEX_HOME/auth.json" >/dev/null
+          ossutil -i "$OSS_AK" -k "$OSS_SK" -e "$OSS_ENDPOINT" cp -f 
"$CODEX_HOME/auth.json" "$OSS_CODEX_AUTH_OBJECT"
+        env:
+          OSS_AK: ${{ secrets.OSS_AK }}
+          OSS_SK: ${{ secrets.OSS_SK }}
+          OSS_ENDPOINT: oss-cn-hongkong.aliyuncs.com
+          OSS_CODEX_AUTH_OBJECT: oss://doris-community-ci/codex/auth.json
+
       - name: Comment PR on review failure
         if: ${{ always() && steps.review.outcome != 'success' }}
         env:
@@ -260,7 +347,7 @@ jobs:
         run: |
           error_msg="${REVIEW_FAILURE_REASON:-Review step was $REVIEW_OUTCOME 
(possibly timeout or cancelled)}"
           gh pr comment "${{ inputs.pr_number }}" --body "$(cat <<EOF
-          OpenCode automated review failed and did not complete.
+          Codex automated review failed and did not complete.
 
           Error: ${error_msg}
           Workflow run: ${RUN_URL}
@@ -276,5 +363,5 @@ jobs:
           REVIEW_OUTCOME: ${{ steps.review.outcome }}
         run: |
           error_msg="${REVIEW_FAILURE_REASON:-Review step was $REVIEW_OUTCOME 
(possibly timeout or cancelled)}"
-          echo "OpenCode automated review failed: ${error_msg}"
+          echo "Codex automated review failed: ${error_msg}"
           exit 1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to