This is an automated email from the ASF dual-hosted git repository.
zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f48efb28db4 [fix](ci) Limit Litefuse ingestion payload size (#64328)
f48efb28db4 is described below
commit f48efb28db486837db5375e6fd0bcfc95ea67a05
Author: zclllyybb <[email protected]>
AuthorDate: Tue Jun 9 22:28:34 2026 +0800
[fix](ci) Limit Litefuse ingestion payload size (#64328)
## Proposed changes
The Codex review workflow can produce large JSONL traces. Sending every
Litefuse observation in a single legacy ingestion request can exceed
Langfuse Cloud's per-request payload limit and fail with HTTP 413.
This patch:
- splits Litefuse ingestion into size-bounded requests;
- uses compact JSON encoding for ingestion payloads;
- adds an explicit smaller cap for agent-message context event details;
- shrinks any oversized single ingestion event before posting;
- passes explicit caps from the workflow: 4,000,000 bytes per request payload and 4,000 characters per context-event detail.
---
.github/scripts/emit_litefuse_otel_io.py | 159 ++++++++++++++++++++++++++-
.github/workflows/opencode-review-runner.yml | 2 +
2 files changed, 155 insertions(+), 6 deletions(-)
diff --git a/.github/scripts/emit_litefuse_otel_io.py b/.github/scripts/emit_litefuse_otel_io.py
index d8710342d58..4a93d5262d4 100644
--- a/.github/scripts/emit_litefuse_otel_io.py
+++ b/.github/scripts/emit_litefuse_otel_io.py
@@ -57,6 +57,12 @@ def json_attr(value):
return json.dumps(value, ensure_ascii=False, sort_keys=True)
+def json_payload_bytes(value):
+ return len(
+ json.dumps(value, ensure_ascii=False, separators=(",",
":")).encode("utf-8")
+ )
+
+
def load_jsonl(path):
events = []
with open(path, "r", encoding="utf-8", errors="replace") as handle:
@@ -160,7 +166,9 @@ def event_context_payload(event, max_json_chars):
return {key: value for key, value in payload.items() if value not in
(None, "")}
-def build_agent_message_inputs(events, turn_input, max_json_chars):
+def build_agent_message_inputs(
+ events, turn_input, max_json_chars, max_context_json_chars
+):
inputs = {}
previous_agent_message = None
context_events = []
@@ -184,7 +192,7 @@ def build_agent_message_inputs(events, turn_input, max_json_chars):
"event_count": len(context_events),
},
"events_since_previous_agent_message": [
- event_context_payload(context_event, max_json_chars)
+ event_context_payload(context_event,
max_context_json_chars)
for context_event in context_events
],
}
@@ -372,7 +380,7 @@ def build_ingestion_payload(args, input_text, output_text, events):
if event.get("type") == "item.completed" and
isinstance(event.get("item"), dict)
]
agent_message_inputs = build_agent_message_inputs(
- events, input_text, args.max_json_chars
+ events, input_text, args.max_json_chars, args.max_context_json_chars
)
child_count = len(completed_items) + 1
root_end = now + (child_count + 2) * 1_000_000
@@ -479,11 +487,121 @@ def build_ingestion_payload(args, input_text, output_text, events):
return trace_id, {"batch": batch}, len(batch) - 1
-def post_payload(endpoint, public_key, secret_key, payload):
+def compact_json_bytes(payload):
+ return json.dumps(payload, ensure_ascii=False, separators=(",",
":")).encode("utf-8")
+
+
+def compact_context_event(event, max_chars):
+ compact = {}
+ for key in (
+ "event_type",
+ "line_number",
+ "item_id",
+ "item_type",
+ "status",
+ "server",
+ "tool",
+ "exit_code",
+ ):
+ if event.get(key) not in (None, ""):
+ compact[key] = event.get(key)
+ if event.get("command"):
+ compact["command"] = truncate_text(event.get("command") or "",
max_chars)
+ for key in ("arguments", "result", "error", "payload", "item"):
+ if key in event:
+ compact[key] = truncate_json(event.get(key), max_chars)
+ for key in ("aggregated_output", "text", "prompt"):
+ if event.get(key):
+ compact[key] = truncate_text(event.get(key) or "", max_chars,
tail=True)
+ return compact
+
+
+def shrink_event_for_payload(event, max_payload_bytes):
+ shrunk = json.loads(json.dumps(event, ensure_ascii=False))
+ body = shrunk.get("body") if isinstance(shrunk.get("body"), dict) else {}
+ event_name = body.get("name")
+
+ for max_chars in (2_000, 1_000, 500, 200, 80):
+ candidate = json.loads(json.dumps(shrunk, ensure_ascii=False))
+ candidate_body = candidate.get("body") if
isinstance(candidate.get("body"), dict) else {}
+ input_object = candidate_body.get("input")
+ if isinstance(input_object, dict):
+ context_events =
input_object.get("events_since_previous_agent_message")
+ if isinstance(context_events, list):
+ input_object["events_since_previous_agent_message"] = [
+ compact_context_event(context_event, max_chars)
+ for context_event in context_events
+ if isinstance(context_event, dict)
+ ]
+ if isinstance(input_object.get("previous_agent_message"), dict):
+ previous_message = input_object["previous_agent_message"]
+ previous_message["text"] = truncate_text(
+ previous_message.get("text") or "", max_chars
+ )
+ if input_object.get("turn_input"):
+ input_object["turn_input"] = truncate_text(
+ input_object.get("turn_input") or "", max_chars
+ )
+ if not (
+ isinstance(context_events, list)
+ or "previous_agent_message" in input_object
+ or "turn_input" in input_object
+ ):
+ candidate_body["input"] = truncate_json(input_object,
max_chars)
+ elif input_object not in (None, ""):
+ candidate_body["input"] = truncate_json(input_object, max_chars)
+
+ output_object = candidate_body.get("output")
+ if output_object not in (None, ""):
+ candidate_body["output"] = truncate_json(output_object, max_chars)
+ metadata = candidate_body.get("metadata")
+ if metadata not in (None, ""):
+ candidate_body["metadata"] = truncate_json(metadata, max_chars)
+
+ if json_payload_bytes({"batch": [candidate]}) <= max_payload_bytes:
+ return candidate
+
+ raise RuntimeError(
+ "Litefuse ingestion event is too large after truncation: "
+ f"{json_payload_bytes({'batch': [event]})} bytes > {max_payload_bytes}
bytes; "
+ f"type={event.get('type')}, name={event_name}"
+ )
+
+
+def chunk_payload(payload, max_payload_bytes):
+ batch = payload.get("batch") or []
+ chunks = []
+ current = []
+ current_bytes = json_payload_bytes({"batch": current})
+
+ for event in batch:
+ event_payload = {"batch": [event]}
+ event_bytes = json_payload_bytes(event_payload)
+ if event_bytes > max_payload_bytes:
+ event = shrink_event_for_payload(event, max_payload_bytes)
+ event_payload = {"batch": [event]}
+ event_bytes = json_payload_bytes(event_payload)
+
+ candidate = {"batch": current + [event]}
+ candidate_bytes = json_payload_bytes(candidate)
+ if current and candidate_bytes > max_payload_bytes:
+ chunks.append(({"batch": current}, current_bytes))
+ current = [event]
+ current_bytes = event_bytes
+ else:
+ current.append(event)
+ current_bytes = candidate_bytes
+
+ if current:
+ chunks.append(({"batch": current}, current_bytes))
+ return chunks
+
+
+def post_payload_once(endpoint, public_key, secret_key, payload):
auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
request = urllib.request.Request(
endpoint,
- data=json.dumps(payload).encode(),
+ data=compact_json_bytes(payload),
headers={
"Content-Type": "application/json",
"Authorization": f"Basic {auth}",
@@ -504,6 +622,25 @@ def post_payload(endpoint, public_key, secret_key, payload):
}
+def post_payload(endpoint, public_key, secret_key, payload, max_payload_bytes):
+ statuses = []
+ success_count = 0
+ request_sizes = []
+ chunks = chunk_payload(payload, max_payload_bytes)
+ for chunk, request_size in chunks:
+ status = post_payload_once(endpoint, public_key, secret_key, chunk)
+ statuses.append(status["status"])
+ success_count += int(status.get("success_count") or 0)
+ request_sizes.append(request_size)
+ return {
+ "statuses": statuses,
+ "request_count": len(chunks),
+ "request_sizes": request_sizes,
+ "max_request_size": max(request_sizes) if request_sizes else 0,
+ "success_count": success_count,
+ }
+
+
def fetch_trace(base_url, public_key, secret_key, trace_id):
auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
request = urllib.request.Request(
@@ -771,6 +908,8 @@ def parse_args():
parser.add_argument("--max-input-chars", type=int, default=200_000)
parser.add_argument("--max-output-chars", type=int, default=200_000)
parser.add_argument("--max-json-chars", type=int, default=40_000)
+ parser.add_argument("--max-context-json-chars", type=int, default=4_000)
+ parser.add_argument("--max-payload-bytes", type=int, default=4_000_000)
parser.add_argument("--verify", action="store_true")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--verify-attempts", type=int, default=24)
@@ -808,12 +947,20 @@ def main():
if args.dry_run:
result["batch_count"] = len(payload["batch"])
result["event_types"] = [event["type"] for event in
payload["batch"][:10]]
+ chunks = chunk_payload(payload, args.max_payload_bytes)
+ result["request_count"] = len(chunks)
+ result["request_sizes"] = [request_size for _, request_size in chunks]
+ result["max_request_size"] = (
+ max(result["request_sizes"]) if result["request_sizes"] else 0
+ )
print(json.dumps(result, sort_keys=True))
return
public_key = os.environ["LANGFUSE_PUBLIC_KEY"]
secret_key = os.environ["LANGFUSE_SECRET_KEY"]
- result["status"] = post_payload(endpoint, public_key, secret_key, payload)
+ result["status"] = post_payload(
+ endpoint, public_key, secret_key, payload, args.max_payload_bytes
+ )
if args.verify:
try:
diff --git a/.github/workflows/opencode-review-runner.yml b/.github/workflows/opencode-review-runner.yml
index d0310de1ba8..44d4db3d8e2 100644
--- a/.github/workflows/opencode-review-runner.yml
+++ b/.github/workflows/opencode-review-runner.yml
@@ -347,6 +347,8 @@ jobs:
--model "gpt-5.5" \
--reasoning-effort "$REASONING_EFFORT" \
--environment "github-actions" \
+ --max-context-json-chars 4000 \
+ --max-payload-bytes 4000000 \
--min-observations 4 \
--min-step-observations 2 \
--verify-attempts 24 \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]