gmunozfe commented on code in PR #4094: URL: https://github.com/apache/incubator-kie-kogito-runtimes/pull/4094#discussion_r2580850902
########## api/kogito-services/src/test/java/org/kie/kogito/services/context/ProcessInstanceContextTest.java: ########## @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.services.context; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.MDC; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for ProcessInstanceContext utility class. + * Tests thread safety, context management, and async operations. 
+ */ +class ProcessInstanceContextTest { + + private static final String TEST_PROCESS_ID = "test-process-123"; + private static final String TEST_PROCESS_ID_2 = "test-process-456"; + + @BeforeEach + void setUp() { + // Ensure clean state before each test + ProcessInstanceContext.clear(); + } + + @AfterEach + void tearDown() { + // Clean up after each test + ProcessInstanceContext.clear(); + ProcessInstanceContext.clearExtensions(); + } + + @Test + void testSetAndGetProcessInstanceId() { + // Initially no context (returns empty string) + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, ProcessInstanceContext.getProcessInstanceId()); + assertFalse(ProcessInstanceContext.hasContext()); + + // Set process instance ID + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + + // Verify MDC is set + assertEquals(TEST_PROCESS_ID, ProcessInstanceContext.getProcessInstanceId()); + assertEquals(TEST_PROCESS_ID, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + assertTrue(ProcessInstanceContext.hasContext()); + } + + @Test + void testSetNullProcessInstanceId() { + ProcessInstanceContext.setProcessInstanceId(null); + + // Should use general context (empty string) + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, ProcessInstanceContext.getProcessInstanceId()); + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + assertFalse(ProcessInstanceContext.hasContext()); + } + + @Test + void testClearContext() { + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + assertTrue(ProcessInstanceContext.hasContext()); + + ProcessInstanceContext.clear(); + + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, ProcessInstanceContext.getProcessInstanceId()); + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + assertFalse(ProcessInstanceContext.hasContext()); + } + + @Test + void testContextReplacement() { + 
ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + assertEquals(TEST_PROCESS_ID, ProcessInstanceContext.getProcessInstanceId()); + + // Set different ID - should replace + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID_2); + assertEquals(TEST_PROCESS_ID_2, ProcessInstanceContext.getProcessInstanceId()); + + // Clear should remove all context + ProcessInstanceContext.clear(); + assertFalse(ProcessInstanceContext.hasContext()); + } + + @Test + void testWithProcessInstanceContextSupplier() { + assertFalse(ProcessInstanceContext.hasContext()); + + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + String result; + try { + assertEquals(TEST_PROCESS_ID, ProcessInstanceContext.getProcessInstanceId()); + assertTrue(ProcessInstanceContext.hasContext()); + result = "test-result"; + } finally { + ProcessInstanceContext.clear(); + } + + assertEquals("test-result", result); + assertFalse(ProcessInstanceContext.hasContext()); + } + + @Test + void testWithProcessInstanceContextRunnable() { + assertFalse(ProcessInstanceContext.hasContext()); + AtomicReference<String> capturedId = new AtomicReference<>(); + + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + try { + capturedId.set(ProcessInstanceContext.getProcessInstanceId()); + } finally { + ProcessInstanceContext.clear(); + } + + assertEquals(TEST_PROCESS_ID, capturedId.get()); + assertFalse(ProcessInstanceContext.hasContext()); + } + + @Test + void testWithProcessInstanceContextExceptionHandling() { + assertFalse(ProcessInstanceContext.hasContext()); + + assertThrows(RuntimeException.class, () -> { + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + try { + assertTrue(ProcessInstanceContext.hasContext()); + throw new RuntimeException("Test exception"); + } finally { + ProcessInstanceContext.clear(); + } + }); + + // Context should be cleared even after exception + assertFalse(ProcessInstanceContext.hasContext()); + } + + @Test + void 
testAsyncContextPropagation() { + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + + // Copy context for async operation + Map<String, String> contextMap = ProcessInstanceContext.copyContextForAsync(); + assertNotNull(contextMap); + assertEquals(TEST_PROCESS_ID, contextMap.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + + // Clear original context + ProcessInstanceContext.clear(); + assertFalse(ProcessInstanceContext.hasContext()); + + // Restore context from async + ProcessInstanceContext.setContextFromAsync(contextMap); + assertEquals(TEST_PROCESS_ID, ProcessInstanceContext.getProcessInstanceId()); + assertTrue(ProcessInstanceContext.hasContext()); + } + + @Test + void testAsyncContextWithNullMap() { + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + assertTrue(ProcessInstanceContext.hasContext()); + + // Setting null context map should clear context + ProcessInstanceContext.setContextFromAsync(null); + assertFalse(ProcessInstanceContext.hasContext()); + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, ProcessInstanceContext.getProcessInstanceId()); + } + + @Test + void testThreadIsolation() throws InterruptedException { + ExecutorService executor = Executors.newFixedThreadPool(3); + CountDownLatch latch = new CountDownLatch(3); + AtomicReference<Exception> exception = new AtomicReference<>(); + + try { + // Thread 1: Set and verify process ID 1 + executor.submit(() -> { + try { + ProcessInstanceContext.setProcessInstanceId("process-1"); + Thread.sleep(100); // Allow other threads to run + assertEquals("process-1", ProcessInstanceContext.getProcessInstanceId()); + } catch (Exception e) { + exception.set(e); + } finally { + ProcessInstanceContext.clear(); + latch.countDown(); + } + }); + + // Thread 2: Set and verify process ID 2 + executor.submit(() -> { + try { + ProcessInstanceContext.setProcessInstanceId("process-2"); + Thread.sleep(100); // Allow other threads to run + assertEquals("process-2", 
ProcessInstanceContext.getProcessInstanceId()); + } catch (Exception e) { + exception.set(e); + } finally { + ProcessInstanceContext.clear(); + latch.countDown(); + } + }); + + // Thread 3: No context set + executor.submit(() -> { + try { + Thread.sleep(100); // Allow other threads to run + // In a new thread without context, should return empty string + assertFalse(ProcessInstanceContext.hasContext()); + } catch (Exception e) { + exception.set(e); + } finally { + latch.countDown(); + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(exception.get()); + + } finally { + executor.shutdown(); + } + } + + @Test + void testAsyncOperationWithContextPropagation() throws Exception { + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + Map<String, String> contextMap = ProcessInstanceContext.copyContextForAsync(); + + CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { + // Restore context in async thread + ProcessInstanceContext.setContextFromAsync(contextMap); + try { + return ProcessInstanceContext.getProcessInstanceId(); + } finally { + ProcessInstanceContext.clear(); + } + }); + + String result = future.get(5, TimeUnit.SECONDS); + assertEquals(TEST_PROCESS_ID, result); + + // Original thread context should still be intact + assertEquals(TEST_PROCESS_ID, ProcessInstanceContext.getProcessInstanceId()); + } + + @Test + void testMdcIntegration() { + // Verify MDC is properly managed + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + + assertEquals(TEST_PROCESS_ID, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + + ProcessInstanceContext.clear(); + + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + } + + @Test + void testGeneralContextBehavior() { + ProcessInstanceContext.setProcessInstanceId(ProcessInstanceContext.GENERAL_CONTEXT); + + assertFalse(ProcessInstanceContext.hasContext()); + 
assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, ProcessInstanceContext.getProcessInstanceId()); + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + } + + @Test + void testHasContextAccuracy() { + // Initially no context + assertFalse(ProcessInstanceContext.hasContext()); + + // Set general context explicitly - should not count as having context + ProcessInstanceContext.setProcessInstanceId(ProcessInstanceContext.GENERAL_CONTEXT); + assertFalse(ProcessInstanceContext.hasContext()); + + // Set specific context + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + assertTrue(ProcessInstanceContext.hasContext()); + + // Clear should remove context + ProcessInstanceContext.clear(); + assertFalse(ProcessInstanceContext.hasContext()); + } + + @Test + void testMdcConsistencyAfterOperations() { + // Test that MDC always stays consistent after various operations + + // Set process ID + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + assertEquals(TEST_PROCESS_ID, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + + // Clear should reset to general context + ProcessInstanceContext.clear(); + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + + // Setting null should also result in general context + ProcessInstanceContext.setProcessInstanceId(null); + assertEquals(ProcessInstanceContext.GENERAL_CONTEXT, MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY)); + } + + @Test + void testOptimizationSkipsSameValue() { + // Test that setting the same value twice doesn't update MDC unnecessarily + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + String mdcValue1 = MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY); + + // Set same value again + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + String mdcValue2 = MDC.get(ProcessInstanceContext.MDC_PROCESS_INSTANCE_KEY); + + 
assertEquals(TEST_PROCESS_ID, mdcValue1); + assertEquals(TEST_PROCESS_ID, mdcValue2); + assertEquals(mdcValue1, mdcValue2); + } + + @Test + void testConcurrentAccess() throws InterruptedException { + int threadCount = 10; + int iterationsPerThread = 1000; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch latch = new CountDownLatch(threadCount); + AtomicReference<Exception> exception = new AtomicReference<>(); + + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < iterationsPerThread; i++) { + String processId = "thread-" + threadId + "-process-" + i; + ProcessInstanceContext.setProcessInstanceId(processId); + assertEquals(processId, ProcessInstanceContext.getProcessInstanceId()); + assertTrue(ProcessInstanceContext.hasContext()); + ProcessInstanceContext.clear(); + } + } catch (Exception e) { + exception.set(e); + } finally { + latch.countDown(); + } + }); + } + + assertTrue(latch.await(30, TimeUnit.SECONDS)); + assertNull(exception.get()); + + executor.shutdown(); + } + + @Test + void testExtensionRegistrationAndRestoration() { + // Create a mock extension + MockContextExtension extension = new MockContextExtension("test."); + + // Register the extension + ProcessInstanceContext.registerExtension("test.", extension); + + // Set up context with both core and extension keys + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + MDC.put("test.key1", "value1"); + MDC.put("test.key2", "value2"); + MDC.put("other.key", "should-not-be-restored"); + + // Copy context for async + Map<String, String> contextMap = ProcessInstanceContext.copyContextForAsync(); + + // Clear MDC + ProcessInstanceContext.clear(); + MDC.clear(); + + // Restore context from async + ProcessInstanceContext.setContextFromAsync(contextMap); + + // Verify core context is restored + assertEquals(TEST_PROCESS_ID, ProcessInstanceContext.getProcessInstanceId()); + + // Verify extension 
was called to restore its keys + assertTrue(extension.restoreCalled); + assertEquals(2, extension.restoredKeys.size()); + assertEquals("value1", extension.restoredKeys.get("test.key1")); + assertEquals("value2", extension.restoredKeys.get("test.key2")); + assertNull(extension.restoredKeys.get("other.key")); + } + + @Test + void testExtensionRegistrationValidation() { + MockContextExtension extension = new MockContextExtension("test."); + + // Test null prefix + assertThrows(IllegalArgumentException.class, () -> { + ProcessInstanceContext.registerExtension(null, extension); + }); + + // Test empty prefix + assertThrows(IllegalArgumentException.class, () -> { + ProcessInstanceContext.registerExtension("", extension); + }); + + // Test null extension + assertThrows(IllegalArgumentException.class, () -> { + ProcessInstanceContext.registerExtension("test.", null); + }); + } + + @Test + void testExtensionExceptionHandling() { + // Create an extension that throws exceptions + ContextExtension faultyExtension = new ContextExtension() { + @Override + public String getMdcKeyPrefix() { + return "faulty."; + } + + @Override + public void restoreKeys(Map<String, String> preservedKeys) { + throw new RuntimeException("Extension failure"); + } + }; + + ProcessInstanceContext.registerExtension("faulty.", faultyExtension); + + // Set up context + ProcessInstanceContext.setProcessInstanceId(TEST_PROCESS_ID); + MDC.put("faulty.key", "value"); + + Map<String, String> contextMap = ProcessInstanceContext.copyContextForAsync(); + ProcessInstanceContext.clear(); + + // This should not throw, even though the extension fails + assertDoesNotThrow(() -> { + ProcessInstanceContext.setContextFromAsync(contextMap); + }); + + // Core functionality should still work + assertEquals(TEST_PROCESS_ID, ProcessInstanceContext.getProcessInstanceId()); + } + + /** + * Mock implementation of ContextExtension for testing. 
+ */ + private static class MockContextExtension implements ContextExtension { + private final String prefix; + boolean restoreCalled = false; + Map<String, String> restoredKeys = new HashMap<>(); + + MockContextExtension(String prefix) { + this.prefix = prefix; + } + + @Override + public String getMdcKeyPrefix() { + return prefix; + } + + @Override + public void restoreKeys(Map<String, String> preservedKeys) { + restoreCalled = true; + restoredKeys.clear(); + restoredKeys.putAll(preservedKeys); + + // Actually restore the keys to MDC for testing + preservedKeys.forEach(MDC::put); + } + } + Review Comment: As mentioned before, missing test for calling `setContextFromAsync` while MDC already has content and check that it's deleted. ########## kogito-test-utils/src/main/java/org/kie/kogito/test/utils/JsonProcessInstanceLogAnalyzer.java: ########## @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.kie.kogito.test.utils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +/** + * Utility class for analyzing process instance aware logging in JSON format. + * Supports parsing JSON log format with MDC fields including processInstanceId. + * This class replaces pipe-delimited format parsing for machine-consumable JSON logs. + */ +public class JsonProcessInstanceLogAnalyzer { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + // Common timestamp patterns in JSON logs + private static final DateTimeFormatter[] TIMESTAMP_FORMATTERS = { + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX"), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS"), + DateTimeFormatter.ISO_LOCAL_DATE_TIME + }; + + /** + * Represents a single JSON log entry with all its components. 
+ */ + public static class JsonLogEntry { + public final LocalDateTime timestamp; + public final String level; + public final String loggerName; + public final String message; + public final Map<String, String> mdc; + public final String threadName; + public final String sequenceNumber; + public final JsonNode rawJson; + + public JsonLogEntry(LocalDateTime timestamp, String level, String loggerName, + String message, Map<String, String> mdc, String threadName, + String sequenceNumber, JsonNode rawJson) { + this.timestamp = timestamp; + this.level = level != null ? level : "INFO"; + this.loggerName = loggerName != null ? loggerName : "unknown.logger"; + this.message = message != null ? message : ""; + this.mdc = mdc != null ? new HashMap<>(mdc) : new HashMap<>(); + this.threadName = threadName; + this.sequenceNumber = sequenceNumber; + this.rawJson = rawJson; + } + + /** + * Get the process instance ID from MDC. + */ + public String getProcessInstanceId() { + return mdc.get("processInstanceId"); + } + + /** + * Check if this log entry has a process instance ID. + */ + public boolean hasProcessInstance() { + String processInstanceId = getProcessInstanceId(); + return processInstanceId != null && !processInstanceId.trim().isEmpty(); + } + + /** + * Check if this log entry is general context (no process instance ID). + */ + public boolean isGeneralContext() { + return !hasProcessInstance(); + } + + /** + * Get trace ID from MDC if available. + */ + public String getTraceId() { + return mdc.get("traceId"); + } + + /** + * Get span ID from MDC if available. + */ + public String getSpanId() { + return mdc.get("spanId"); + } + + @Override + public String toString() { + return String.format("JsonLogEntry{timestamp=%s, level=%s, processInstanceId=%s, logger=%s, message=%s}", + timestamp, level, getProcessInstanceId(), loggerName, message); + } + } + + /** + * Statistics about JSON log entries for analysis. 
+ */ + public static class JsonLogStatistics { + public final long totalLogs; + public final long processSpecificLogs; + public final long generalContextLogs; + public final Map<String, Long> logsByProcessInstance; + public final Map<String, Long> logsByLevel; + public final Map<String, Long> logsByLogger; + public final long logsWithTracing; + + public JsonLogStatistics(List<JsonLogEntry> entries) { + this.totalLogs = entries.size(); + this.processSpecificLogs = entries.stream().filter(JsonLogEntry::hasProcessInstance).count(); + this.generalContextLogs = entries.stream().filter(JsonLogEntry::isGeneralContext).count(); + this.logsByProcessInstance = entries.stream() + .collect(Collectors.groupingBy( + entry -> entry.hasProcessInstance() ? entry.getProcessInstanceId() : "", + Collectors.counting())); + this.logsByLevel = entries.stream() + .collect(Collectors.groupingBy(entry -> entry.level, Collectors.counting())); + this.logsByLogger = entries.stream() + .collect(Collectors.groupingBy(entry -> entry.loggerName, Collectors.counting())); + this.logsWithTracing = entries.stream() + .filter(entry -> entry.getTraceId() != null) + .count(); + } + + @Override + public String toString() { + return String.format( + "JsonLogStatistics{total=%d, processSpecific=%d, general=%d, byProcess=%s, byLevel=%s, withTracing=%d}", + totalLogs, processSpecificLogs, generalContextLogs, logsByProcessInstance, logsByLevel, logsWithTracing); + } + } + + /** + * Parse JSON log file with multiline support and resilient error handling. 
+ */ + public static List<JsonLogEntry> parseJsonLogFile(Path logFile) throws IOException { + List<String> lines = Files.readAllLines(logFile); + List<JsonLogEntry> entries = new ArrayList<>(); + AtomicInteger malformedLineCount = new AtomicInteger(0); + AtomicInteger lineNumber = new AtomicInteger(0); + + for (String line : lines) { + lineNumber.incrementAndGet(); + + if (line.trim().isEmpty()) { + continue; // Skip empty lines + } + + try { + JsonLogEntry entry = parseJsonLogLine(line, malformedLineCount, lineNumber.get()); + if (entry != null) { + entries.add(entry); + } + } catch (Exception e) { + malformedLineCount.incrementAndGet(); + System.err.printf("Warning: Failed to parse JSON log line %d: %s - Error: %s%n", + lineNumber.get(), line.substring(0, Math.min(100, line.length())), e.getMessage()); + } + } + + // Log statistics about parsing + if (malformedLineCount.get() > 0) { + System.err.printf("Warning: Encountered %d malformed/problematic lines out of %d total lines while parsing %s%n", + malformedLineCount.get(), lineNumber.get(), logFile.getFileName()); + } + + return entries; + } + + /** + * Parse a single JSON log line into a JsonLogEntry. 
+ */ + private static JsonLogEntry parseJsonLogLine(String line, AtomicInteger malformedLineCount, int lineNumber) { + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(line); + + // Extract timestamp + LocalDateTime timestamp = parseTimestamp(jsonNode, malformedLineCount); + + // Extract standard fields + String level = getTextValue(jsonNode, "level"); + String loggerName = getTextValue(jsonNode, "loggerName"); + String message = getTextValue(jsonNode, "message"); + String threadName = getTextValue(jsonNode, "thread"); + String sequenceNumber = getTextValue(jsonNode, "sequenceNumber"); + + // Extract MDC fields + Map<String, String> mdc = extractMdcFields(jsonNode); + + return new JsonLogEntry(timestamp, level, loggerName, message, mdc, threadName, sequenceNumber, jsonNode); + + } catch (JsonProcessingException e) { + // Try fallback parsing for non-JSON lines + return tryFallbackParsing(line, malformedLineCount, lineNumber); + } + } + + /** + * Extract MDC fields from JSON log entry. 
+ */ + private static Map<String, String> extractMdcFields(JsonNode jsonNode) { + Map<String, String> mdc = new HashMap<>(); + + // Look for MDC in common field names + JsonNode mdcNode = jsonNode.get("mdc"); + if (mdcNode == null) { + mdcNode = jsonNode.get("MDC"); + } + if (mdcNode == null) { + mdcNode = jsonNode.get("context"); + } + + if (mdcNode != null && mdcNode.isObject()) { + mdcNode.fields().forEachRemaining(entry -> { + String key = entry.getKey(); + JsonNode value = entry.getValue(); + if (value.isTextual()) { + mdc.put(key, value.asText()); + } else if (!value.isNull()) { + mdc.put(key, value.toString()); + } + }); + } + + // Also check for direct MDC fields at root level (some formats) + String[] commonMdcFields = { "processInstanceId", "traceId", "spanId", "userId", "correlationId" }; + for (String field : commonMdcFields) { + JsonNode fieldNode = jsonNode.get(field); + if (fieldNode != null && fieldNode.isTextual()) { + mdc.put(field, fieldNode.asText()); + } + } + + return mdc; + } + + /** + * Parse timestamp from JSON node using multiple format attempts. + */ + private static LocalDateTime parseTimestamp(JsonNode jsonNode, AtomicInteger malformedLineCount) { + String timestampStr = getTextValue(jsonNode, "timestamp"); + if (timestampStr == null) { + timestampStr = getTextValue(jsonNode, "@timestamp"); + } + if (timestampStr == null) { + timestampStr = getTextValue(jsonNode, "time"); + } + + if (timestampStr != null) { + for (DateTimeFormatter formatter : TIMESTAMP_FORMATTERS) { + try { + return LocalDateTime.parse(timestampStr, formatter); + } catch (DateTimeParseException e) { + // Try next formatter + } + } + } + + // Fallback to current time + malformedLineCount.incrementAndGet(); + return LocalDateTime.now(); + } + + /** + * Get text value from JSON node, handling null checks. 
+ */ + private static String getTextValue(JsonNode node, String fieldName) { + JsonNode fieldNode = node.get(fieldName); + return fieldNode != null && !fieldNode.isNull() ? fieldNode.asText() : null; + } + + /** + * Try fallback parsing for non-JSON lines (stack traces, etc.). + */ + private static JsonLogEntry tryFallbackParsing(String line, AtomicInteger malformedLineCount, int lineNumber) { + // This could be a stack trace or multiline continuation + // For now, create a simple entry + malformedLineCount.incrementAndGet(); + + Map<String, String> emptyMdc = new HashMap<>(); + return new JsonLogEntry( + LocalDateTime.now(), + "INFO", + "unknown.logger", + line, + emptyMdc, Review Comment: you're using an empty MDC to indicate that this is not a proper JSON but a Fallback entry, no? ########## quarkus/addons/token-exchange/runtime/src/main/java/org/kie/kogito/addons/quarkus/token/exchange/cache/TokenPolicyManager.java: ########## @@ -55,25 +56,37 @@ public long expireAfterRead(String key, CachedTokens value, long currentTime, lo } /** - * Calculate time to expiration based on token's actual expiration time minus proactive refresh buffer + * Calculate time to expiration based on token's actual expiration time minus proactive refresh buffer. + * This method sets the process instance context from the cache key for proper logging. 
*/ private static long calculateTimeToExpiration(String cacheKey, CachedTokens tokens) { - String authName = CacheUtils.extractAuthNameFromCacheKey(cacheKey); - long proactiveRefreshSeconds = Math.max(0, ConfigReaderUtils.getProactiveRefreshSeconds(authName)); + // Extract process instance ID from cache key and set context for logging + String processInstanceId = CacheUtils.extractProcessInstanceIdFromCacheKey(cacheKey); + if (processInstanceId != null && !processInstanceId.isEmpty()) { + ProcessInstanceContext.setProcessInstanceId(processInstanceId); + } - long currentTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); - long tokenExpirationSeconds = tokens.expirationTime(); + try { + String authName = CacheUtils.extractAuthNameFromCacheKey(cacheKey); + long proactiveRefreshSeconds = Math.max(0, ConfigReaderUtils.getProactiveRefreshSeconds(authName)); - // Schedule refresh proactiveRefreshSeconds before actual expiration - long refreshTimeSeconds = tokenExpirationSeconds - proactiveRefreshSeconds; - long timeToRefreshSeconds = Math.max(0, refreshTimeSeconds - currentTimeSeconds); + long currentTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + long tokenExpirationSeconds = tokens.expirationTime(); - // Convert to nanoseconds for Caffeine - long timeToRefreshNanos = TimeUnit.SECONDS.toNanos(timeToRefreshSeconds); + // Schedule refresh proactiveRefreshSeconds before actual expiration + long refreshTimeSeconds = tokenExpirationSeconds - proactiveRefreshSeconds; + long timeToRefreshSeconds = Math.max(0, refreshTimeSeconds - currentTimeSeconds); - LOGGER.info("Token for key '{}' will be refreshed in {} seconds (expires at {}, refresh buffer {} seconds)", - cacheKey, timeToRefreshSeconds, tokenExpirationSeconds, proactiveRefreshSeconds); + // Convert to nanoseconds for Caffeine + long timeToRefreshNanos = TimeUnit.SECONDS.toNanos(timeToRefreshSeconds); - return timeToRefreshNanos; + LOGGER.info("Token for key '{}' will be 
refreshed in {} seconds (expires at {}, refresh buffer {} seconds)", + cacheKey, timeToRefreshSeconds, tokenExpirationSeconds, proactiveRefreshSeconds); Review Comment: You could also log currentTimeSeconds, but up to you ########## api/kogito-services/src/main/java/org/kie/kogito/services/context/ProcessInstanceContext.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.services.context; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Utility class for managing process instance context in logging operations. + * This class uses SLF4J's Mapped Diagnostic Context (MDC) to ensure process instance IDs + * are automatically included in log messages. + * + * When no process instance is available, an empty string is used as the default value + * to provide cleaner formatting and easier searching in log aggregation systems. + * + * Thread Safety: This class is thread-safe and properly manages context isolation + * between different threads using MDC's inherent ThreadLocal-based storage. 
+ */ +public final class ProcessInstanceContext { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceContext.class); + + private static final Map<String, ContextExtension> EXTENSIONS = new ConcurrentHashMap<>(); + + /** + * MDC key used to store the process instance ID. + * This key should be referenced in logging configurations. + */ + public static final String MDC_PROCESS_INSTANCE_KEY = "processInstanceId"; + + /** + * Default value used when no process instance ID is available. + * Empty string for cleaner log formatting and easier searching. + */ + public static final String GENERAL_CONTEXT = ""; + + // Private constructor to prevent instantiation + private ProcessInstanceContext() { + // Utility class + } + + /** + * Sets the process instance ID for the current thread context. + * This method updates SLF4J MDC and only performs the update if the value changes. + * + * @param processInstanceId the process instance ID to set, or null to use general context + */ + public static void setProcessInstanceId(String processInstanceId) { + String effectiveId = processInstanceId != null ? processInstanceId : GENERAL_CONTEXT; + + // Only update MDC if the value is changing (optimization) + String currentId = MDC.get(MDC_PROCESS_INSTANCE_KEY); + if (!effectiveId.equals(currentId)) { + MDC.put(MDC_PROCESS_INSTANCE_KEY, effectiveId); + } + } + + /** + * Gets the current process instance ID from MDC. + * + * @return the current process instance ID, or empty string if no context is set + */ + public static String getProcessInstanceId() { + String id = MDC.get(MDC_PROCESS_INSTANCE_KEY); + return id != null ? id : GENERAL_CONTEXT; + } + + /** + * Clears the process instance context for the current thread. + * This resets the MDC to the general context (empty string). 
+ */ + public static void clear() { + MDC.put(MDC_PROCESS_INSTANCE_KEY, GENERAL_CONTEXT); Review Comment: Why reset to empty string instead of removing the key from the context: ```suggestion MDC.remove(MDC_PROCESS_INSTANCE_KEY); ``` I think it's a better pattern to reset MDC. ########## kogito-test-utils/src/main/java/org/kie/kogito/test/utils/JsonProcessInstanceLogAnalyzer.java: ########## @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.kie.kogito.test.utils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +/** + * Utility class for analyzing process instance aware logging in JSON format. + * Supports parsing JSON log format with MDC fields including processInstanceId. + * This class replaces pipe-delimited format parsing for machine-consumable JSON logs. + */ +public class JsonProcessInstanceLogAnalyzer { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + // Common timestamp patterns in JSON logs + private static final DateTimeFormatter[] TIMESTAMP_FORMATTERS = { + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX"), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS"), + DateTimeFormatter.ISO_LOCAL_DATE_TIME + }; + + /** + * Represents a single JSON log entry with all its components. 
+ */ + public static class JsonLogEntry { + public final LocalDateTime timestamp; + public final String level; + public final String loggerName; + public final String message; + public final Map<String, String> mdc; + public final String threadName; + public final String sequenceNumber; + public final JsonNode rawJson; + + public JsonLogEntry(LocalDateTime timestamp, String level, String loggerName, + String message, Map<String, String> mdc, String threadName, + String sequenceNumber, JsonNode rawJson) { + this.timestamp = timestamp; + this.level = level != null ? level : "INFO"; + this.loggerName = loggerName != null ? loggerName : "unknown.logger"; + this.message = message != null ? message : ""; + this.mdc = mdc != null ? new HashMap<>(mdc) : new HashMap<>(); + this.threadName = threadName; + this.sequenceNumber = sequenceNumber; + this.rawJson = rawJson; + } + + /** + * Get the process instance ID from MDC. + */ + public String getProcessInstanceId() { + return mdc.get("processInstanceId"); + } + + /** + * Check if this log entry has a process instance ID. + */ + public boolean hasProcessInstance() { + String processInstanceId = getProcessInstanceId(); + return processInstanceId != null && !processInstanceId.trim().isEmpty(); + } + + /** + * Check if this log entry is general context (no process instance ID). + */ + public boolean isGeneralContext() { + return !hasProcessInstance(); + } + + /** + * Get trace ID from MDC if available. + */ + public String getTraceId() { + return mdc.get("traceId"); + } + + /** + * Get span ID from MDC if available. + */ + public String getSpanId() { + return mdc.get("spanId"); + } + + @Override + public String toString() { + return String.format("JsonLogEntry{timestamp=%s, level=%s, processInstanceId=%s, logger=%s, message=%s}", + timestamp, level, getProcessInstanceId(), loggerName, message); + } + } + + /** + * Statistics about JSON log entries for analysis. 
+ */ + public static class JsonLogStatistics { + public final long totalLogs; + public final long processSpecificLogs; + public final long generalContextLogs; + public final Map<String, Long> logsByProcessInstance; + public final Map<String, Long> logsByLevel; + public final Map<String, Long> logsByLogger; + public final long logsWithTracing; + + public JsonLogStatistics(List<JsonLogEntry> entries) { + this.totalLogs = entries.size(); + this.processSpecificLogs = entries.stream().filter(JsonLogEntry::hasProcessInstance).count(); + this.generalContextLogs = entries.stream().filter(JsonLogEntry::isGeneralContext).count(); + this.logsByProcessInstance = entries.stream() + .collect(Collectors.groupingBy( + entry -> entry.hasProcessInstance() ? entry.getProcessInstanceId() : "", + Collectors.counting())); + this.logsByLevel = entries.stream() + .collect(Collectors.groupingBy(entry -> entry.level, Collectors.counting())); + this.logsByLogger = entries.stream() + .collect(Collectors.groupingBy(entry -> entry.loggerName, Collectors.counting())); + this.logsWithTracing = entries.stream() + .filter(entry -> entry.getTraceId() != null) + .count(); + } + + @Override + public String toString() { + return String.format( + "JsonLogStatistics{total=%d, processSpecific=%d, general=%d, byProcess=%s, byLevel=%s, withTracing=%d}", + totalLogs, processSpecificLogs, generalContextLogs, logsByProcessInstance, logsByLevel, logsWithTracing); + } + } + + /** + * Parse JSON log file with multiline support and resilient error handling. + */ + public static List<JsonLogEntry> parseJsonLogFile(Path logFile) throws IOException { + List<String> lines = Files.readAllLines(logFile); Review Comment: Reading the entire file in memory works fine for small files. 
For performance enhancement, probably it's better to read line by line (as it's done below in a stream, instead of loading in memory the whole log file here), wdyt?: ``` try (Stream<String> lines = Files.lines(logFile)) { lines.forEach(line -> { ``` ########## api/kogito-services/src/main/java/org/kie/kogito/services/context/ProcessInstanceContext.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.services.context; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Utility class for managing process instance context in logging operations. + * This class uses SLF4J's Mapped Diagnostic Context (MDC) to ensure process instance IDs + * are automatically included in log messages. + * + * When no process instance is available, an empty string is used as the default value + * to provide cleaner formatting and easier searching in log aggregation systems. + * + * Thread Safety: This class is thread-safe and properly manages context isolation + * between different threads using MDC's inherent ThreadLocal-based storage. 
+ */ +public final class ProcessInstanceContext { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceContext.class); + + private static final Map<String, ContextExtension> EXTENSIONS = new ConcurrentHashMap<>(); + + /** + * MDC key used to store the process instance ID. + * This key should be referenced in logging configurations. + */ + public static final String MDC_PROCESS_INSTANCE_KEY = "processInstanceId"; + + /** + * Default value used when no process instance ID is available. + * Empty string for cleaner log formatting and easier searching. + */ + public static final String GENERAL_CONTEXT = ""; + + // Private constructor to prevent instantiation + private ProcessInstanceContext() { + // Utility class + } + + /** + * Sets the process instance ID for the current thread context. + * This method updates SLF4J MDC and only performs the update if the value changes. + * + * @param processInstanceId the process instance ID to set, or null to use general context + */ + public static void setProcessInstanceId(String processInstanceId) { + String effectiveId = processInstanceId != null ? processInstanceId : GENERAL_CONTEXT; + + // Only update MDC if the value is changing (optimization) + String currentId = MDC.get(MDC_PROCESS_INSTANCE_KEY); + if (!effectiveId.equals(currentId)) { + MDC.put(MDC_PROCESS_INSTANCE_KEY, effectiveId); + } + } + + /** + * Gets the current process instance ID from MDC. + * + * @return the current process instance ID, or empty string if no context is set + */ + public static String getProcessInstanceId() { + String id = MDC.get(MDC_PROCESS_INSTANCE_KEY); + return id != null ? id : GENERAL_CONTEXT; + } + + /** + * Clears the process instance context for the current thread. + * This resets the MDC to the general context (empty string). + */ + public static void clear() { + MDC.put(MDC_PROCESS_INSTANCE_KEY, GENERAL_CONTEXT); + } + + /** + * Checks if a process instance context is currently set. 
+ * + * @return true if a process instance context is set, false otherwise + */ + public static boolean hasContext() { + String id = MDC.get(MDC_PROCESS_INSTANCE_KEY); + return id != null && !GENERAL_CONTEXT.equals(id); + } + + /** + * Gets a copy of the current MDC context map for propagation to other threads. + * This is useful for async operations that need to maintain the same logging context. + * + * @return a copy of the current MDC context map, or null if no context is set + */ + public static Map<String, String> copyContextForAsync() { + return MDC.getCopyOfContextMap(); + } + + /** + * Sets the MDC context map from a previously copied context. + * This is useful for async operations that need to restore logging context. + * + * @param contextMap the context map to restore, or null to reset to general context + */ + public static void setContextFromAsync(Map<String, String> contextMap) { + if (contextMap != null) { + if (EXTENSIONS.isEmpty()) { + MDC.setContextMap(contextMap); + } else { + // Restore core context first + Map<String, String> coreContext = filterCoreKeys(contextMap); + MDC.setContextMap(coreContext); + + // Then restore extension-specific keys + restoreExtensionKeys(contextMap); + } + } else { + MDC.clear(); + MDC.put(MDC_PROCESS_INSTANCE_KEY, GENERAL_CONTEXT); + } + } + + /** + * Registers a context extension that will participate in context preservation. + * + * @param prefix the MDC key prefix this extension manages + * @param extension the extension to register + * @throws IllegalArgumentException if prefix or extension is null/empty + */ + public static void registerExtension(String prefix, ContextExtension extension) { Review Comment: To avoid accidental collisions, you could also enforce the prefix final char (dot for example). In this case you're forcing users to define "user." as prefix and this will never collide with "username", wdyt? 
########## api/kogito-services/src/main/java/org/kie/kogito/services/context/ProcessInstanceContext.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.services.context; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Utility class for managing process instance context in logging operations. + * This class uses SLF4J's Mapped Diagnostic Context (MDC) to ensure process instance IDs + * are automatically included in log messages. + * + * When no process instance is available, an empty string is used as the default value + * to provide cleaner formatting and easier searching in log aggregation systems. + * + * Thread Safety: This class is thread-safe and properly manages context isolation + * between different threads using MDC's inherent ThreadLocal-based storage. 
+ */ +public final class ProcessInstanceContext { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceContext.class); + + private static final Map<String, ContextExtension> EXTENSIONS = new ConcurrentHashMap<>(); + + /** + * MDC key used to store the process instance ID. + * This key should be referenced in logging configurations. + */ + public static final String MDC_PROCESS_INSTANCE_KEY = "processInstanceId"; + + /** + * Default value used when no process instance ID is available. + * Empty string for cleaner log formatting and easier searching. + */ + public static final String GENERAL_CONTEXT = ""; + + // Private constructor to prevent instantiation + private ProcessInstanceContext() { + // Utility class + } + + /** + * Sets the process instance ID for the current thread context. + * This method updates SLF4J MDC and only performs the update if the value changes. + * + * @param processInstanceId the process instance ID to set, or null to use general context + */ + public static void setProcessInstanceId(String processInstanceId) { + String effectiveId = processInstanceId != null ? processInstanceId : GENERAL_CONTEXT; + + // Only update MDC if the value is changing (optimization) + String currentId = MDC.get(MDC_PROCESS_INSTANCE_KEY); + if (!effectiveId.equals(currentId)) { + MDC.put(MDC_PROCESS_INSTANCE_KEY, effectiveId); + } + } + + /** + * Gets the current process instance ID from MDC. + * + * @return the current process instance ID, or empty string if no context is set + */ + public static String getProcessInstanceId() { + String id = MDC.get(MDC_PROCESS_INSTANCE_KEY); + return id != null ? id : GENERAL_CONTEXT; + } + + /** + * Clears the process instance context for the current thread. + * This resets the MDC to the general context (empty string). + */ + public static void clear() { + MDC.put(MDC_PROCESS_INSTANCE_KEY, GENERAL_CONTEXT); + } + + /** + * Checks if a process instance context is currently set. 
+ * + * @return true if a process instance context is set, false otherwise + */ + public static boolean hasContext() { + String id = MDC.get(MDC_PROCESS_INSTANCE_KEY); + return id != null && !GENERAL_CONTEXT.equals(id); + } + + /** + * Gets a copy of the current MDC context map for propagation to other threads. + * This is useful for async operations that need to maintain the same logging context. + * + * @return a copy of the current MDC context map, or null if no context is set + */ + public static Map<String, String> copyContextForAsync() { + return MDC.getCopyOfContextMap(); + } + + /** + * Sets the MDC context map from a previously copied context. Review Comment: I would also clarify that only keys included in the copied context (and restored via extensions) will remain; all other MDC values are discarded. ########## kogito-test-utils/src/main/java/org/kie/kogito/test/utils/JsonProcessInstanceLoggingTestBase.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.kie.kogito.test.utils; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for JSON-based process instance logging tests. + * Extends the existing ProcessInstanceLoggingTestBase to provide JSON-specific functionality + * while maintaining compatibility with existing infrastructure. + */ +public abstract class JsonProcessInstanceLoggingTestBase extends ProcessInstanceLoggingTestBase { + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonProcessInstanceLoggingTestBase.class); + + /** + * Parse all JSON log files (including rotated ones) and combine results. + * + * @return Combined JSON log entries from all files, sorted by timestamp + * @throws IOException if file operations fail + */ + protected List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> parseAllJsonLogFiles() throws IOException { + List<Path> logFiles = getAllLogFiles(); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> allEntries = new ArrayList<>(); + + LOGGER.info("Parsing {} JSON log files for comprehensive analysis", logFiles.size()); + + for (Path logFile : logFiles) { + parseJsonLogFile(logFile, allEntries); + } + + // Sort all entries by timestamp to ensure chronological order + allEntries.sort((e1, e2) -> e1.timestamp.compareTo(e2.timestamp)); + + LOGGER.info("Combined {} JSON log entries from {} files", allEntries.size(), logFiles.size()); + + if (allEntries.isEmpty()) { + throw new IOException("No JSON log entries found in any log files: " + logFiles); + } + + // Validate combined results + JsonProcessInstanceLogAnalyzer.validateJsonStructure(allEntries); + + return allEntries; + } + + private static void parseJsonLogFile(Path logFile, List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> allEntries) { + try { + 
LOGGER.info("Parsing JSON log file: {}", logFile); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries = + JsonProcessInstanceLogAnalyzer.parseJsonLogFile(logFile); + allEntries.addAll(entries); + LOGGER.debug("Added {} entries from {}", entries.size(), logFile.getFileName()); + } catch (IOException e) { + LOGGER.warn("Failed to parse JSON log file {}: {}", logFile, e.getMessage()); + // Continue with other files rather than failing completely + } + } + + /** + * Calculate and log JSON statistics for debugging. + * + * @param entries JSON log entries to analyze + */ + protected void logJsonStatistics(List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries) { + JsonProcessInstanceLogAnalyzer.JsonLogStatistics stats = + JsonProcessInstanceLogAnalyzer.calculateJsonStatistics(entries); + LOGGER.info("JSON log analysis: {}", stats); + } + + /** + * Validate that each process instance has distinct JSON log entries using all available log files. + * + * @param processInstanceIds Process instance IDs to validate + * @throws IOException if file operations fail + */ + protected void validateProcessInstanceJsonLogsRobust(String... 
processInstanceIds) throws IOException { + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries = parseAllJsonLogFilesWithRetry(); + + Map<String, List<JsonProcessInstanceLogAnalyzer.JsonLogEntry>> entriesByProcess = + JsonProcessInstanceLogAnalyzer.groupByProcessInstance(entries); + + JsonProcessInstanceLogAnalyzer.validateProcessInstanceIsolation( + entriesByProcess, Set.of(processInstanceIds)); + + logJsonStatistics(entries); + + // Verify that all process instances generated logs with their respective IDs + for (String processInstanceId : processInstanceIds) { + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> processLogs = entriesByProcess.get(processInstanceId); + assertThat(processLogs) + .as("Process instance " + processInstanceId + " should have dedicated JSON log entries") + .isNotEmpty(); + + // Validate MDC fields + validateJsonMdcFields(processLogs, processInstanceId); + } + + LOGGER.info("Process instance JSON logging validation completed for {} process instances (rotation-aware)", processInstanceIds.length); + } + + /** + * Validate MDC fields in JSON log entries for a specific process instance. + */ + private void validateJsonMdcFields(List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> processLogs, String processInstanceId) { + for (JsonProcessInstanceLogAnalyzer.JsonLogEntry entry : processLogs) { + assertThat(entry.mdc) + .as("JSON log entry should have MDC fields") + .isNotNull(); + + assertThat(entry.getProcessInstanceId()) + .as("JSON log entry should have processInstanceId in MDC") + .isEqualTo(processInstanceId); + } + } + + /** + * Parse all JSON log files with retry logic and rotation handling for CI environments. + * + * @return Parsed JSON log entries from all available log files + * @throws IOException if file operations fail after retries + */ + protected List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> parseAllJsonLogFilesWithRetry() throws IOException { + int maxRetries = isRunningInCI() ? 
3 : 1; + IOException lastException = null; + + for (int retry = 0; retry < maxRetries; retry++) { + try { + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries = parseAllJsonLogFiles(retry, maxRetries); + if (entries != null) + return entries; + + } catch (IOException e) { + lastException = e; + LOGGER.warn("Failed to parse JSON log files (attempt {}/{}): {}", retry + 1, maxRetries, e.getMessage()); + if (retry < maxRetries - 1) { + try { + Thread.sleep(2000); Review Comment: Can you use Awaitility instead of Thread.sleep? ########## kogito-test-utils/src/main/java/org/kie/kogito/test/utils/JsonProcessInstanceLoggingTestBase.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.test.utils; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for JSON-based process instance logging tests. 
+ * Extends the existing ProcessInstanceLoggingTestBase to provide JSON-specific functionality + * while maintaining compatibility with existing infrastructure. + */ +public abstract class JsonProcessInstanceLoggingTestBase extends ProcessInstanceLoggingTestBase { + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonProcessInstanceLoggingTestBase.class); + + /** + * Parse all JSON log files (including rotated ones) and combine results. + * + * @return Combined JSON log entries from all files, sorted by timestamp + * @throws IOException if file operations fail + */ + protected List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> parseAllJsonLogFiles() throws IOException { + List<Path> logFiles = getAllLogFiles(); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> allEntries = new ArrayList<>(); + + LOGGER.info("Parsing {} JSON log files for comprehensive analysis", logFiles.size()); + + for (Path logFile : logFiles) { + parseJsonLogFile(logFile, allEntries); + } + + // Sort all entries by timestamp to ensure chronological order + allEntries.sort((e1, e2) -> e1.timestamp.compareTo(e2.timestamp)); + + LOGGER.info("Combined {} JSON log entries from {} files", allEntries.size(), logFiles.size()); + + if (allEntries.isEmpty()) { + throw new IOException("No JSON log entries found in any log files: " + logFiles); + } + + // Validate combined results + JsonProcessInstanceLogAnalyzer.validateJsonStructure(allEntries); + + return allEntries; + } + + private static void parseJsonLogFile(Path logFile, List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> allEntries) { + try { + LOGGER.info("Parsing JSON log file: {}", logFile); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries = + JsonProcessInstanceLogAnalyzer.parseJsonLogFile(logFile); + allEntries.addAll(entries); + LOGGER.debug("Added {} entries from {}", entries.size(), logFile.getFileName()); + } catch (IOException e) { + LOGGER.warn("Failed to parse JSON log file {}: {}", logFile, e.getMessage()); + 
// Continue with other files rather than failing completely + } + } + + /** + * Calculate and log JSON statistics for debugging. + * + * @param entries JSON log entries to analyze + */ + protected void logJsonStatistics(List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries) { + JsonProcessInstanceLogAnalyzer.JsonLogStatistics stats = + JsonProcessInstanceLogAnalyzer.calculateJsonStatistics(entries); + LOGGER.info("JSON log analysis: {}", stats); + } + + /** + * Validate that each process instance has distinct JSON log entries using all available log files. + * + * @param processInstanceIds Process instance IDs to validate + * @throws IOException if file operations fail + */ + protected void validateProcessInstanceJsonLogsRobust(String... processInstanceIds) throws IOException { + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries = parseAllJsonLogFilesWithRetry(); + + Map<String, List<JsonProcessInstanceLogAnalyzer.JsonLogEntry>> entriesByProcess = + JsonProcessInstanceLogAnalyzer.groupByProcessInstance(entries); + + JsonProcessInstanceLogAnalyzer.validateProcessInstanceIsolation( + entriesByProcess, Set.of(processInstanceIds)); + + logJsonStatistics(entries); + + // Verify that all process instances generated logs with their respective IDs + for (String processInstanceId : processInstanceIds) { + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> processLogs = entriesByProcess.get(processInstanceId); + assertThat(processLogs) + .as("Process instance " + processInstanceId + " should have dedicated JSON log entries") + .isNotEmpty(); + + // Validate MDC fields + validateJsonMdcFields(processLogs, processInstanceId); + } + + LOGGER.info("Process instance JSON logging validation completed for {} process instances (rotation-aware)", processInstanceIds.length); + } + + /** + * Validate MDC fields in JSON log entries for a specific process instance. 
+ */ + private void validateJsonMdcFields(List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> processLogs, String processInstanceId) { + for (JsonProcessInstanceLogAnalyzer.JsonLogEntry entry : processLogs) { + assertThat(entry.mdc) + .as("JSON log entry should have MDC fields") + .isNotNull(); + + assertThat(entry.getProcessInstanceId()) + .as("JSON log entry should have processInstanceId in MDC") + .isEqualTo(processInstanceId); + } + } + + /** + * Parse all JSON log files with retry logic and rotation handling for CI environments. + * + * @return Parsed JSON log entries from all available log files + * @throws IOException if file operations fail after retries + */ + protected List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> parseAllJsonLogFilesWithRetry() throws IOException { + int maxRetries = isRunningInCI() ? 3 : 1; + IOException lastException = null; + + for (int retry = 0; retry < maxRetries; retry++) { + try { + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries = parseAllJsonLogFiles(retry, maxRetries); + if (entries != null) + return entries; + + } catch (IOException e) { + lastException = e; + LOGGER.warn("Failed to parse JSON log files (attempt {}/{}): {}", retry + 1, maxRetries, e.getMessage()); + if (retry < maxRetries - 1) { + try { + Thread.sleep(2000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted during retry", ie); + } + } + } + } + + throw new IOException("Failed to parse JSON log files after " + maxRetries + " attempts", lastException); + } + + private List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> parseAllJsonLogFiles(int retry, int maxRetries) throws IOException { + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> entries = parseAllJsonLogFiles(); + + if (entries.isEmpty() && retry < maxRetries - 1) { + LOGGER.warn("No JSON log entries found in any files, retrying... 
(attempt {}/{})", retry + 1, maxRetries); + try { + Thread.sleep(2000); Review Comment: Same here ########## quarkus/extensions/kogito-quarkus-serverless-workflow-jdbc-token-persistence-extension/kogito-quarkus-serverless-workflow-jdbc-token-persistence-integration-test/src/test/java/org/kie/kogito/quarkus/token/persistence/workflows/TokenExchangeIT.java: ########## @@ -116,37 +119,58 @@ private void validateCachingBehavior() { } /** - * Get the path to the Quarkus log file + * Validate OAuth2 token exchange and caching behavior from JSON log files */ - private Path getQuarkusLogFile() { - // The log file path is configured in application.properties as quarkus.log.file.path - // For integration tests, Quarkus uses target/quarkus.log - String logPath = System.getProperty("quarkus.log.file.path", "target/quarkus.log"); - return Paths.get(logPath); - } - - /** - * Validate OAuth2 token exchange and caching behavior from log file - */ - private void validateOAuth2LogsFromFile(Path logFile) throws IOException { - List<String> logLines = Files.readAllLines(logFile); - LOGGER.info("Analyzing {} log lines from {} for OAuth2 token exchange patterns", logLines.size(), logFile); - - Assertions.assertThat(logLines).hasSizeGreaterThan(0); - - List<String> usedJDBCRepository = logLines.stream().filter(line -> line.contains(LOG_PREFIX_USED_REPOSITORY + ": JdbcTokenCacheRepository")).toList(); - List<String> usedInMemoryRepository = logLines.stream().filter(line -> line.contains(LOG_PREFIX_USED_REPOSITORY + ": InMemoryTokenCacheRepository")).toList(); + private void validateOAuth2LogsFromFile(String processInstanceId) throws IOException { + // Parse all JSON log files to handle rotation + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> allEntries = parseAllJsonLogFilesWithRetry(); + + Assertions.assertThat(allEntries).hasSizeGreaterThan(0); + + LOGGER.info("Analyzing {} JSON log entries for OAuth2 token exchange patterns for process instance {}", + allEntries.size(), 
processInstanceId); + + // Filter logs to only include those related to this process instance or general context + // This prevents interference from concurrent tests + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> processSpecificEntries = allEntries.stream() + .filter(entry -> processInstanceId.equals(entry.getProcessInstanceId()) || + entry.getProcessInstanceId() == null || + entry.getProcessInstanceId().isEmpty()) + .toList(); + + LOGGER.info("Found {} JSON log entries specific to process instance {} or general context", + processSpecificEntries.size(), processInstanceId); + + // Check repository initialization logs in all entries (not just process-specific) + // Repository initialization might be logged before test starts, so we check but don't fail + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> usedJDBCRepository = allEntries.stream() + .filter(entry -> entry.message.contains(LOG_PREFIX_USED_REPOSITORY + ": JdbcTokenCacheRepository")) + .toList(); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> usedInMemoryRepository = allEntries.stream() + .filter(entry -> entry.message.contains(LOG_PREFIX_USED_REPOSITORY + ": InMemoryTokenCacheRepository")) + .toList(); Assertions.assertThat(usedJDBCRepository).hasSize(1); Assertions.assertThat(usedInMemoryRepository).hasSize(0); - LOGGER.info("JDBC repository was used as expected"); - - List<String> startTokenExchangeLogLines = logLines.stream().filter(line -> line.contains(LOG_PREFIX_STARTING_TOKEN_EXCHANGE)).toList(); - List<String> completedTokenExchangeLogLines = logLines.stream().filter(line -> line.contains(LOG_PREFIX_COMPLETED_TOKEN_EXCHANGE)).toList(); - List<String> failedTokenExchangeLogLines = logLines.stream().filter(line -> line.contains(LOG_PREFIX_FAILED_TOKEN_EXCHANGE)).toList(); - List<String> refreshTokenExchangeLogLines = logLines.stream().filter(line -> line.contains(LOG_PREFIX_TOKEN_REFRESH)).toList(); - List<String> completedRefreshTokenExchangeLogLines = logLines.stream().filter(line -> 
line.contains(LOG_PREFIX_REFRESH_COMPLETED)).toList(); - List<String> failedRefreshTokenExchangeLogLines = logLines.stream().filter(line -> line.contains(LOG_PREFIX_FAILED_TO_REFRESH_TOKEN)).toList(); + // Filter token exchange logs to only those related to this process instance + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> startTokenExchangeLogLines = processSpecificEntries.stream() + .filter(entry -> entry.message.contains(LOG_PREFIX_STARTING_TOKEN_EXCHANGE)) + .toList(); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> completedTokenExchangeLogLines = processSpecificEntries.stream() + .filter(entry -> entry.message.contains(LOG_PREFIX_COMPLETED_TOKEN_EXCHANGE)) + .toList(); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> failedTokenExchangeLogLines = processSpecificEntries.stream() + .filter(entry -> entry.message.contains(LOG_PREFIX_FAILED_TOKEN_EXCHANGE)) + .toList(); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> refreshTokenExchangeLogLines = processSpecificEntries.stream() + .filter(entry -> entry.message.contains(LOG_PREFIX_TOKEN_REFRESH) && processInstanceId.equals(entry.getProcessInstanceId())) + .toList(); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> completedRefreshTokenExchangeLogLines = processSpecificEntries.stream() + .filter(entry -> entry.message.contains(LOG_PREFIX_REFRESH_COMPLETED) && processInstanceId.equals(entry.getProcessInstanceId())) + .toList(); + List<JsonProcessInstanceLogAnalyzer.JsonLogEntry> failedRefreshTokenExchangeLogLines = processSpecificEntries.stream() + .filter(entry -> entry.message.contains(LOG_PREFIX_FAILED_TO_REFRESH_TOKEN) && processInstanceId.equals(entry.getProcessInstanceId())) + .toList(); Review Comment: Each stream().filter call iterates over all the entries. This is repeated 6 times. Why don't you use only a single pass over the list and classify entries to use it later? We could have a map with an entry for each status ("start", "completed", ..) 
and then iterate just once, adding each log entry to the appropriate list in the map. This reduces complexity and makes the code more efficient for large logs, with easier maintenance, wdyt? ########## kogito-test-utils/src/main/java/org/kie/kogito/test/utils/ProcessInstanceLoggingTestBase.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.test.utils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.restassured.http.ContentType; + +import static io.restassured.RestAssured.given; + +import static org.awaitility.Awaitility.await; + +/** + * Base class for process instance logging tests providing common functionality. + * This class eliminates code duplication between different logging test implementations. + */ +public abstract class ProcessInstanceLoggingTestBase { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceLoggingTestBase.class); + + /** + * Get the configured log file path.
+ * + * @return Path to the Quarkus log file + */ + protected Path getLogFilePath() { + String logPath = System.getProperty("quarkus.log.file.path", "target/quarkus.log"); + return Paths.get(logPath); + } + + /** + * Get all log files including rotated ones in chronological order (newest first). + * Handles Quarkus default rotation: quarkus.log, quarkus.log.1, quarkus.log.2, etc. + * + * @return List of log file paths sorted by modification time (newest first) + * @throws IOException if directory access fails + */ + protected List<Path> getAllLogFiles() throws IOException { + Path logFile = getLogFilePath(); + Path logDir = logFile.getParent(); + String baseName = logFile.getFileName().toString(); + + List<Path> logFiles = new ArrayList<>(); + + // Add main log file if it exists + if (Files.exists(logFile)) { + logFiles.add(logFile); + } + + // Look for rotated files: baseName.1, baseName.2, etc. + for (int i = 1; i <= 10; i++) { // Check up to 10 rotated files (more than default max-backup-index=5) Review Comment: Can this be defined as a constant? ########## quarkus/extensions/kogito-quarkus-serverless-workflow-otel-extension/kogito-quarkus-serverless-workflow-otel-deployment/src/test/java/org/kie/kogito/quarkus/serverless/workflow/otel/deployment/SonataFlowOtelExtensionStructureTest.java: ########## @@ -16,21 +16,18 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.kie.kogito.addons.jwt.deployment; +package org.kie.kogito.quarkus.serverless.workflow.otel.deployment; -import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.builditem.FeatureBuildItem; +import org.junit.jupiter.api.Test; -/** - * Quarkus deployment processor for JWT parser functionality - * The JWT parser work item handlers are discovered via ServiceLoader and don't require CDI bean registration - */ -public class JwtParserProcessor { +import static org.junit.jupiter.api.Assertions.assertNotNull; - private static final String FEATURE = "sonataflow-addons-jwt-parser"; +public class SonataFlowOtelExtensionStructureTest { - @BuildStep - FeatureBuildItem feature() { - return new FeatureBuildItem(FEATURE); + @Test + public void shouldLoadProcessorClass() { + SonataFlowOtelProcessor processor = new SonataFlowOtelProcessor(); + assertNotNull(processor); Review Comment: You could also use assertAll for reporting multiple assertions together, but up to you ########## kogito-test-utils/src/main/java/org/kie/kogito/test/utils/JsonProcessInstanceLogAnalyzer.java: ########## @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.kie.kogito.test.utils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +/** + * Utility class for analyzing process instance aware logging in JSON format. + * Supports parsing JSON log format with MDC fields including processInstanceId. + * This class replaces pipe-delimited format parsing for machine-consumable JSON logs. + */ +public class JsonProcessInstanceLogAnalyzer { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + // Common timestamp patterns in JSON logs + private static final DateTimeFormatter[] TIMESTAMP_FORMATTERS = { + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX"), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS"), + DateTimeFormatter.ISO_LOCAL_DATE_TIME + }; + + /** + * Represents a single JSON log entry with all its components. 
+ */ + public static class JsonLogEntry { + public final LocalDateTime timestamp; + public final String level; + public final String loggerName; + public final String message; + public final Map<String, String> mdc; + public final String threadName; + public final String sequenceNumber; + public final JsonNode rawJson; + + public JsonLogEntry(LocalDateTime timestamp, String level, String loggerName, + String message, Map<String, String> mdc, String threadName, + String sequenceNumber, JsonNode rawJson) { + this.timestamp = timestamp; + this.level = level != null ? level : "INFO"; + this.loggerName = loggerName != null ? loggerName : "unknown.logger"; + this.message = message != null ? message : ""; + this.mdc = mdc != null ? new HashMap<>(mdc) : new HashMap<>(); + this.threadName = threadName; + this.sequenceNumber = sequenceNumber; + this.rawJson = rawJson; + } + + /** + * Get the process instance ID from MDC. + */ + public String getProcessInstanceId() { + return mdc.get("processInstanceId"); + } + + /** + * Check if this log entry has a process instance ID. + */ + public boolean hasProcessInstance() { + String processInstanceId = getProcessInstanceId(); + return processInstanceId != null && !processInstanceId.trim().isEmpty(); + } + + /** + * Check if this log entry is general context (no process instance ID). + */ + public boolean isGeneralContext() { + return !hasProcessInstance(); + } + + /** + * Get trace ID from MDC if available. + */ + public String getTraceId() { + return mdc.get("traceId"); + } + + /** + * Get span ID from MDC if available. + */ + public String getSpanId() { + return mdc.get("spanId"); + } + + @Override + public String toString() { + return String.format("JsonLogEntry{timestamp=%s, level=%s, processInstanceId=%s, logger=%s, message=%s}", + timestamp, level, getProcessInstanceId(), loggerName, message); + } + } + + /** + * Statistics about JSON log entries for analysis. 
+ */ + public static class JsonLogStatistics { + public final long totalLogs; + public final long processSpecificLogs; + public final long generalContextLogs; + public final Map<String, Long> logsByProcessInstance; + public final Map<String, Long> logsByLevel; + public final Map<String, Long> logsByLogger; + public final long logsWithTracing; + + public JsonLogStatistics(List<JsonLogEntry> entries) { + this.totalLogs = entries.size(); + this.processSpecificLogs = entries.stream().filter(JsonLogEntry::hasProcessInstance).count(); + this.generalContextLogs = entries.stream().filter(JsonLogEntry::isGeneralContext).count(); + this.logsByProcessInstance = entries.stream() + .collect(Collectors.groupingBy( + entry -> entry.hasProcessInstance() ? entry.getProcessInstanceId() : "", + Collectors.counting())); + this.logsByLevel = entries.stream() + .collect(Collectors.groupingBy(entry -> entry.level, Collectors.counting())); + this.logsByLogger = entries.stream() + .collect(Collectors.groupingBy(entry -> entry.loggerName, Collectors.counting())); + this.logsWithTracing = entries.stream() + .filter(entry -> entry.getTraceId() != null) + .count(); + } + + @Override + public String toString() { + return String.format( + "JsonLogStatistics{total=%d, processSpecific=%d, general=%d, byProcess=%s, byLevel=%s, withTracing=%d}", + totalLogs, processSpecificLogs, generalContextLogs, logsByProcessInstance, logsByLevel, logsWithTracing); + } + } + + /** + * Parse JSON log file with multiline support and resilient error handling. 
+ */ + public static List<JsonLogEntry> parseJsonLogFile(Path logFile) throws IOException { + List<String> lines = Files.readAllLines(logFile); + List<JsonLogEntry> entries = new ArrayList<>(); + AtomicInteger malformedLineCount = new AtomicInteger(0); + AtomicInteger lineNumber = new AtomicInteger(0); + + for (String line : lines) { + lineNumber.incrementAndGet(); + + if (line.trim().isEmpty()) { + continue; // Skip empty lines + } + + try { + JsonLogEntry entry = parseJsonLogLine(line, malformedLineCount, lineNumber.get()); + if (entry != null) { + entries.add(entry); + } + } catch (Exception e) { + malformedLineCount.incrementAndGet(); + System.err.printf("Warning: Failed to parse JSON log line %d: %s - Error: %s%n", + lineNumber.get(), line.substring(0, Math.min(100, line.length())), e.getMessage()); + } + } + + // Log statistics about parsing + if (malformedLineCount.get() > 0) { + System.err.printf("Warning: Encountered %d malformed/problematic lines out of %d total lines while parsing %s%n", Review Comment: Why don't you define a Logger instead of using the standard error stream `System.err`? There are more occurrences of this in the class. ########## quarkus/extensions/kogito-quarkus-serverless-workflow-otel-extension/kogito-quarkus-serverless-workflow-otel/src/main/java/org/kie/kogito/quarkus/serverless/workflow/otel/NodeSpanManager.java: ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.quarkus.serverless.workflow.otel; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.kie.kogito.quarkus.serverless.workflow.otel.config.SonataFlowOtelConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.EVENT_DESCRIPTION; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.RequestProperties; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SERVICE_NAME; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SERVICE_VERSION; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SONATAFLOW_PROCESS_ID; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SONATAFLOW_PROCESS_INSTANCE_ID; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SONATAFLOW_PROCESS_INSTANCE_NODE; +import static 
org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SONATAFLOW_PROCESS_INSTANCE_STATE; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SONATAFLOW_PROCESS_VERSION; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SONATAFLOW_TRANSACTION_ID; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.SpanNames; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.TrackerAttributes; + +@ApplicationScoped +public class NodeSpanManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(NodeSpanManager.class); + private final Tracer tracer; + private final SonataFlowOtelConfig config; + private final Map<String, Span> activeNodeSpans = new ConcurrentHashMap<>(); + private final Map<String, Scope> activeNodeScopes = new ConcurrentHashMap<>(); + private final Map<String, Span> lastActiveNodeSpan = new ConcurrentHashMap<>(); + + @Inject + public NodeSpanManager(Tracer tracer, SonataFlowOtelConfig config) { + this.tracer = tracer; + this.config = config; + } + + public Span createNodeSpan(String processInstanceId, String processId, String processVersion, String processState, String nodeId) { + if (!config.enabled() || !config.spans().enabled()) { + return null; + } + + String spanName = SpanNames.createProcessSpanName(processId); + + Span span = tracer.spanBuilder(spanName) + .setParent(Context.current()) // Use current context for trace propagation + .setSpanKind(SpanKind.INTERNAL) + .setAttribute(SONATAFLOW_PROCESS_INSTANCE_ID, processInstanceId) + .setAttribute(SONATAFLOW_PROCESS_ID, processId) + .setAttribute(SONATAFLOW_PROCESS_VERSION, processVersion) + .setAttribute(SONATAFLOW_PROCESS_INSTANCE_STATE, processState) + .setAttribute(SERVICE_NAME, config.serviceName()) + .setAttribute(SERVICE_VERSION, config.serviceVersion()) + .setAttribute(SONATAFLOW_PROCESS_INSTANCE_NODE, nodeId) + .startSpan(); 
+ + String spanKey = processInstanceId + ":" + nodeId; + Scope scope = span.makeCurrent(); + activeNodeSpans.put(spanKey, span); + activeNodeScopes.put(spanKey, scope); Review Comment: There's a possible leak here: `scope` is stored in `activeNodeScopes` to be closed later. If anything goes wrong before `completeNodeSpan` or `endRemainingSpans` are called, then that scope remains open and is never closed. Have you considered it? ########## kogito-test-utils/src/main/java/org/kie/kogito/test/utils/ProcessInstanceLoggingTestBase.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.test.utils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.restassured.http.ContentType; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; + +/** + * Base class for process instance logging tests providing common functionality. 
+ * This class eliminates code duplication between different logging test implementations. + */ +public abstract class ProcessInstanceLoggingTestBase { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceLoggingTestBase.class); + + /** + * Get the configured log file path. + * + * @return Path to the Quarkus log file + */ + protected Path getLogFilePath() { + String logPath = System.getProperty("quarkus.log.file.path", "target/quarkus.log"); + return Paths.get(logPath); + } + + /** + * Get all log files including rotated ones in chronological order (newest first). + * Handles Quarkus default rotation: quarkus.log, quarkus.log.1, quarkus.log.2, etc. + * + * @return List of log file paths sorted by modification time (newest first) + * @throws IOException if directory access fails + */ + protected List<Path> getAllLogFiles() throws IOException { + Path logFile = getLogFilePath(); + Path logDir = logFile.getParent(); + String baseName = logFile.getFileName().toString(); + + List<Path> logFiles = new ArrayList<>(); + + // Add main log file if it exists + if (Files.exists(logFile)) { + logFiles.add(logFile); + } + + // Look for rotated files: baseName.1, baseName.2, etc. + for (int i = 1; i <= 10; i++) { // Check up to 10 rotated files (more than default max-backup-index=5) + Path rotatedFile = logDir.resolve(baseName + "." + i); + if (Files.exists(rotatedFile)) { + logFiles.add(rotatedFile); + } + } + + // Sort by last modified time (newest first) to ensure we read logs in chronological order + logFiles.sort((p1, p2) -> { + try { + return Files.getLastModifiedTime(p2).compareTo(Files.getLastModifiedTime(p1)); + } catch (IOException e) { + return 0; // Keep original order if we can't compare + } + }); + + LOGGER.debug("Found {} log files: {}", logFiles.size(), logFiles); + return logFiles; + } + + /** + * Clear a specific log file to start with a clean slate. 
+ * + * Warning: If your test uses getAllLogFiles() or parseAllLogFiles() methods later, + * consider using clearAllLogFiles() instead to avoid inconsistent results from + * rotated log files. + * + * @param logFile Path to the log file + * @throws IOException if file operations fail + */ + protected void clearLogFile(Path logFile) throws IOException { + if (Files.exists(logFile)) { + Files.write(logFile, new byte[0]); + } + } + + /** + * Clear all log files including rotated ones to start with a clean slate. + * This ensures we don't have stale logs from previous test runs affecting current test. + * + * This is the recommended method for test setup when using getAllLogFiles() or + * parseAllLogFiles() methods to ensure consistent results. + * + * @throws IOException if file operations fail + */ + protected void clearAllLogFiles() throws IOException { + try { + List<Path> logFiles = getAllLogFiles(); + for (Path logFile : logFiles) { + if (Files.exists(logFile)) { + Files.write(logFile, new byte[0]); + LOGGER.debug("Cleared log file: {}", logFile); + } + } + LOGGER.info("Cleared {} log files", logFiles.size()); + } catch (IOException e) { + LOGGER.warn("Failed to clear some log files: {}", e.getMessage()); + // Don't fail the test, just warn + } + } + + /** + * Wait for logs to be flushed to disk. + * + * @throws InterruptedException if thread is interrupted + */ + protected void waitForLogFlush() throws InterruptedException { + // Increase default wait time for CI environments + long waitTime = isRunningInCI() ? 3000 : 1000; + Thread.sleep(waitTime); + } + + /** + * Wait for logs to be flushed to disk with custom duration. + * + * @param millis milliseconds to wait + * @throws InterruptedException if thread is interrupted + */ + protected void waitForLogFlush(long millis) throws InterruptedException { + // In CI environments, use at least the provided time or 2 seconds, whichever is higher + long actualWaitTime = isRunningInCI() ? 
Math.max(millis, 2000) : millis; + Thread.sleep(actualWaitTime); + } + + /** + * Detect if running in CI environment based on common CI environment variables. + * + * @return true if running in CI, false otherwise + */ + protected boolean isRunningInCI() { + return System.getenv("CI") != null || + System.getenv("JENKINS_URL") != null || + System.getenv("GITHUB_ACTIONS") != null || + System.getenv("TRAVIS") != null || + System.getenv("CIRCLECI") != null || + System.getProperty("ci.environment") != null; + } + + /** + * Wait for workflow completion using polling. + * + * @param workflowPath REST path for the workflow + * @param processInstanceId Process instance ID + * @param timeout Maximum time to wait + */ + protected void waitForWorkflowCompletion(String workflowPath, String processInstanceId, Duration timeout) { Review Comment: This could also return boolean to indicate whether workflow completed or timed out. It would be useful for conditional assertions in tests, wdyt? ########## quarkus/extensions/kogito-quarkus-serverless-workflow-otel-extension/kogito-quarkus-serverless-workflow-otel/src/main/java/org/kie/kogito/quarkus/serverless/workflow/otel/NodeOtelEventListener.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.quarkus.serverless.workflow.otel; + +import java.util.Date; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.kie.api.event.process.ProcessCompletedEvent; +import org.kie.api.event.process.ProcessNodeLeftEvent; +import org.kie.api.event.process.ProcessNodeTriggeredEvent; +import org.kie.api.event.process.ProcessStartedEvent; +import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener; +import org.kie.kogito.internal.process.runtime.KogitoNodeInstance; +import org.kie.kogito.internal.process.runtime.KogitoProcessInstance; +import org.kie.kogito.process.ProcessInstance; +import org.kie.kogito.quarkus.serverless.workflow.otel.config.SonataFlowOtelConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.api.trace.Span; + +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.EventDescriptions; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.Events; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.NodePatterns; +import static org.kie.kogito.quarkus.serverless.workflow.otel.SonataFlowOtelAttributes.ProcessStates; + +public class NodeOtelEventListener extends DefaultKogitoProcessEventListener { + + record NodeProcessDetails(String processInstanceId, String processId, String processVersion, String processState, String nodeId) { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(NodeOtelEventListener.class); + + private final NodeSpanManager spanManager; + private final SonataFlowOtelConfig config; + private final ProcessEventHandler processEventHandler; + private final HeaderContextExtractor headerExtractor; + private final Set<String> processedErrors = ConcurrentHashMap.newKeySet(); + + public 
NodeOtelEventListener(NodeSpanManager spanManager, SonataFlowOtelConfig config, HeaderContextExtractor headerExtractor) { + this.spanManager = spanManager; + this.config = config; + this.headerExtractor = headerExtractor; + this.processEventHandler = new ProcessEventHandler(spanManager, config); + LOGGER.debug("NodeOtelEventListener initialized"); + } + + @Override + public void afterProcessStarted(ProcessStartedEvent event) { + LOGGER.debug("Process started event received - start event will be added in beforeNodeTriggered"); + } + + @Override + public void beforeNodeTriggered(ProcessNodeTriggeredEvent event) { + try { + NodeProcessDetails details = extractNodeAndProcessDetails(event); + + if (isInternalNode(details.nodeId())) { + LOGGER.trace("Skipping internal node: {}", details.nodeId()); + return; + } + + LOGGER.debug("Node triggered: {} for process instance {} (process: {})", + details.nodeId(), details.processInstanceId(), details.processId()); + + Map<String, String> extractedContext = OtelContextHolder.getExtractedContext(); + if (extractedContext.isEmpty()) { + extractedContext = handleContextReestablishment( + (KogitoProcessInstance) event.getProcessInstance(), details.processInstanceId()); + } + + createAndConfigureNodeSpan(details.processInstanceId(), details.processId(), + details.processVersion(), details.processState(), details.nodeId(), extractedContext); + + } catch (Exception e) { + LOGGER.error("Error in beforeNodeTriggered: {}", e.getMessage(), e); + } + } + + private NodeProcessDetails extractNodeAndProcessDetails(ProcessNodeTriggeredEvent event) { + KogitoNodeInstance nodeInstance = (KogitoNodeInstance) event.getNodeInstance(); + KogitoProcessInstance processInstance = (KogitoProcessInstance) event.getProcessInstance(); + + String processInstanceId = processInstance.getId(); + String processId = processInstance.getProcessId(); + String processVersion = processInstance.getProcessVersion(); + String processState = 
getProcessState(processInstance.getState()); + String nodeId = nodeInstance.getNodeName(); + + return new NodeProcessDetails(processInstanceId, processId, processVersion, processState, nodeId); + } + + private Map<String, String> handleContextReestablishment(KogitoProcessInstance processInstance, String processInstanceId) { + try { + Map<String, java.util.List<String>> processHeaders = processInstance.getHeaders(); + if (processHeaders != null && !processHeaders.isEmpty()) { + Map<String, String> headerContext = headerExtractor.extractFromProcessHeaders(processHeaders); + if (!headerContext.isEmpty()) { + populateMDCFromContext(headerContext); + LOGGER.debug("Re-established context from process headers for process instance {}: {}", + processInstanceId, headerContext.keySet()); + return headerContext; + } + } + } catch (Exception e) { + LOGGER.debug("Failed to extract context from process headers: {}", e.getMessage()); + } + return Map.of(); + } + + private void createAndConfigureNodeSpan(String processInstanceId, String processId, String processVersion, String processState, String nodeId, Map<String, String> extractedContext) { + Span span = spanManager.createNodeSpanWithContext(processInstanceId, processId, processVersion, processState, nodeId, extractedContext); + + if (span != null) { Review Comment: If span is null, it's silently skipped. You could add an "else" clause with a warning for tracking this situation ########## quarkus/extensions/kogito-quarkus-serverless-workflow-otel-extension/kogito-quarkus-serverless-workflow-otel/src/main/java/org/kie/kogito/quarkus/serverless/workflow/otel/logging/OtelLogHandlerInitializer.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.quarkus.serverless.workflow.otel.logging; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.quarkus.runtime.StartupEvent; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; + +@ApplicationScoped +public class OtelLogHandlerInitializer { + + private static final OtelLogHandler handler = new OtelLogHandler(); + + void onStart(@Observes StartupEvent ev) { Review Comment: A check could also be added to ensure the handler isn’t already added, to prevent accidental double registration, but up to you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
