This is an automated email from the ASF dual-hosted git repository.

zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f48efb28db4 [fix](ci) Limit Litefuse ingestion payload size (#64328)
f48efb28db4 is described below

commit f48efb28db486837db5375e6fd0bcfc95ea67a05
Author: zclllyybb <[email protected]>
AuthorDate: Tue Jun 9 22:28:34 2026 +0800

    [fix](ci) Limit Litefuse ingestion payload size (#64328)
    
    ## Proposed changes
    
    The Codex review workflow can produce large JSONL traces. Sending every
    Litefuse observation in a single legacy ingestion request can exceed
    Langfuse Cloud's per-request payload limit and fail with HTTP 413.
    
    This patch:
    - splits Litefuse ingestion into size-bounded requests;
    - uses compact JSON encoding for ingestion payloads;
    - adds an explicit smaller cap for agent-message context event details;
    - shrinks any oversized single ingestion event before posting;
    - passes explicit 4MB payload and 4KB context caps from the workflow.
---
 .github/scripts/emit_litefuse_otel_io.py     | 159 ++++++++++++++++++++++++++-
 .github/workflows/opencode-review-runner.yml |   2 +
 2 files changed, 155 insertions(+), 6 deletions(-)

diff --git a/.github/scripts/emit_litefuse_otel_io.py b/.github/scripts/emit_litefuse_otel_io.py
index d8710342d58..4a93d5262d4 100644
--- a/.github/scripts/emit_litefuse_otel_io.py
+++ b/.github/scripts/emit_litefuse_otel_io.py
@@ -57,6 +57,12 @@ def json_attr(value):
     return json.dumps(value, ensure_ascii=False, sort_keys=True)
 
 
+def json_payload_bytes(value):
+    return len(
+        json.dumps(value, ensure_ascii=False, separators=(",", 
":")).encode("utf-8")
+    )
+
+
 def load_jsonl(path):
     events = []
     with open(path, "r", encoding="utf-8", errors="replace") as handle:
@@ -160,7 +166,9 @@ def event_context_payload(event, max_json_chars):
     return {key: value for key, value in payload.items() if value not in 
(None, "")}
 
 
-def build_agent_message_inputs(events, turn_input, max_json_chars):
+def build_agent_message_inputs(
+    events, turn_input, max_json_chars, max_context_json_chars
+):
     inputs = {}
     previous_agent_message = None
     context_events = []
@@ -184,7 +192,7 @@ def build_agent_message_inputs(events, turn_input, max_json_chars):
                     "event_count": len(context_events),
                 },
                 "events_since_previous_agent_message": [
-                    event_context_payload(context_event, max_json_chars)
+                    event_context_payload(context_event, 
max_context_json_chars)
                     for context_event in context_events
                 ],
             }
@@ -372,7 +380,7 @@ def build_ingestion_payload(args, input_text, output_text, events):
         if event.get("type") == "item.completed" and 
isinstance(event.get("item"), dict)
     ]
     agent_message_inputs = build_agent_message_inputs(
-        events, input_text, args.max_json_chars
+        events, input_text, args.max_json_chars, args.max_context_json_chars
     )
     child_count = len(completed_items) + 1
     root_end = now + (child_count + 2) * 1_000_000
@@ -479,11 +487,121 @@ def build_ingestion_payload(args, input_text, output_text, events):
     return trace_id, {"batch": batch}, len(batch) - 1
 
 
-def post_payload(endpoint, public_key, secret_key, payload):
+def compact_json_bytes(payload):
+    return json.dumps(payload, ensure_ascii=False, separators=(",", 
":")).encode("utf-8")
+
+
+def compact_context_event(event, max_chars):
+    compact = {}
+    for key in (
+        "event_type",
+        "line_number",
+        "item_id",
+        "item_type",
+        "status",
+        "server",
+        "tool",
+        "exit_code",
+    ):
+        if event.get(key) not in (None, ""):
+            compact[key] = event.get(key)
+    if event.get("command"):
+        compact["command"] = truncate_text(event.get("command") or "", 
max_chars)
+    for key in ("arguments", "result", "error", "payload", "item"):
+        if key in event:
+            compact[key] = truncate_json(event.get(key), max_chars)
+    for key in ("aggregated_output", "text", "prompt"):
+        if event.get(key):
+            compact[key] = truncate_text(event.get(key) or "", max_chars, 
tail=True)
+    return compact
+
+
+def shrink_event_for_payload(event, max_payload_bytes):
+    shrunk = json.loads(json.dumps(event, ensure_ascii=False))
+    body = shrunk.get("body") if isinstance(shrunk.get("body"), dict) else {}
+    event_name = body.get("name")
+
+    for max_chars in (2_000, 1_000, 500, 200, 80):
+        candidate = json.loads(json.dumps(shrunk, ensure_ascii=False))
+        candidate_body = candidate.get("body") if 
isinstance(candidate.get("body"), dict) else {}
+        input_object = candidate_body.get("input")
+        if isinstance(input_object, dict):
+            context_events = 
input_object.get("events_since_previous_agent_message")
+            if isinstance(context_events, list):
+                input_object["events_since_previous_agent_message"] = [
+                    compact_context_event(context_event, max_chars)
+                    for context_event in context_events
+                    if isinstance(context_event, dict)
+                ]
+            if isinstance(input_object.get("previous_agent_message"), dict):
+                previous_message = input_object["previous_agent_message"]
+                previous_message["text"] = truncate_text(
+                    previous_message.get("text") or "", max_chars
+                )
+            if input_object.get("turn_input"):
+                input_object["turn_input"] = truncate_text(
+                    input_object.get("turn_input") or "", max_chars
+                )
+            if not (
+                isinstance(context_events, list)
+                or "previous_agent_message" in input_object
+                or "turn_input" in input_object
+            ):
+                candidate_body["input"] = truncate_json(input_object, 
max_chars)
+        elif input_object not in (None, ""):
+            candidate_body["input"] = truncate_json(input_object, max_chars)
+
+        output_object = candidate_body.get("output")
+        if output_object not in (None, ""):
+            candidate_body["output"] = truncate_json(output_object, max_chars)
+        metadata = candidate_body.get("metadata")
+        if metadata not in (None, ""):
+            candidate_body["metadata"] = truncate_json(metadata, max_chars)
+
+        if json_payload_bytes({"batch": [candidate]}) <= max_payload_bytes:
+            return candidate
+
+    raise RuntimeError(
+        "Litefuse ingestion event is too large after truncation: "
+        f"{json_payload_bytes({'batch': [event]})} bytes > {max_payload_bytes} 
bytes; "
+        f"type={event.get('type')}, name={event_name}"
+    )
+
+
+def chunk_payload(payload, max_payload_bytes):
+    batch = payload.get("batch") or []
+    chunks = []
+    current = []
+    current_bytes = json_payload_bytes({"batch": current})
+
+    for event in batch:
+        event_payload = {"batch": [event]}
+        event_bytes = json_payload_bytes(event_payload)
+        if event_bytes > max_payload_bytes:
+            event = shrink_event_for_payload(event, max_payload_bytes)
+            event_payload = {"batch": [event]}
+            event_bytes = json_payload_bytes(event_payload)
+
+        candidate = {"batch": current + [event]}
+        candidate_bytes = json_payload_bytes(candidate)
+        if current and candidate_bytes > max_payload_bytes:
+            chunks.append(({"batch": current}, current_bytes))
+            current = [event]
+            current_bytes = event_bytes
+        else:
+            current.append(event)
+            current_bytes = candidate_bytes
+
+    if current:
+        chunks.append(({"batch": current}, current_bytes))
+    return chunks
+
+
+def post_payload_once(endpoint, public_key, secret_key, payload):
     auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
     request = urllib.request.Request(
         endpoint,
-        data=json.dumps(payload).encode(),
+        data=compact_json_bytes(payload),
         headers={
             "Content-Type": "application/json",
             "Authorization": f"Basic {auth}",
@@ -504,6 +622,25 @@ def post_payload(endpoint, public_key, secret_key, payload):
         }
 
 
+def post_payload(endpoint, public_key, secret_key, payload, max_payload_bytes):
+    statuses = []
+    success_count = 0
+    request_sizes = []
+    chunks = chunk_payload(payload, max_payload_bytes)
+    for chunk, request_size in chunks:
+        status = post_payload_once(endpoint, public_key, secret_key, chunk)
+        statuses.append(status["status"])
+        success_count += int(status.get("success_count") or 0)
+        request_sizes.append(request_size)
+    return {
+        "statuses": statuses,
+        "request_count": len(chunks),
+        "request_sizes": request_sizes,
+        "max_request_size": max(request_sizes) if request_sizes else 0,
+        "success_count": success_count,
+    }
+
+
 def fetch_trace(base_url, public_key, secret_key, trace_id):
     auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
     request = urllib.request.Request(
@@ -771,6 +908,8 @@ def parse_args():
     parser.add_argument("--max-input-chars", type=int, default=200_000)
     parser.add_argument("--max-output-chars", type=int, default=200_000)
     parser.add_argument("--max-json-chars", type=int, default=40_000)
+    parser.add_argument("--max-context-json-chars", type=int, default=4_000)
+    parser.add_argument("--max-payload-bytes", type=int, default=4_000_000)
     parser.add_argument("--verify", action="store_true")
     parser.add_argument("--dry-run", action="store_true")
     parser.add_argument("--verify-attempts", type=int, default=24)
@@ -808,12 +947,20 @@ def main():
     if args.dry_run:
         result["batch_count"] = len(payload["batch"])
         result["event_types"] = [event["type"] for event in 
payload["batch"][:10]]
+        chunks = chunk_payload(payload, args.max_payload_bytes)
+        result["request_count"] = len(chunks)
+        result["request_sizes"] = [request_size for _, request_size in chunks]
+        result["max_request_size"] = (
+            max(result["request_sizes"]) if result["request_sizes"] else 0
+        )
         print(json.dumps(result, sort_keys=True))
         return
 
     public_key = os.environ["LANGFUSE_PUBLIC_KEY"]
     secret_key = os.environ["LANGFUSE_SECRET_KEY"]
-    result["status"] = post_payload(endpoint, public_key, secret_key, payload)
+    result["status"] = post_payload(
+        endpoint, public_key, secret_key, payload, args.max_payload_bytes
+    )
 
     if args.verify:
         try:
diff --git a/.github/workflows/opencode-review-runner.yml b/.github/workflows/opencode-review-runner.yml
index d0310de1ba8..44d4db3d8e2 100644
--- a/.github/workflows/opencode-review-runner.yml
+++ b/.github/workflows/opencode-review-runner.yml
@@ -347,6 +347,8 @@ jobs:
             --model "gpt-5.5" \
             --reasoning-effort "$REASONING_EFFORT" \
             --environment "github-actions" \
+            --max-context-json-chars 4000 \
+            --max-payload-bytes 4000000 \
             --min-observations 4 \
             --min-step-observations 2 \
             --verify-attempts 24 \


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to