This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 6385290fd Increase logging until we have stability in pipes 
client+server on CI (#2436)
6385290fd is described below

commit 6385290fd96d68d821b29255f043501995705efa
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 9 17:08:18 2025 -0500

    Increase logging until we have stability in pipes client+server on CI 
(#2436)
---
 .../pipes/opensearch/tests/OpenSearchTest.java     |  9 ++---
 .../org/apache/tika/pipes/core/PipesClient.java    | 38 ++++++++++++----------
 .../tika/pipes/core/async/AsyncProcessor.java      |  4 ++-
 3 files changed, 29 insertions(+), 22 deletions(-)

diff --git 
a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java
 
b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java
index 80c6b6933..59c41c078 100644
--- 
a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java
+++ 
b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java
@@ -146,12 +146,13 @@ public class OpenSearchTest {
             }
             statusCounts.put(status, cnt);
         }
-        assertEquals(numHtmlDocs, (int) statusCounts.get("PARSE_SUCCESS"));
+
+        assertEquals(numHtmlDocs, (int) statusCounts.get("PARSE_SUCCESS"), 
"should have had " + numHtmlDocs + " parse successes: " + statusCounts);
         //the npe is caught and counted as a "parse success with exception"
-        assertEquals(1, (int) 
statusCounts.get("PARSE_SUCCESS_WITH_EXCEPTION"));
+        assertEquals(1, (int) 
statusCounts.get("PARSE_SUCCESS_WITH_EXCEPTION"), "should have had 1 parse 
exception: " + statusCounts);
         //the embedded docx is emitted directly
-        assertEquals(1, (int) statusCounts.get("EMIT_SUCCESS"));
-        assertEquals(2, (int) statusCounts.get("OOM"));
+        assertEquals(1, (int) statusCounts.get("EMIT_SUCCESS"), "should have 
had 1 emit success: " + statusCounts);
+        assertEquals(2, (int) statusCounts.get("OOM"), "should have had 2 OOM: 
" + statusCounts);
 
     }
 
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index c4a2c0713..d7a4191ca 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -137,6 +137,10 @@ public class PipesClient implements Closeable {
         }
     }
 
+    public int getPipesClientId() {
+        return pipesClientId;
+    }
+
     private void shutItAllDown() throws InterruptedException {
         if (serverTuple == null) {
             return;
@@ -264,13 +268,13 @@ public class PipesClient implements Closeable {
             }
             long totalElapsed = Duration.between(start, 
Instant.now()).toMillis();
             if ( totalElapsed > timeoutMillis) {
-                LOG.warn("timeout on client side: id={} elapsed={} 
timeoutMillis={}", t.getId(), totalElapsed, timeoutMillis);
+                LOG.warn("clientId={}: timeout on client side: id={} 
elapsed={} timeoutMillis={}", pipesClientId, t.getId(), totalElapsed, 
timeoutMillis);
                 return buildFatalResult(t.getId(), t.getEmitKey(), 
PipesResult.RESULT_STATUS.TIMEOUT, intermediateResult.get());
             }
             try {
                 //read blocks on the socket
                 PipesServer.PROCESSING_STATUS status = readServerStatus();
-                LOG.trace("switch status id={} status={}", t.getId(), status);
+                LOG.trace("clientId={}: switch status id={} status={}", 
pipesClientId, t.getId(), status);
                 String msg = null;
                 switch (status) {
                     case OOM:
@@ -297,14 +301,14 @@ public class PipesClient implements Closeable {
                         return readResult(PipesResult.class);
                 }
             } catch (SocketTimeoutException e) {
-                LOG.warn("Socket timeout exception while waiting for server", 
e);
+                LOG.warn("clientId={}: Socket timeout exception while waiting 
for server", pipesClientId, e);
                 shutItAllDown();
                 return buildFatalResult(t.getId(), t.getEmitKey(), TIMEOUT, 
intermediateResult.get(), ExceptionUtils.getStackTrace(e));
             } catch (SecurityException e) {
                 throw e;
             } catch (Exception e) {
                 //TODO -- catch socket timeout separately
-                LOG.warn("Crash while waiting for server", e);
+                LOG.warn("clientId={} - crash while waiting for server", 
pipesClientId, e);
                 serverTuple.process.waitFor(1, TimeUnit.SECONDS);
                 PipesResult.RESULT_STATUS status = UNSPECIFIED_CRASH;
                 if (! serverTuple.process.isAlive()) {
@@ -314,9 +318,9 @@ public class PipesClient implements Closeable {
                     } else if (exitValue == PipesServer.TIMEOUT_EXIT_CODE) {
                         status = PipesResult.RESULT_STATUS.TIMEOUT;
                     }
-                    LOG.warn("exit value: {}", 
serverTuple.process.exitValue());
+                    LOG.warn("clientId={}: exit value{}", pipesClientId, 
serverTuple.process.exitValue());
                 } else {
-                    LOG.warn("process still alive ?!");
+                    LOG.warn("clientId={}: process still alive ?!", 
pipesClientId);
                 }
                 shutItAllDown();
                 return buildFatalResult(t.getId(), t.getEmitKey(), status, 
intermediateResult.get(), ExceptionUtils.getStackTrace(e));
@@ -334,11 +338,11 @@ public class PipesClient implements Closeable {
     }
 
     private PipesResult buildFatalResult(String id, EmitKey emitKey, 
PipesResult.RESULT_STATUS status, Optional<Metadata> intermediateResultOpt, 
String msg) {
-        LOG.warn("crash id={} status={}", id, status);
+        LOG.warn("clientId={}: crash id={} status={}", pipesClientId, id, 
status);
         Metadata intermediateResult = intermediateResultOpt.orElse(new 
Metadata());
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("intermediate result: id={}", intermediateResult);
+            LOG.trace("clientId={}: intermediate result: id={}", 
pipesClientId, intermediateResult);
         }
         intermediateResult.set(TikaCoreProperties.PIPES_RESULT, 
status.toString());
         if (StringUtils.isBlank(msg)) {
@@ -364,7 +368,7 @@ public class PipesClient implements Closeable {
         serverTuple.process.waitFor(WAIT_ON_DESTROY_MS, TimeUnit.MILLISECONDS);
 
         if (serverTuple.process.isAlive()) {
-            LOG.error("Process still alive after {}ms", WAIT_ON_DESTROY_MS);
+            LOG.error("clientId={}: process still alive after {}ms", 
pipesClientId, WAIT_ON_DESTROY_MS);
         }
     }
 
@@ -428,9 +432,9 @@ public class PipesClient implements Closeable {
         if (serverTuple != null && serverTuple.process != null) {
             int oldPort = serverTuple.serverSocket.getLocalPort();
             shutItAllDown();
-            LOG.info("pipesClientId={}: restarting process on port={} (old 
port was {})", pipesClientId, port, oldPort);
+            LOG.info("clientId={}: restarting process on port={} (old port was 
{})", pipesClientId, port, oldPort);
         } else {
-            LOG.info("pipesClientId={}: starting process on port={}", 
pipesClientId, port);
+            LOG.info("clientId={}: starting process on port={}", 
pipesClientId, port);
         }
         Path tmpDir = Files.createTempDirectory("pipes-server-" + 
pipesClientId + "-");
         ProcessBuilder pb = new ProcessBuilder(getCommandline(port, tmpDir));
@@ -441,7 +445,7 @@ public class PipesClient implements Closeable {
         } catch (Exception e) {
             deleteDir(tmpDir);
             //Do we ever want this to be not fatal?!
-            LOG.error("failed to start server", e);
+            LOG.error("clientId={}: failed to start server", pipesClientId, e);
             String msg = "Failed to start server process";
             if (e.getMessage() != null) {
                 msg += ": " + e.getMessage();
@@ -459,14 +463,14 @@ public class PipesClient implements Closeable {
                 // Check if the process died before connecting
                 if (!process.isAlive()) {
                     int exitValue = process.exitValue();
-                    LOG.error("pipesClientId={}: Process exited with code {} 
before connecting to socket", pipesClientId, exitValue);
+                    LOG.error("clientId={}: Process exited with code {} before 
connecting to socket", pipesClientId, exitValue);
                     deleteDir(tmpDir);
                     throw new ServerInitializationException("Process failed to 
start (exit code " + exitValue + "). Check JVM arguments and classpath.");
                 }
                 // Check if we've exceeded the overall timeout
                 long elapsed = System.currentTimeMillis() - startTime;
                 if (elapsed > SOCKET_CONNECT_TIMEOUT_MS) {
-                    LOG.error("pipesClientId={}: Timed out waiting for server 
to connect after {}ms", pipesClientId, elapsed);
+                    LOG.error("clientId={}: Timed out waiting for server to 
connect after {}ms", pipesClientId, elapsed);
                     deleteDir(tmpDir);
                     throw new ServerInitializationException("Server did not 
connect within " + SOCKET_CONNECT_TIMEOUT_MS + "ms");
                 }
@@ -484,17 +488,17 @@ public class PipesClient implements Closeable {
         int b = serverTuple.input.read();
         writeAck();
         if (b == READY.getByte()) {
-            LOG.debug("pipesClientId={}: server ready", pipesClientId);
+            LOG.debug("clientId={}: server ready", pipesClientId);
         } else if (b == FINISHED.getByte()) {
             int len = serverTuple.input.readInt();
             byte[] bytes = new byte[len];
             serverTuple.input.readFully(bytes);
             writeAck();
             String msg = new String(bytes, StandardCharsets.UTF_8);
-            LOG.error("Server failed to start: {}", msg);
+            LOG.error("clientId={}: Server failed to start: {}", 
pipesClientId, msg);
             throw new ServerInitializationException(msg);
         } else {
-            LOG.error("Unexpected first byte: " + HexFormat.of().formatHex(new 
byte[]{ (byte) b }));
+            LOG.error("clientId={}: Unexpected first byte: {}", pipesClientId, 
HexFormat.of().formatHex(new byte[]{ (byte) b }));
             throw new IOException("Unexpected first byte from server: " + 
HexFormat.of().formatHex(new byte[]{ (byte) b }));
         }
 
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index 7fe7fc52e..c21a8707f 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -303,8 +303,10 @@ public class AsyncProcessor implements Closeable {
                         long start = System.currentTimeMillis();
                         try {
                             result = pipesClient.process(t);
+                            //TODO -- drop this back to debug or even trace 
once we have stability in ci
+                            LOG.info("pipesClientId={}, status={}", 
pipesClient.getPipesClientId(), result.status());
                         } catch (IOException e) {
-                            LOG.warn("pipesClient crash", e);
+                            LOG.warn("pipesClientId={} crash", 
pipesClient.getPipesClientId(), e);
                             result = PipesResults.UNSPECIFIED_CRASH;
                         }
                         if (LOG.isTraceEnabled()) {

Reply via email to