This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 6385290fd Increase logging until we have stability in pipes
client+server on CI (#2436)
6385290fd is described below
commit 6385290fd96d68d821b29255f043501995705efa
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 9 17:08:18 2025 -0500
Increase logging until we have stability in pipes client+server on CI
(#2436)
---
.../pipes/opensearch/tests/OpenSearchTest.java | 9 ++---
.../org/apache/tika/pipes/core/PipesClient.java | 38 ++++++++++++----------
.../tika/pipes/core/async/AsyncProcessor.java | 4 ++-
3 files changed, 29 insertions(+), 22 deletions(-)
diff --git
a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java
b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java
index 80c6b6933..59c41c078 100644
---
a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java
+++
b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java
@@ -146,12 +146,13 @@ public class OpenSearchTest {
}
statusCounts.put(status, cnt);
}
- assertEquals(numHtmlDocs, (int) statusCounts.get("PARSE_SUCCESS"));
+
+ assertEquals(numHtmlDocs, (int) statusCounts.get("PARSE_SUCCESS"),
"should have had " + numHtmlDocs + " parse successes: " + statusCounts);
//the npe is caught and counted as a "parse success with exception"
- assertEquals(1, (int)
statusCounts.get("PARSE_SUCCESS_WITH_EXCEPTION"));
+ assertEquals(1, (int)
statusCounts.get("PARSE_SUCCESS_WITH_EXCEPTION"), "should have had 1 parse
exception: " + statusCounts);
//the embedded docx is emitted directly
- assertEquals(1, (int) statusCounts.get("EMIT_SUCCESS"));
- assertEquals(2, (int) statusCounts.get("OOM"));
+ assertEquals(1, (int) statusCounts.get("EMIT_SUCCESS"), "should have
had 1 emit success: " + statusCounts);
+ assertEquals(2, (int) statusCounts.get("OOM"), "should have had 2 OOM:
" + statusCounts);
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index c4a2c0713..d7a4191ca 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -137,6 +137,10 @@ public class PipesClient implements Closeable {
}
}
+ public int getPipesClientId() {
+ return pipesClientId;
+ }
+
private void shutItAllDown() throws InterruptedException {
if (serverTuple == null) {
return;
@@ -264,13 +268,13 @@ public class PipesClient implements Closeable {
}
long totalElapsed = Duration.between(start,
Instant.now()).toMillis();
if ( totalElapsed > timeoutMillis) {
- LOG.warn("timeout on client side: id={} elapsed={}
timeoutMillis={}", t.getId(), totalElapsed, timeoutMillis);
+ LOG.warn("clientId={}: timeout on client side: id={}
elapsed={} timeoutMillis={}", pipesClientId, t.getId(), totalElapsed,
timeoutMillis);
return buildFatalResult(t.getId(), t.getEmitKey(),
PipesResult.RESULT_STATUS.TIMEOUT, intermediateResult.get());
}
try {
//read blocks on the socket
PipesServer.PROCESSING_STATUS status = readServerStatus();
- LOG.trace("switch status id={} status={}", t.getId(), status);
+ LOG.trace("clientId={}: switch status id={} status={}",
pipesClientId, t.getId(), status);
String msg = null;
switch (status) {
case OOM:
@@ -297,14 +301,14 @@ public class PipesClient implements Closeable {
return readResult(PipesResult.class);
}
} catch (SocketTimeoutException e) {
- LOG.warn("Socket timeout exception while waiting for server",
e);
+ LOG.warn("clientId={}: Socket timeout exception while waiting
for server", pipesClientId, e);
shutItAllDown();
return buildFatalResult(t.getId(), t.getEmitKey(), TIMEOUT,
intermediateResult.get(), ExceptionUtils.getStackTrace(e));
} catch (SecurityException e) {
throw e;
} catch (Exception e) {
//TODO -- catch socket timeout separately
- LOG.warn("Crash while waiting for server", e);
+ LOG.warn("clientId={} - crash while waiting for server",
pipesClientId, e);
serverTuple.process.waitFor(1, TimeUnit.SECONDS);
PipesResult.RESULT_STATUS status = UNSPECIFIED_CRASH;
if (! serverTuple.process.isAlive()) {
@@ -314,9 +318,9 @@ public class PipesClient implements Closeable {
} else if (exitValue == PipesServer.TIMEOUT_EXIT_CODE) {
status = PipesResult.RESULT_STATUS.TIMEOUT;
}
- LOG.warn("exit value: {}",
serverTuple.process.exitValue());
+ LOG.warn("clientId={}: exit value: {}", pipesClientId,
serverTuple.process.exitValue());
} else {
- LOG.warn("process still alive ?!");
+ LOG.warn("clientId={}: process still alive ?!",
pipesClientId);
}
shutItAllDown();
return buildFatalResult(t.getId(), t.getEmitKey(), status,
intermediateResult.get(), ExceptionUtils.getStackTrace(e));
@@ -334,11 +338,11 @@ public class PipesClient implements Closeable {
}
private PipesResult buildFatalResult(String id, EmitKey emitKey,
PipesResult.RESULT_STATUS status, Optional<Metadata> intermediateResultOpt,
String msg) {
- LOG.warn("crash id={} status={}", id, status);
+ LOG.warn("clientId={}: crash id={} status={}", pipesClientId, id,
status);
Metadata intermediateResult = intermediateResultOpt.orElse(new
Metadata());
if (LOG.isTraceEnabled()) {
- LOG.trace("intermediate result: id={}", intermediateResult);
+ LOG.trace("clientId={}: intermediate result: id={}",
pipesClientId, intermediateResult);
}
intermediateResult.set(TikaCoreProperties.PIPES_RESULT,
status.toString());
if (StringUtils.isBlank(msg)) {
@@ -364,7 +368,7 @@ public class PipesClient implements Closeable {
serverTuple.process.waitFor(WAIT_ON_DESTROY_MS, TimeUnit.MILLISECONDS);
if (serverTuple.process.isAlive()) {
- LOG.error("Process still alive after {}ms", WAIT_ON_DESTROY_MS);
+ LOG.error("clientId={}: process still alive after {}ms",
pipesClientId, WAIT_ON_DESTROY_MS);
}
}
@@ -428,9 +432,9 @@ public class PipesClient implements Closeable {
if (serverTuple != null && serverTuple.process != null) {
int oldPort = serverTuple.serverSocket.getLocalPort();
shutItAllDown();
- LOG.info("pipesClientId={}: restarting process on port={} (old
port was {})", pipesClientId, port, oldPort);
+ LOG.info("clientId={}: restarting process on port={} (old port was
{})", pipesClientId, port, oldPort);
} else {
- LOG.info("pipesClientId={}: starting process on port={}",
pipesClientId, port);
+ LOG.info("clientId={}: starting process on port={}",
pipesClientId, port);
}
Path tmpDir = Files.createTempDirectory("pipes-server-" +
pipesClientId + "-");
ProcessBuilder pb = new ProcessBuilder(getCommandline(port, tmpDir));
@@ -441,7 +445,7 @@ public class PipesClient implements Closeable {
} catch (Exception e) {
deleteDir(tmpDir);
//Do we ever want this to be not fatal?!
- LOG.error("failed to start server", e);
+ LOG.error("clientId={}: failed to start server", pipesClientId, e);
String msg = "Failed to start server process";
if (e.getMessage() != null) {
msg += ": " + e.getMessage();
@@ -459,14 +463,14 @@ public class PipesClient implements Closeable {
// Check if the process died before connecting
if (!process.isAlive()) {
int exitValue = process.exitValue();
- LOG.error("pipesClientId={}: Process exited with code {}
before connecting to socket", pipesClientId, exitValue);
+ LOG.error("clientId={}: Process exited with code {} before
connecting to socket", pipesClientId, exitValue);
deleteDir(tmpDir);
throw new ServerInitializationException("Process failed to
start (exit code " + exitValue + "). Check JVM arguments and classpath.");
}
// Check if we've exceeded the overall timeout
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed > SOCKET_CONNECT_TIMEOUT_MS) {
- LOG.error("pipesClientId={}: Timed out waiting for server
to connect after {}ms", pipesClientId, elapsed);
+ LOG.error("clientId={}: Timed out waiting for server to
connect after {}ms", pipesClientId, elapsed);
deleteDir(tmpDir);
throw new ServerInitializationException("Server did not
connect within " + SOCKET_CONNECT_TIMEOUT_MS + "ms");
}
@@ -484,17 +488,17 @@ public class PipesClient implements Closeable {
int b = serverTuple.input.read();
writeAck();
if (b == READY.getByte()) {
- LOG.debug("pipesClientId={}: server ready", pipesClientId);
+ LOG.debug("clientId={}: server ready", pipesClientId);
} else if (b == FINISHED.getByte()) {
int len = serverTuple.input.readInt();
byte[] bytes = new byte[len];
serverTuple.input.readFully(bytes);
writeAck();
String msg = new String(bytes, StandardCharsets.UTF_8);
- LOG.error("Server failed to start: {}", msg);
+ LOG.error("clientId={}: Server failed to start: {}",
pipesClientId, msg);
throw new ServerInitializationException(msg);
} else {
- LOG.error("Unexpected first byte: " + HexFormat.of().formatHex(new
byte[]{ (byte) b }));
+ LOG.error("clientId={}: Unexpected first byte: {}", pipesClientId,
HexFormat.of().formatHex(new byte[]{ (byte) b }));
throw new IOException("Unexpected first byte from server: " +
HexFormat.of().formatHex(new byte[]{ (byte) b }));
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index 7fe7fc52e..c21a8707f 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -303,8 +303,10 @@ public class AsyncProcessor implements Closeable {
long start = System.currentTimeMillis();
try {
result = pipesClient.process(t);
+ //TODO -- drop this back to debug or even trace
once we have stability in ci
+ LOG.info("pipesClientId={}, status={}",
pipesClient.getPipesClientId(), result.status());
} catch (IOException e) {
- LOG.warn("pipesClient crash", e);
+ LOG.warn("pipesClientId={} crash",
pipesClient.getPipesClientId(), e);
result = PipesResults.UNSPECIFIED_CRASH;
}
if (LOG.isTraceEnabled()) {