This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 60e1aa217b TIKA-4670 -- improve exit handling btwn pipesclient and
pipesserver (#2618)
60e1aa217b is described below
commit 60e1aa217b10b9972d32987194d76328cb59fe7d
Author: Tim Allison <[email protected]>
AuthorDate: Wed Feb 18 16:36:44 2026 -0500
TIKA-4670 -- improve exit handling btwn pipesclient and pipesserver (#2618)
---
.../tika/pipes/core/PerClientServerManager.java | 20 +-
.../org/apache/tika/pipes/core/PipesClient.java | 153 +++------
.../tika/pipes/core/protocol/PipesMessage.java | 169 ++++++++++
.../tika/pipes/core/protocol/PipesMessageType.java | 96 ++++++
.../core/protocol/ProtocolDesyncException.java | 30 ++
.../core/protocol/ShutDownReceivedException.java | 34 ++
.../tika/pipes/core/server/ConnectionHandler.java | 255 +++++----------
.../apache/tika/pipes/core/server/PipesServer.java | 364 ++++++---------------
.../tika/pipes/core/server/ServerProtocolIO.java | 133 ++++++++
.../tika/pipes/core/protocol/PipesMessageTest.java | 202 ++++++++++++
.../apache/tika/pipes/core/PipesClientTest.java | 61 +++-
11 files changed, 970 insertions(+), 547 deletions(-)
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PerClientServerManager.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PerClientServerManager.java
index 94c972f013..a8cf40ba10 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PerClientServerManager.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PerClientServerManager.java
@@ -34,7 +34,6 @@ import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.tika.pipes.core.server.PipesServer;
import org.apache.tika.utils.ProcessUtils;
/**
@@ -161,17 +160,14 @@ public class PerClientServerManager implements
ServerManager {
int exitValue = process.exitValue();
LOG.error("clientId={}: Process exited with code {} before
connecting to socket",
clientId, exitValue);
- // Don't treat known crash exit codes as initialization
failures
- // These indicate the server started but crashed during
processing
- if (exitValue == PipesServer.OOM_EXIT_CODE ||
- exitValue == PipesServer.TIMEOUT_EXIT_CODE ||
- exitValue ==
PipesServer.UNSPECIFIED_CRASH_EXIT_CODE) {
- // Mark for restart and throw IOException so caller
can retry
- pendingRestart = true;
- throw new IOException("Server crashed (exit code " +
exitValue + ") - will retry");
- }
- throw new ServerInitializationException(
- "Process failed to start (exit code " + exitValue
+ "). Check JVM arguments and classpath.");
+ // Always treat pre-connect death as retryable.
+ // The only non-retryable paths are:
+ // 1. pb.start() fails (can't launch process) - handled in
startServer()
+ // 2. Server explicitly reports bad config via protocol -
handled in waitForStartup()
+ // 3. Exhausted all retry attempts - handled in maybeInit()
+ pendingRestart = true;
+ throw new IOException(
+ "Server process died before connecting (exit code
" + exitValue + ") - will retry");
}
// Check if we've exceeded the overall timeout
long elapsed = System.currentTimeMillis() - startTime;
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 82c1d7bfea..fb4ae62824 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -19,9 +19,6 @@ package org.apache.tika.pipes.core;
import static org.apache.tika.pipes.api.PipesResult.RESULT_STATUS.TIMEOUT;
import static
org.apache.tika.pipes.api.PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH;
-import static org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.READY;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -35,9 +32,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.HexFormat;
import java.util.List;
-import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,9 +48,10 @@ import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+import org.apache.tika.pipes.core.protocol.PipesMessage;
+import org.apache.tika.pipes.core.protocol.PipesMessageType;
import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
import org.apache.tika.pipes.core.server.IntermediateResult;
-import org.apache.tika.pipes.core.server.PipesServer;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.StringUtils;
@@ -71,14 +67,6 @@ import org.apache.tika.utils.StringUtils;
*/
public class PipesClient implements Closeable {
- public enum COMMANDS {
- PING, ACK, NEW_REQUEST, SHUT_DOWN;
-
- public byte getByte() {
- return (byte) (ordinal() + 1);
- }
- }
-
private static final Logger LOG =
LoggerFactory.getLogger(PipesClient.class);
private static final AtomicInteger CLIENT_COUNTER = new AtomicInteger(0);
public static final int SOCKET_CONNECT_TIMEOUT_MS = 60000;
@@ -86,6 +74,7 @@ public class PipesClient implements Closeable {
private final PipesConfig pipesConfig;
private final ServerManager serverManager;
+ private final boolean ownsServerManager;
private final int pipesClientId;
private ConnectionTuple connectionTuple;
@@ -93,6 +82,10 @@ public class PipesClient implements Closeable {
/**
* Creates a PipesClient with the given server manager.
+ * <p>
+ * The caller retains ownership of the server manager and is responsible
+ * for closing it. This is used in shared mode where multiple clients
+ * share a single server manager.
*
* @param pipesConfig the pipes configuration
* @param serverManager the server manager (per-client or shared)
@@ -100,6 +93,7 @@ public class PipesClient implements Closeable {
public PipesClient(PipesConfig pipesConfig, ServerManager serverManager) {
this.pipesConfig = pipesConfig;
this.serverManager = serverManager;
+ this.ownsServerManager = false;
this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
}
@@ -117,6 +111,7 @@ public class PipesClient implements Closeable {
this.pipesConfig = pipesConfig;
this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
this.serverManager = new PerClientServerManager(pipesConfig,
tikaConfigPath, pipesClientId);
+ this.ownsServerManager = true;
}
public int getFilesProcessed() {
@@ -132,10 +127,9 @@ public class PipesClient implements Closeable {
return false;
}
try {
- connectionTuple.output.write(COMMANDS.PING.getByte());
- connectionTuple.output.flush();
- int ping = connectionTuple.input.read();
- if (ping == COMMANDS.PING.getByte()) {
+ PipesMessage.ping().write(connectionTuple.output);
+ PipesMessage response = PipesMessage.read(connectionTuple.input);
+ if (response.type() == PipesMessageType.PING) {
return true;
}
} catch (IOException e) {
@@ -151,6 +145,9 @@ public class PipesClient implements Closeable {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
+ if (ownsServerManager) {
+ serverManager.close();
+ }
}
public int getPipesClientId() {
@@ -167,8 +164,7 @@ public class PipesClient implements Closeable {
}
LOG.debug("pipesClientId={}: closing connection", pipesClientId);
try {
- connectionTuple.output.write(COMMANDS.SHUT_DOWN.getByte());
- connectionTuple.output.flush();
+ PipesMessage.shutDown().write(connectionTuple.output);
} catch (IOException e) {
// swallow
}
@@ -307,17 +303,13 @@ public class PipesClient implements Closeable {
private void writeTask(FetchEmitTuple t) throws IOException {
LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}",
pipesClientId, t.getId());
byte[] bytes = JsonPipesIpc.toBytes(t);
- connectionTuple.output.write(COMMANDS.NEW_REQUEST.getByte());
- connectionTuple.output.writeInt(bytes.length);
- connectionTuple.output.write(bytes);
- connectionTuple.output.flush();
+ PipesMessage.newRequest(bytes).write(connectionTuple.output);
}
private PipesResult waitForServer(FetchEmitTuple t, IntermediateResult
intermediateResult) throws InterruptedException {
long timeoutMillis = getTimeoutMillis(pipesConfig,
t.getParseContext());
Instant start = Instant.now();
Instant lastUpdate = start;
- long lastProgressCounter = 0;
while (true) {
if (Thread.currentThread().isInterrupted()) {
@@ -334,42 +326,49 @@ public class PipesClient implements Closeable {
intermediateResult.get());
}
try {
- // Read blocks on the socket
- PipesServer.PROCESSING_STATUS status = readServerStatus();
- LOG.trace("clientId={}: switch status id={} status={}",
pipesClientId, t.getId(), status);
- String msg = null;
- switch (status) {
+ PipesMessage msg = PipesMessage.read(connectionTuple.input);
+ LOG.trace("clientId={}: received message type={} id={}",
pipesClientId, msg.type(), t.getId());
+
+ // Send ACK only for messages that require it
+ if (msg.type().requiresAck()) {
+ PipesMessage.ack().write(connectionTuple.output);
+ }
+
+ switch (msg.type()) {
case OOM:
- msg = readResult(String.class);
- serverManager.markServerForRestart(); // Signal that
server is dying
+ String oomMsg = JsonPipesIpc.fromBytes(msg.payload(),
String.class);
+ serverManager.markServerForRestart();
closeConnection();
return buildFatalResult(t.getId(), t.getEmitKey(),
PipesResult.RESULT_STATUS.OOM,
- intermediateResult.get(), msg);
+ intermediateResult.get(), oomMsg);
case TIMEOUT:
- msg = readResult(String.class);
- serverManager.markServerForRestart(); // Signal that
server is dying
+ String timeoutMsg =
JsonPipesIpc.fromBytes(msg.payload(), String.class);
+ serverManager.markServerForRestart();
closeConnection();
- return buildFatalResult(t.getId(), t.getEmitKey(),
TIMEOUT, intermediateResult.get(), msg);
+ return buildFatalResult(t.getId(), t.getEmitKey(),
TIMEOUT,
+ intermediateResult.get(), timeoutMsg);
case UNSPECIFIED_CRASH:
- msg = readResult(String.class);
+ String crashMsg =
JsonPipesIpc.fromBytes(msg.payload(), String.class);
+ serverManager.markServerForRestart();
closeConnection();
return buildFatalResult(t.getId(), t.getEmitKey(),
UNSPECIFIED_CRASH,
- intermediateResult.get(), msg);
+ intermediateResult.get(), crashMsg);
case INTERMEDIATE_RESULT:
- intermediateResult.set(readResult(Metadata.class));
+
intermediateResult.set(JsonPipesIpc.fromBytes(msg.payload(), Metadata.class));
lastUpdate = Instant.now();
break;
case WORKING:
- lastProgressCounter = readProgressCounter();
lastUpdate = Instant.now();
break;
case FINISHED:
- PipesResult result = readResult(PipesResult.class);
+ PipesResult result =
JsonPipesIpc.fromBytes(msg.payload(), PipesResult.class);
// Restore ParseContext from original FetchEmitTuple
(not serialized back from server)
if (result.emitData() instanceof EmitDataImpl
emitDataImpl) {
emitDataImpl.setParseContext(t.getParseContext());
}
return result;
+ default:
+ throw new IOException("Unexpected message type from
server: " + msg.type());
}
} catch (SocketTimeoutException e) {
LOG.warn("clientId={}: Socket timeout exception while waiting
for server", pipesClientId, e);
@@ -385,9 +384,9 @@ public class PipesClient implements Closeable {
// Handle crash and determine status based on exit code
int exitCode = serverManager.handleCrashAndGetExitCode();
PipesResult.RESULT_STATUS status = UNSPECIFIED_CRASH;
- if (exitCode == PipesServer.OOM_EXIT_CODE) {
+ if (exitCode == PipesMessageType.OOM.getExitCode().orElse(-1))
{
status = PipesResult.RESULT_STATUS.OOM;
- } else if (exitCode == PipesServer.TIMEOUT_EXIT_CODE) {
+ } else if (exitCode ==
PipesMessageType.TIMEOUT.getExitCode().orElse(-1)) {
status = PipesResult.RESULT_STATUS.TIMEOUT;
}
closeConnection();
@@ -397,10 +396,6 @@ public class PipesClient implements Closeable {
}
}
- private long readProgressCounter() throws IOException {
- return connectionTuple.input.readLong();
- }
-
private PipesResult buildFatalResult(String id, EmitKey emitKey,
PipesResult.RESULT_STATUS status,
Optional<Metadata>
intermediateResultOpt) {
return buildFatalResult(id, emitKey, status, intermediateResultOpt,
null);
@@ -422,63 +417,19 @@ public class PipesClient implements Closeable {
}
}
- private PipesServer.PROCESSING_STATUS readServerStatus() throws
IOException {
- int statusByte = connectionTuple.input.read();
- writeAck();
- PipesServer.PROCESSING_STATUS status = null;
- try {
- status = PipesServer.PROCESSING_STATUS.lookup(statusByte);
- } catch (IllegalArgumentException e) {
- String byteString = "-1";
- if (statusByte > -1) {
- byteString = String.format(Locale.US, "%02x", (byte)
statusByte);
- }
- throw new IOException("problem reading response from server: " +
byteString, e);
- }
- return status;
- }
-
- private <T> T readResult(Class<T> clazz) throws IOException {
- int len = connectionTuple.input.readInt();
- if (len < 0 || len > PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES) {
- throw new IOException("Server response length " + len +
- " exceeds maximum allowed size of " +
PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES + " bytes");
- }
- byte[] bytes = new byte[len];
- connectionTuple.input.readFully(bytes);
-
- writeAck();
- return JsonPipesIpc.fromBytes(bytes, clazz);
- }
-
- private void writeAck() throws IOException {
- connectionTuple.output.write(ACK.getByte());
- connectionTuple.output.flush();
- }
-
private void waitForStartup() throws IOException {
- // Wait for ready byte
- int b = connectionTuple.input.read();
- writeAck();
- if (b == READY.getByte()) {
+ PipesMessage msg = PipesMessage.read(connectionTuple.input);
+ if (msg.type() == PipesMessageType.READY) {
LOG.debug("clientId={}: server ready", pipesClientId);
- } else if (b == FINISHED.getByte()) {
- int len = connectionTuple.input.readInt();
- if (len < 0 || len > PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES) {
- throw new IOException("Server startup error message length " +
len +
- " exceeds maximum allowed size of " +
PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES + " bytes");
- }
- byte[] bytes = new byte[len];
- connectionTuple.input.readFully(bytes);
- writeAck();
- String msg = new String(bytes, StandardCharsets.UTF_8);
- LOG.error("clientId={}: Server failed to start: {}",
pipesClientId, msg);
- throw new ServerInitializationException(msg);
+ } else if (msg.type() == PipesMessageType.STARTUP_FAILED) {
+ // Send ACK for startup failure
+ PipesMessage.ack().write(connectionTuple.output);
+ String errorMsg = new String(msg.payload(),
StandardCharsets.UTF_8);
+ LOG.error("clientId={}: Server failed to start: {}",
pipesClientId, errorMsg);
+ throw new ServerInitializationException(errorMsg);
} else {
- LOG.error("clientId={}: Unexpected first byte: {}", pipesClientId,
- HexFormat.of().formatHex(new byte[]{(byte) b}));
- throw new IOException("Unexpected first byte from server: " +
- HexFormat.of().formatHex(new byte[]{(byte) b}));
+ LOG.error("clientId={}: Unexpected first message type: {}",
pipesClientId, msg.type());
+ throw new IOException("Unexpected first message type from server:
" + msg.type());
}
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessage.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessage.java
new file mode 100644
index 0000000000..753f780fda
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessage.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Locale;
+
+/**
+ * Uniform framed message for the PipesClient/PipesServer IPC protocol.
+ * <p>
+ * Wire format: {@code [MAGIC 0x54 0x4B][TYPE 1B][LEN 4B][PAYLOAD]}
+ * <ul>
+ *   <li>MAGIC — two bytes {@code 0x54 0x4B} ("TK") for desync detection</li>
+ *   <li>TYPE — one byte identifying the {@link PipesMessageType}</li>
+ *   <li>LEN — four-byte big-endian payload length (0 for empty payloads)</li>
+ *   <li>PAYLOAD — {@code LEN} bytes of payload data</li>
+ * </ul>
+ * <p>
+ * The payload array is held and returned by reference (no defensive copy),
+ * and the record's generated {@code equals}/{@code hashCode} compare the
+ * array by identity, not by content.
+ *
+ * @param type    the message type written as the TYPE byte
+ * @param payload the payload bytes; must not be {@code null} when the
+ *                message is written
+ */
+public record PipesMessage(PipesMessageType type, byte[] payload) {
+
+    static final byte MAGIC_0 = 0x54; // 'T'
+    static final byte MAGIC_1 = 0x4B; // 'K'
+
+    /** Maximum payload size: 100 MB (same as old MAX_FETCH_EMIT_TUPLE_BYTES). */
+    public static final int MAX_PAYLOAD_BYTES = 100 * 1024 * 1024;
+
+    private static final byte[] EMPTY = new byte[0];
+
+    /**
+     * Reads one framed message from the stream.
+     *
+     * @param in the stream to read from
+     * @return the decoded message
+     * @throws ProtocolDesyncException if magic bytes don't match
+     * @throws EOFException if the stream ends before a complete message
+     * @throws IOException on an unknown type byte, payload size violations
+     *         or I/O errors
+     */
+    public static PipesMessage read(DataInputStream in) throws IOException {
+        int m0 = in.read();
+        if (m0 == -1) {
+            throw new EOFException("Stream closed before magic byte");
+        }
+        int m1 = in.read();
+        if (m1 == -1) {
+            throw new EOFException("Stream closed after first magic byte");
+        }
+        if ((byte) m0 != MAGIC_0 || (byte) m1 != MAGIC_1) {
+            throw new ProtocolDesyncException(
+                    String.format(Locale.ROOT, "Expected magic 0x%02x%02x but got 0x%02x%02x",
+                            MAGIC_0 & 0xFF, MAGIC_1 & 0xFF, m0 & 0xFF, m1 & 0xFF));
+        }
+
+        int typeByte = in.read();
+        if (typeByte == -1) {
+            throw new EOFException("Stream closed before type byte");
+        }
+        PipesMessageType type;
+        try {
+            type = PipesMessageType.lookup(typeByte);
+        } catch (IllegalArgumentException e) {
+            // lookup() reports an unknown wire byte with an unchecked exception.
+            // Surface it as an IOException so that callers, which handle
+            // IOException from read(), treat it like any other bad frame.
+            throw new IOException(
+                    String.format(Locale.ROOT, "Unknown message type byte 0x%02x after valid magic",
+                            typeByte & 0xFF), e);
+        }
+
+        int len = in.readInt();
+        if (len < 0) {
+            throw new IOException("Negative payload length: " + len);
+        }
+        if (len > MAX_PAYLOAD_BYTES) {
+            throw new IOException("Payload length " + len +
+                    " exceeds maximum of " + MAX_PAYLOAD_BYTES + " bytes");
+        }
+
+        byte[] payload;
+        if (len == 0) {
+            // share one empty array instead of allocating per message
+            payload = EMPTY;
+        } else {
+            payload = new byte[len];
+            in.readFully(payload);
+        }
+        return new PipesMessage(type, payload);
+    }
+
+    /**
+     * Writes this message to the stream and flushes.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing or flushing fails
+     */
+    public void write(DataOutputStream out) throws IOException {
+        out.write(MAGIC_0);
+        out.write(MAGIC_1);
+        out.write(type.getByte());
+        out.writeInt(payload.length);
+        if (payload.length > 0) {
+            out.write(payload);
+        }
+        out.flush();
+    }
+
+    // ---- convenience factories ----
+
+    /** Creates a PING message with an empty payload. */
+    public static PipesMessage ping() {
+        return new PipesMessage(PipesMessageType.PING, EMPTY);
+    }
+
+    /** Creates an ACK message with an empty payload. */
+    public static PipesMessage ack() {
+        return new PipesMessage(PipesMessageType.ACK, EMPTY);
+    }
+
+    /** Creates a READY message with an empty payload. */
+    public static PipesMessage ready() {
+        return new PipesMessage(PipesMessageType.READY, EMPTY);
+    }
+
+    /** Creates a SHUT_DOWN message with an empty payload. */
+    public static PipesMessage shutDown() {
+        return new PipesMessage(PipesMessageType.SHUT_DOWN, EMPTY);
+    }
+
+    /**
+     * Creates a WORKING heartbeat with a progress counter in the payload.
+     *
+     * @param counter the progress counter, encoded as eight big-endian bytes
+     */
+    public static PipesMessage working(long counter) {
+        byte[] payload = ByteBuffer.allocate(Long.BYTES)
+                .order(ByteOrder.BIG_ENDIAN)
+                .putLong(counter)
+                .array();
+        return new PipesMessage(PipesMessageType.WORKING, payload);
+    }
+
+    /** Creates a NEW_REQUEST message carrying the given payload. */
+    public static PipesMessage newRequest(byte[] payload) {
+        return new PipesMessage(PipesMessageType.NEW_REQUEST, payload);
+    }
+
+    /** Creates a FINISHED message carrying the given payload. */
+    public static PipesMessage finished(byte[] payload) {
+        return new PipesMessage(PipesMessageType.FINISHED, payload);
+    }
+
+    /** Creates an INTERMEDIATE_RESULT message carrying the given payload. */
+    public static PipesMessage intermediateResult(byte[] payload) {
+        return new PipesMessage(PipesMessageType.INTERMEDIATE_RESULT, payload);
+    }
+
+    /** Creates a STARTUP_FAILED message carrying the given payload. */
+    public static PipesMessage startupFailed(byte[] payload) {
+        return new PipesMessage(PipesMessageType.STARTUP_FAILED, payload);
+    }
+
+    /**
+     * Creates a message of the supplied type carrying the given payload.
+     * The type is used as-is; it is not checked to be a crash type.
+     */
+    public static PipesMessage crash(PipesMessageType crashType, byte[] payload) {
+        return new PipesMessage(crashType, payload);
+    }
+
+    /**
+     * Extracts the progress counter from a WORKING message payload.
+     *
+     * @return the counter decoded from the first eight payload bytes (big-endian)
+     * @throws IllegalStateException if this is not a WORKING message or the
+     *         payload is shorter than eight bytes
+     */
+    public long progressCounter() {
+        if (type != PipesMessageType.WORKING) {
+            throw new IllegalStateException("progressCounter() only valid for WORKING messages");
+        }
+        if (payload.length < Long.BYTES) {
+            // fail with a descriptive error rather than a bare BufferUnderflowException
+            throw new IllegalStateException(
+                    "WORKING payload too short for progress counter: " + payload.length + " bytes");
+        }
+        return ByteBuffer.wrap(payload)
+                .order(ByteOrder.BIG_ENDIAN)
+                .getLong();
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessageType.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessageType.java
new file mode 100644
index 0000000000..a0d2e6eb7f
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessageType.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import java.util.Locale;
+import java.util.OptionalInt;
+
+/**
+ * The single vocabulary of message types exchanged between PipesClient and
+ * PipesServer.
+ * <p>
+ * This enum supersedes the former {@code PipesClient.COMMANDS} and
+ * {@code PipesServer.PROCESSING_STATUS} enums. Every constant declares its
+ * wire byte, whether the receiver has to acknowledge it, and the exit code
+ * associated with it (a negative value meaning "no exit code").
+ */
+public enum PipesMessageType {
+
+    PING(0x01, false, -1),
+    ACK(0x02, false, -1),
+    NEW_REQUEST(0x03, false, -1),
+    SHUT_DOWN(0x04, false, -1),
+    READY(0x05, false, -1),
+    STARTUP_FAILED(0x06, true, -1),
+    INTERMEDIATE_RESULT(0x07, true, -1),
+    WORKING(0x08, false, -1),
+    FINISHED(0x09, true, -1),
+    OOM(0x0A, true, 18),
+    TIMEOUT(0x0B, true, 17),
+    UNSPECIFIED_CRASH(0x0C, true, 19);
+
+    private final int wireByte;
+    private final boolean requiresAck;
+    private final int exitCode;
+
+    PipesMessageType(int wireByte, boolean requiresAck, int exitCode) {
+        this.wireByte = wireByte;
+        this.requiresAck = requiresAck;
+        this.exitCode = exitCode;
+    }
+
+    /**
+     * @return the one-byte identifier written on the wire for this type
+     */
+    public byte getByte() {
+        return (byte) wireByte;
+    }
+
+    /**
+     * @return {@code true} when a receiver of this message type is expected
+     *         to answer with an ACK
+     */
+    public boolean requiresAck() {
+        return requiresAck;
+    }
+
+    /**
+     * @return the exit code tied to this message type, or an empty optional
+     *         when the type was declared with a negative exit code
+     */
+    public OptionalInt getExitCode() {
+        return exitCode < 0 ? OptionalInt.empty() : OptionalInt.of(exitCode);
+    }
+
+    /**
+     * Resolves a wire byte to its message type.
+     *
+     * @param b the wire byte
+     * @return the message type declared with that wire byte
+     * @throws IllegalArgumentException if no message type uses {@code b}
+     */
+    public static PipesMessageType lookup(int b) {
+        PipesMessageType match = null;
+        for (PipesMessageType candidate : values()) {
+            if (candidate.wireByte == b) {
+                match = candidate;
+                break;
+            }
+        }
+        if (match == null) {
+            throw new IllegalArgumentException(
+                    String.format(Locale.ROOT, "Unknown PipesMessageType wire byte: 0x%02x", b & 0xFF));
+        }
+        return match;
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ProtocolDesyncException.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ProtocolDesyncException.java
new file mode 100644
index 0000000000..fc89680095
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ProtocolDesyncException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import java.io.IOException;
+
+/**
+ * Signals that the IPC stream has lost its framing: the magic bytes that
+ * open every message did not match. A stream in this state is
+ * desynchronized, so the connection cannot be salvaged.
+ */
+public class ProtocolDesyncException extends IOException {
+
+    /**
+     * @param message description of the framing mismatch
+     */
+    public ProtocolDesyncException(String message) {
+        super(message);
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ShutDownReceivedException.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ShutDownReceivedException.java
new file mode 100644
index 0000000000..de9fedf957
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ShutDownReceivedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import java.io.IOException;
+
+/**
+ * Raised when the peer sends SHUT_DOWN at a point where an ACK was expected.
+ * <p>
+ * Having a dedicated type lets callers tell a clean shutdown request apart
+ * from other I/O errors and pick the matching lifecycle action (e.g.,
+ * {@code System.exit(0)} in PipesServer vs closing only the connection
+ * in ConnectionHandler).
+ */
+public class ShutDownReceivedException extends IOException {
+
+    /** Fixed detail message carried by every instance. */
+    private static final String MESSAGE = "Received SHUT_DOWN while awaiting ACK";
+
+    /** Creates the exception with its fixed detail message. */
+    public ShutDownReceivedException() {
+        super(MESSAGE);
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ConnectionHandler.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ConnectionHandler.java
index e33d1f0cef..25e080a941 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ConnectionHandler.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ConnectionHandler.java
@@ -16,12 +16,6 @@
*/
package org.apache.tika.pipes.core.server;
-import static org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.INTERMEDIATE_RESULT;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.TIMEOUT;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
@@ -32,7 +26,6 @@ import java.net.Socket;
import java.net.SocketException;
import java.time.Duration;
import java.time.Instant;
-import java.util.HexFormat;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -55,10 +48,10 @@ import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.core.EmitStrategyConfig;
import org.apache.tika.pipes.core.PipesClient;
import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.protocol.PipesMessage;
+import org.apache.tika.pipes.core.protocol.PipesMessageType;
import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
import org.apache.tika.serialization.ParseContextUtils;
-import org.apache.tika.utils.ExceptionUtils;
-import org.apache.tika.utils.StringUtils;
/**
* Handles a single client connection in shared server mode.
@@ -67,9 +60,11 @@ import org.apache.tika.utils.StringUtils;
* one PipesClient. It shares resources (parser, fetcher manager, etc.) with
* other handlers but has its own socket, streams, and executor.
* <p>
- * Unlike the per-client PipesServer, a ConnectionHandler does NOT call
- * System.exit() on errors - it just closes the connection and terminates
- * its thread. The shared server continues running for other clients.
+ * Unlike the per-client PipesServer, a ConnectionHandler does not call
+ * System.exit() for most errors — it just closes the connection and
+ * terminates its thread. However, OOM and TIMEOUT require a JVM restart,
+ * so those still call System.exit(). For all other crashes the shared
+ * server continues running for other clients.
*/
public class ConnectionHandler implements Runnable, Closeable {
@@ -88,6 +83,7 @@ public class ConnectionHandler implements Runnable, Closeable
{
private final ExecutorCompletionService<PipesResult>
executorCompletionService =
new ExecutorCompletionService<>(executorService);
+ private final ServerProtocolIO protocolIO;
private volatile boolean running = true;
/**
@@ -107,14 +103,15 @@ public class ConnectionHandler implements Runnable,
Closeable {
this.resources = resources;
this.pipesConfig = pipesConfig;
this.heartbeatIntervalMs = pipesConfig.getHeartbeatIntervalMs();
+ this.protocolIO = new ServerProtocolIO(input, output);
}
@Override
public void run() {
LOG.debug("handlerId={}: starting connection handler", handlerId);
try {
- // Send READY signal
- write(PipesServer.PROCESSING_STATUS.READY.getByte());
+ // Send READY signal (fire-and-forget, no ACK)
+ PipesMessage.ready().write(output);
LOG.debug("handlerId={}: sent READY, entering main loop",
handlerId);
mainLoop();
@@ -134,58 +131,58 @@ public class ConnectionHandler implements Runnable,
Closeable {
while (running) {
try {
- int request = input.read();
- LOG.trace("handlerId={}: received command byte={}", handlerId,
- HexFormat.of().formatHex(new byte[]{(byte) request}));
-
- if (request == -1) {
- LOG.debug("handlerId={}: received -1 from client; closing
connection", handlerId);
- return;
+ PipesMessage msg = PipesMessage.read(input);
+ LOG.trace("handlerId={}: received message type={}", handlerId,
msg.type());
+
+ switch (msg.type()) {
+ case PING:
+ PipesMessage.ping().write(output);
+ break;
+ case NEW_REQUEST:
+ intermediateResult.clear();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ FetchEmitTuple fetchEmitTuple;
+ try {
+ fetchEmitTuple =
JsonPipesIpc.fromBytes(msg.payload(), FetchEmitTuple.class);
+ } catch (IOException e) {
+ LOG.error("handlerId={}: problem deserializing
FetchEmitTuple", handlerId, e);
+ handleCrash(PipesMessageType.UNSPECIFIED_CRASH,
"unknown", e);
+ return; // connection is unsalvageable after
deserialization failure
+ }
+ try {
+
ServerProtocolIO.validateFetchEmitTuple(fetchEmitTuple);
+ ParseContext mergedContext =
resources.createMergedParseContext(fetchEmitTuple.getParseContext());
+ ParseContextUtils.resolveAll(mergedContext,
getClass().getClassLoader());
+
+ PipesWorker pipesWorker =
createPipesWorker(intermediateResult, fetchEmitTuple,
+ mergedContext, countDownLatch);
+ executorCompletionService.submit(pipesWorker);
+
+ loopUntilDone(fetchEmitTuple, mergedContext,
intermediateResult, countDownLatch);
+ } catch (TikaConfigException e) {
+ LOG.error("handlerId={}: config error processing
request", handlerId, e);
+ handleCrash(PipesMessageType.UNSPECIFIED_CRASH,
fetchEmitTuple.getId(), e);
+ } catch (Throwable t) {
+ LOG.error("handlerId={}: error processing
request", handlerId, t);
+ }
+ break;
+ case SHUT_DOWN:
+ LOG.info("handlerId={}: received SHUT_DOWN, closing
connection", handlerId);
+ return;
+ default:
+ String errorMsg = String.format(Locale.ROOT,
+ "handlerId=%d: Unexpected message type %s in
command position",
+ handlerId, msg.type());
+ LOG.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
}
-
- // Validate command byte
- if (request == PipesClient.COMMANDS.ACK.getByte()) {
- String msg = String.format(Locale.ROOT,
- "handlerId=%d: PROTOCOL ERROR - Received ACK when
expecting command", handlerId);
- LOG.error(msg);
- throw new IllegalStateException(msg);
- }
-
- if (request == PipesClient.COMMANDS.PING.getByte()) {
- writeNoAck(PipesClient.COMMANDS.PING.getByte());
- } else if (request ==
PipesClient.COMMANDS.NEW_REQUEST.getByte()) {
- intermediateResult.clear();
- CountDownLatch countDownLatch = new CountDownLatch(1);
-
- FetchEmitTuple fetchEmitTuple = readFetchEmitTuple();
- try {
- validateFetchEmitTuple(fetchEmitTuple);
- ParseContext mergedContext =
resources.createMergedParseContext(fetchEmitTuple.getParseContext());
- ParseContextUtils.resolveAll(mergedContext,
getClass().getClassLoader());
-
- PipesWorker pipesWorker =
createPipesWorker(intermediateResult, fetchEmitTuple,
- mergedContext, countDownLatch);
- executorCompletionService.submit(pipesWorker);
-
- loopUntilDone(fetchEmitTuple, mergedContext,
intermediateResult, countDownLatch);
- } catch (TikaConfigException e) {
- LOG.error("handlerId={}: config error processing
request", handlerId, e);
-
handleCrash(PipesServer.PROCESSING_STATUS.UNSPECIFIED_CRASH,
fetchEmitTuple.getId(), e);
- } catch (Throwable t) {
- LOG.error("handlerId={}: error processing request",
handlerId, t);
- }
- } else if (request ==
PipesClient.COMMANDS.SHUT_DOWN.getByte()) {
- LOG.info("handlerId={}: received SHUT_DOWN, closing
connection", handlerId);
- return;
- } else {
- String msg = String.format(Locale.ROOT,
- "handlerId=%d: Unexpected byte 0x%02x in command
position", handlerId, (byte) request);
- LOG.error(msg);
- throw new IllegalStateException(msg);
- }
- output.flush();
+ } catch (java.io.EOFException e) {
+ // Client disconnected (stream closed)
+ LOG.debug("handlerId={}: client disconnected (EOF)",
handlerId);
+ return;
} catch (SocketException e) {
- // Client disconnected
+ // Client disconnected (socket closed)
LOG.debug("handlerId={}: client disconnected", handlerId);
return;
} catch (IOException e) {
@@ -216,7 +213,7 @@ public class ConnectionHandler implements Runnable,
Closeable {
CountDownLatch countDownLatch) throws
InterruptedException, IOException {
Instant start = Instant.now();
long timeoutMillis = PipesClient.getTimeoutMillis(pipesConfig,
mergedContext);
- long mockProgressCounter = 0;
+ long progressCounter = 1;
boolean wroteIntermediateResult = false;
while (running) {
@@ -224,7 +221,7 @@ public class ConnectionHandler implements Runnable,
Closeable {
if (!wroteIntermediateResult) {
Metadata intermediate = intermediateResult.poll(100,
TimeUnit.MILLISECONDS);
if (intermediate != null) {
- writeIntermediate(intermediate);
+ protocolIO.writeIntermediate(intermediate);
countDownLatch.countDown();
wroteIntermediateResult = true;
}
@@ -237,33 +234,31 @@ public class ConnectionHandler implements Runnable,
Closeable {
try {
pipesResult = future.get();
} catch (OutOfMemoryError e) {
- handleCrash(OOM, fetchEmitTuple.getId(), e);
+ handleCrash(PipesMessageType.OOM, fetchEmitTuple.getId(),
e);
LOG.error("handlerId={}: exiting server due to OOM",
handlerId);
- System.exit(1);
+ System.exit(PipesMessageType.OOM.getExitCode().orElse(18));
} catch (ExecutionException e) {
Throwable t = e.getCause();
LOG.error("handlerId={}: crash processing {}", handlerId,
fetchEmitTuple.getId(), t);
if (t instanceof OutOfMemoryError) {
- handleCrash(OOM, fetchEmitTuple.getId(), t);
+ handleCrash(PipesMessageType.OOM,
fetchEmitTuple.getId(), t);
LOG.error("handlerId={}: exiting server due to OOM",
handlerId);
- System.exit(1);
+
System.exit(PipesMessageType.OOM.getExitCode().orElse(18));
}
-
handleCrash(PipesServer.PROCESSING_STATUS.UNSPECIFIED_CRASH,
fetchEmitTuple.getId(), t);
+ handleCrash(PipesMessageType.UNSPECIFIED_CRASH,
fetchEmitTuple.getId(), t);
return;
}
LOG.debug("handlerId={}: finished task id={} status={}",
handlerId,
fetchEmitTuple.getId(), pipesResult.status());
- write(FINISHED, pipesResult);
+ protocolIO.writeFinished(pipesResult);
return;
}
- // Send heartbeat
+ // Send fire-and-forget heartbeat
long elapsed = System.currentTimeMillis() - start.toEpochMilli();
- if (elapsed > mockProgressCounter * heartbeatIntervalMs) {
- LOG.trace("handlerId={}: still processing, counter={}",
handlerId, mockProgressCounter);
- write(PipesServer.PROCESSING_STATUS.WORKING.getByte());
- output.writeLong(mockProgressCounter++);
- output.flush();
+ if (elapsed > progressCounter * heartbeatIntervalMs) {
+ LOG.trace("handlerId={}: still processing, counter={}",
handlerId, progressCounter);
+ PipesMessage.working(progressCounter++).write(output);
}
// Check timeout
@@ -276,19 +271,17 @@ public class ConnectionHandler implements Runnable,
Closeable {
private void handleTimeout(String id) throws IOException {
LOG.warn("handlerId={}: timeout processing id={}", handlerId, id);
- write(TIMEOUT.getByte());
+ handleCrash(PipesMessageType.TIMEOUT, id,
+ new RuntimeException("Server-side timeout processing " + id));
// Timeout means a parsing thread is stuck - the JVM must be restarted
LOG.error("handlerId={}: exiting server due to timeout", handlerId);
- System.exit(1);
+ System.exit(PipesMessageType.TIMEOUT.getExitCode().orElse(17));
}
- private void handleCrash(PipesServer.PROCESSING_STATUS processingStatus,
String id, Throwable t) {
- LOG.error("handlerId={}: {} processing id={}", handlerId,
processingStatus, id, t);
- String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
+ private void handleCrash(PipesMessageType crashType, String id, Throwable
t) {
+ LOG.error("handlerId={}: {} processing id={}", handlerId, crashType,
id, t);
try {
- byte[] bytes = JsonPipesIpc.toBytes(msg);
- write(processingStatus, bytes);
- // Note: write() already awaits ACKs internally, don't call
awaitAck() again
+ protocolIO.writeCrash(crashType, t);
} catch (IOException e) {
LOG.warn("handlerId={}: problem writing crash info to client",
handlerId, e);
}
@@ -296,96 +289,6 @@ public class ConnectionHandler implements Runnable,
Closeable {
// For other crashes (UNSPECIFIED_CRASH), we just close this connection
}
- private FetchEmitTuple readFetchEmitTuple() throws IOException {
- int length = input.readInt();
- if (length < 0 || length > PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES) {
- throw new IOException("FetchEmitTuple length " + length +
- " exceeds maximum allowed size of " +
PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES + " bytes");
- }
- byte[] bytes = new byte[length];
- input.readFully(bytes);
- return JsonPipesIpc.fromBytes(bytes, FetchEmitTuple.class);
- }
-
- private void validateFetchEmitTuple(FetchEmitTuple fetchEmitTuple) throws
TikaConfigException {
- ParseContext requestContext = fetchEmitTuple.getParseContext();
- if (requestContext == null) {
- return;
- }
- org.apache.tika.pipes.core.extractor.UnpackConfig unpackConfig =
-
requestContext.get(org.apache.tika.pipes.core.extractor.UnpackConfig.class);
- org.apache.tika.pipes.api.ParseMode parseMode =
- requestContext.get(org.apache.tika.pipes.api.ParseMode.class);
-
- if (unpackConfig != null &&
!StringUtils.isBlank(unpackConfig.getEmitter())
- && parseMode != org.apache.tika.pipes.api.ParseMode.UNPACK) {
- throw new TikaConfigException(
- "FetchEmitTuple has UnpackConfig with emitter '" +
unpackConfig.getEmitter() +
- "' but ParseMode is " + parseMode + ". " +
- "To extract embedded bytes, set ParseMode.UNPACK
in the ParseContext.");
- }
- }
-
- private void write(PipesServer.PROCESSING_STATUS processingStatus,
PipesResult pipesResult) {
- try {
- byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
- write(processingStatus, bytes);
- } catch (IOException e) {
- LOG.error("handlerId={}: problem writing emit data", handlerId, e);
- }
- }
-
- private void writeIntermediate(Metadata metadata) {
- try {
- byte[] bytes = JsonPipesIpc.toBytes(metadata);
- write(INTERMEDIATE_RESULT, bytes);
- } catch (IOException e) {
- LOG.error("handlerId={}: problem writing intermediate data",
handlerId, e);
- }
- }
-
- private void awaitAck() throws IOException {
- int b = input.read();
- if (b == ACK.getByte()) {
- return;
- }
- LOG.error("handlerId={}: expected ACK but got byte={}", handlerId,
- HexFormat.of().formatHex(new byte[]{(byte) b}));
- throw new IOException("Expected ACK but got byte=" +
HexFormat.of().formatHex(new byte[]{(byte) b}));
- }
-
- private void writeNoAck(byte b) {
- try {
- output.write(b);
- output.flush();
- } catch (IOException e) {
- LOG.error("handlerId={}: problem writing data", handlerId, e);
- }
- }
-
- private void write(byte b) {
- try {
- output.write(b);
- output.flush();
- awaitAck();
- } catch (IOException e) {
- LOG.error("handlerId={}: problem writing data", handlerId, e);
- }
- }
-
- private void write(PipesServer.PROCESSING_STATUS status, byte[] bytes) {
- try {
- write(status.getByte());
- int len = bytes.length;
- output.writeInt(len);
- output.write(bytes);
- output.flush();
- awaitAck();
- } catch (IOException e) {
- LOG.error("handlerId={}: problem writing data", handlerId, e);
- }
- }
-
@Override
public void close() {
running = false;
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 66d0b7c333..e69137e5f7 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -16,12 +16,6 @@
*/
package org.apache.tika.pipes.core.server;
-import static org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.INTERMEDIATE_RESULT;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM;
-import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.TIMEOUT;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -64,7 +58,6 @@ import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.pipes.api.FetchEmitTuple;
-import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.core.EmitStrategy;
import org.apache.tika.pipes.core.EmitStrategyConfig;
@@ -73,16 +66,17 @@ import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.config.ConfigStore;
import org.apache.tika.pipes.core.config.ConfigStoreFactory;
import org.apache.tika.pipes.core.emitter.EmitterManager;
-import org.apache.tika.pipes.core.extractor.UnpackConfig;
import org.apache.tika.pipes.core.extractor.UnpackExtractorFactory;
import org.apache.tika.pipes.core.fetcher.FetcherManager;
+import org.apache.tika.pipes.core.protocol.PipesMessage;
+import org.apache.tika.pipes.core.protocol.PipesMessageType;
+import org.apache.tika.pipes.core.protocol.ShutDownReceivedException;
import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.plugins.TikaPluginManager;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.serialization.ParseContextUtils;
import org.apache.tika.utils.ExceptionUtils;
-import org.apache.tika.utils.StringUtils;
/**
* This server is forked from the PipesClient. This class isolates
@@ -97,60 +91,14 @@ public class PipesServer implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(PipesServer.class);
public static final int AUTH_TOKEN_LENGTH_BYTES = 32;
- public static final int MAX_FETCH_EMIT_TUPLE_BYTES = 100 * 1024 * 1024; //
100MB
private final long heartbeatIntervalMs;
private final String pipesClientId;
- //this has to be some number not close to 0-3
- //it looks like the server crashes with exit value 3 on uncaught OOM, for
example
- public static final int TIMEOUT_EXIT_CODE = 17;
- public static final int OOM_EXIT_CODE = 18;
- public static final int UNSPECIFIED_CRASH_EXIT_CODE = 19;
-
-
- public enum PROCESSING_STATUS {
- READY, INTERMEDIATE_RESULT, WORKING, FINISHED,
- OOM(OOM_EXIT_CODE), TIMEOUT(TIMEOUT_EXIT_CODE),
UNSPECIFIED_CRASH(UNSPECIFIED_CRASH_EXIT_CODE);
-
- int exitCode = -1;
- public static PROCESSING_STATUS lookup(int b) {
- if (b < 1) {
- throw new IllegalArgumentException("bad result value: " + b);
- }
- int ordinal = b - 1;
- if (ordinal >= PROCESSING_STATUS.values().length) {
- throw new IllegalArgumentException("ordinal > than array
length? " + ordinal);
- }
- return PROCESSING_STATUS.values()[ordinal];
- }
- PROCESSING_STATUS() {
-
- }
-
- PROCESSING_STATUS(int exitCode) {
- this.exitCode = exitCode;
- }
-
- public int getExitCode() {
- return exitCode;
- }
-
- public byte getByte() {
- return (byte) (ordinal() + 1);
- }
- }
-
private Detector detector;
-
-
- private final Object[] lock = new Object[0];
private final DataInputStream input;
private final DataOutputStream output;
- //if an extract is larger than this value, emit it directly;
- //if it is smaller than this value, write it back to the
- //PipesClient so that it can cache the extracts and then batch emit.
private final TikaLoader tikaLoader;
private final PipesConfig pipesConfig;
@@ -166,6 +114,7 @@ public class PipesServer implements AutoCloseable {
private final ExecutorService executorService =
Executors.newSingleThreadExecutor();
private final ExecutorCompletionService<PipesResult>
executorCompletionService = new ExecutorCompletionService<>(executorService);
private final EmitStrategy emitStrategy;
+ private final ServerProtocolIO protocolIO;
public static PipesServer load(int port, Path tikaConfigPath) throws
Exception {
String pipesClientId = System.getProperty("pipesClientId",
"unknown");
@@ -195,23 +144,12 @@ public class PipesServer implements AutoCloseable {
} catch (Exception e) {
LOG.error("Failed to start up", e);
try {
- // Write FINISHED status byte and await ACK
- dos.writeByte(FINISHED.getByte());
- dos.flush();
- int ack = dis.read();
- if (ack != PipesClient.COMMANDS.ACK.getByte()) {
- LOG.warn("Expected ACK but got: {}", ack);
- }
-
- // Write error message and await ACK
String msg = ExceptionUtils.getStackTrace(e);
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
- dos.writeInt(bytes.length);
- dos.write(bytes);
- dos.flush();
- ack = dis.read();
- if (ack != PipesClient.COMMANDS.ACK.getByte()) {
- LOG.warn("Expected ACK but got: {}", ack);
+ PipesMessage.startupFailed(bytes).write(dos);
+ PipesMessage ackMsg = PipesMessage.read(dis);
+ if (ackMsg.type() != PipesMessageType.ACK) {
+ LOG.warn("Expected ACK but got: {}", ackMsg.type());
}
} catch (IOException ioException) {
LOG.error("Failed to send startup failure message to client",
ioException);
@@ -250,6 +188,7 @@ public class PipesServer implements AutoCloseable {
}
emitStrategy = pipesConfig.getEmitStrategy().getType();
+ this.protocolIO = new ServerProtocolIO(input, output);
}
@@ -365,84 +304,73 @@ public class PipesServer implements AutoCloseable {
}
public void mainLoop() {
- write(PROCESSING_STATUS.READY.getByte());
+ try {
+ PipesMessage.ready().write(output);
+ } catch (IOException e) {
+ LOG.error("pipesClientId={}: failed to send READY", pipesClientId,
e);
+ exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
+ return;
+ }
LOG.debug("pipesClientId={}: sent READY, entering main loop",
pipesClientId);
ArrayBlockingQueue<Metadata> intermediateResult = new
ArrayBlockingQueue<>(1);
//main loop
try {
- long start = System.currentTimeMillis();
while (true) {
- int request = input.read();
- LOG.trace("pipesClientId={}: received command byte={}",
pipesClientId, HexFormat.of().formatHex(new byte[]{(byte)request}));
- if (request == -1) {
- LOG.debug("received -1 from client; shutting down");
- exit(0);
- }
-
- // Validate that we received a command byte, not a status/ACK
byte
- if (request == PipesClient.COMMANDS.ACK.getByte()) {
- String msg = String.format(Locale.ROOT,
- "pipesClientId=%s: PROTOCOL ERROR - Received ACK
(byte=0x%02x) when expecting a command. " +
- "This indicates a protocol synchronization issue
where the server missed consuming an ACK. " +
- "Valid commands are: PING(0x%02x),
NEW_REQUEST(0x%02x), SHUT_DOWN(0x%02x). " +
- "This is likely a bug in the server's message
handling - check that all status messages " +
- "that trigger client ACKs are properly awaiting
those ACKs.",
- pipesClientId, (byte)request,
- PipesClient.COMMANDS.PING.getByte(),
- PipesClient.COMMANDS.NEW_REQUEST.getByte(),
- PipesClient.COMMANDS.SHUT_DOWN.getByte());
- LOG.error(msg);
- throw new IllegalStateException(msg);
- }
-
- if (request == PipesClient.COMMANDS.PING.getByte()) {
- writeNoAck(PipesClient.COMMANDS.PING.getByte());
- } else if (request ==
PipesClient.COMMANDS.NEW_REQUEST.getByte()) {
- intermediateResult.clear();
- CountDownLatch countDownLatch = new CountDownLatch(1);
-
- FetchEmitTuple fetchEmitTuple = readFetchEmitTuple();
- // Validate before merging with global config
- validateFetchEmitTuple(fetchEmitTuple);
- // Create merged ParseContext: defaults from tika-config +
request overrides
- ParseContext mergedContext =
createMergedParseContext(fetchEmitTuple.getParseContext());
- // Resolve friendly-named configs in ParseContext to
actual objects
- ParseContextUtils.resolveAll(mergedContext,
getClass().getClassLoader());
-
- PipesWorker pipesWorker =
getPipesWorker(intermediateResult, fetchEmitTuple, mergedContext,
countDownLatch);
- executorCompletionService.submit(pipesWorker);
- //set progress counter
- try {
- loopUntilDone(fetchEmitTuple, mergedContext,
executorCompletionService, intermediateResult, countDownLatch);
- } catch (Throwable t) {
- LOG.error("Serious problem: {}",
HexFormat.of().formatHex(new byte[]{(byte)request}), t);
- }
- } else if (request ==
PipesClient.COMMANDS.SHUT_DOWN.getByte()) {
- LOG.debug("shutting down");
- try {
- close();
- } catch (Exception e) {
- //swallow
- }
- System.exit(0);
- } else {
- String msg = String.format(Locale.ROOT,
- "pipesClientId=%s: Unexpected byte 0x%02x in
command position. " +
- "Expected one of: PING(0x%02x), ACK(0x%02x),
NEW_REQUEST(0x%02x), SHUT_DOWN(0x%02x)",
- pipesClientId, (byte)request,
- PipesClient.COMMANDS.PING.getByte(),
- PipesClient.COMMANDS.ACK.getByte(),
- PipesClient.COMMANDS.NEW_REQUEST.getByte(),
- PipesClient.COMMANDS.SHUT_DOWN.getByte());
- LOG.error(msg);
- throw new IllegalStateException(msg);
+ PipesMessage msg = PipesMessage.read(input);
+ LOG.trace("pipesClientId={}: received message type={}",
pipesClientId, msg.type());
+
+ switch (msg.type()) {
+ case PING:
+ PipesMessage.ping().write(output);
+ break;
+ case NEW_REQUEST:
+ intermediateResult.clear();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ FetchEmitTuple fetchEmitTuple;
+ try {
+ fetchEmitTuple =
JsonPipesIpc.fromBytes(msg.payload(), FetchEmitTuple.class);
+ } catch (IOException e) {
+ LOG.error("problem deserializing FetchEmitTuple",
e);
+ handleCrash(PipesMessageType.UNSPECIFIED_CRASH,
"unknown", e);
+ break; // unreachable after handleCrash/exit, but
needed for compilation
+ }
+ // Validate before merging with global config
+
ServerProtocolIO.validateFetchEmitTuple(fetchEmitTuple);
+ // Create merged ParseContext: defaults from
tika-config + request overrides
+ ParseContext mergedContext =
createMergedParseContext(fetchEmitTuple.getParseContext());
+ // Resolve friendly-named configs in ParseContext to
actual objects
+ ParseContextUtils.resolveAll(mergedContext,
getClass().getClassLoader());
+
+ PipesWorker pipesWorker =
getPipesWorker(intermediateResult, fetchEmitTuple, mergedContext,
countDownLatch);
+ executorCompletionService.submit(pipesWorker);
+ try {
+ loopUntilDone(fetchEmitTuple, mergedContext,
executorCompletionService, intermediateResult, countDownLatch);
+ } catch (Throwable t) {
+ LOG.error("Serious problem processing request", t);
+ }
+ break;
+ case SHUT_DOWN:
+ LOG.debug("shutting down");
+ try {
+ close();
+ } catch (Exception e) {
+ //swallow
+ }
+ System.exit(0);
+ break;
+ default:
+ String errorMsg = String.format(Locale.ROOT,
+ "pipesClientId=%s: Unexpected message type %s
in command position",
+ pipesClientId, msg.type());
+ LOG.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
}
- output.flush();
}
} catch (Throwable t) {
LOG.error("main loop error (did the forking process shut down?)",
t);
- exit(1);
+ exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
}
}
@@ -463,7 +391,7 @@ public class PipesServer implements AutoCloseable {
ArrayBlockingQueue<Metadata>
intermediateResult, CountDownLatch countDownLatch) throws InterruptedException,
IOException {
Instant start = Instant.now();
long timeoutMillis = PipesClient.getTimeoutMillis(pipesConfig,
mergedContext);
- long mockProgressCounter = 0;
+ long progressCounter = 1;
boolean wroteIntermediateResult = false;
while (true) {
@@ -484,54 +412,54 @@ public class PipesServer implements AutoCloseable {
try {
pipesResult = future.get();
} catch (OutOfMemoryError e) {
- handleCrash(OOM, fetchEmitTuple.getId(), e);
+ handleCrash(PipesMessageType.OOM, fetchEmitTuple.getId(),
e);
+ return; // handleCrash calls exit(), but guard against
unexpected return
} catch (ExecutionException e) {
Throwable t = e.getCause();
LOG.error("crash: {}", fetchEmitTuple.getId(), t);
if (t instanceof OutOfMemoryError) {
- handleCrash(OOM, fetchEmitTuple.getId(), t);
+ handleCrash(PipesMessageType.OOM,
fetchEmitTuple.getId(), t);
+ return;
}
- handleCrash(PROCESSING_STATUS.UNSPECIFIED_CRASH,
fetchEmitTuple.getId(), t);
+ handleCrash(PipesMessageType.UNSPECIFIED_CRASH,
fetchEmitTuple.getId(), t);
+ return;
}
LOG.debug("executor completionService finished task: id={}
status={}", fetchEmitTuple.getId(), pipesResult.status());
- write(FINISHED, pipesResult);
+ writeFinished(pipesResult);
return;
}
- // Send heartbeat if we've waited long enough
+ // Send fire-and-forget heartbeat if we've waited long enough
long elapsed = System.currentTimeMillis() - start.toEpochMilli();
- if (elapsed > mockProgressCounter * heartbeatIntervalMs) {
- LOG.debug("still processing: {}", mockProgressCounter);
- write(PROCESSING_STATUS.WORKING.getByte());
- output.writeLong(mockProgressCounter++);
- output.flush();
+ if (elapsed > progressCounter * heartbeatIntervalMs) {
+ LOG.debug("still processing: {}", progressCounter);
+ PipesMessage.working(progressCounter++).write(output);
}
- checkTimeout(start, timeoutMillis);
+ if (checkTimeout(start, timeoutMillis, fetchEmitTuple.getId())) {
+ return; // handleCrash calls exit(), but guard against
unexpected return
+ }
}
}
- private void checkTimeout(Instant start, long timeoutMillis) throws
IOException {
-
+ private boolean checkTimeout(Instant start, long timeoutMillis, String id)
{
if (Duration.between(start, Instant.now()).toMillis() > timeoutMillis)
{
- write(TIMEOUT.getByte());
- exit(TIMEOUT_EXIT_CODE);
+ handleCrash(PipesMessageType.TIMEOUT, id,
+ new RuntimeException("Server-side timeout after " +
timeoutMillis + "ms"));
+ return true;
}
+ return false;
}
- private void handleCrash(PROCESSING_STATUS processingStatus, String id,
Throwable t) {
- LOG.error("{}: {}", processingStatus, id, t);
- String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
+ private void handleCrash(PipesMessageType crashType, String id, Throwable
t) {
+ LOG.error("{}: {}", crashType, id, t);
try {
- byte[] bytes = JsonPipesIpc.toBytes(msg);
- write(processingStatus, bytes);
- awaitAck();
+ protocolIO.writeCrash(crashType, t);
} catch (IOException e) {
- //swallow
LOG.warn("problem writing crash info to client", e);
}
- exit(processingStatus.getExitCode());
+ exit(crashType.getExitCode().orElse(19));
}
@@ -550,48 +478,6 @@ public class PipesServer implements AutoCloseable {
System.exit(exitCode);
}
-
- private FetchEmitTuple readFetchEmitTuple() {
- try {
- int length = input.readInt();
- if (length < 0 || length > MAX_FETCH_EMIT_TUPLE_BYTES) {
- throw new IOException("FetchEmitTuple length " + length +
- " exceeds maximum allowed size of " +
MAX_FETCH_EMIT_TUPLE_BYTES + " bytes");
- }
- byte[] bytes = new byte[length];
- input.readFully(bytes);
- return JsonPipesIpc.fromBytes(bytes, FetchEmitTuple.class);
- } catch (IOException e) {
- LOG.error("problem reading/deserializing FetchEmitTuple", e);
- handleCrash(PROCESSING_STATUS.UNSPECIFIED_CRASH, "unknown", e);
- }
- //unreachable - handleCrash calls exit
- return null;
- }
-
- /**
- * Validates the FetchEmitTuple before merging with global config.
- * If the tuple explicitly sets UnpackConfig with an emitter but ParseMode
is not UNPACK,
- * that's a configuration error.
- */
- private void validateFetchEmitTuple(FetchEmitTuple fetchEmitTuple) throws
TikaConfigException {
- ParseContext requestContext = fetchEmitTuple.getParseContext();
- if (requestContext == null) {
- return;
- }
- UnpackConfig unpackConfig = requestContext.get(UnpackConfig.class);
- ParseMode parseMode = requestContext.get(ParseMode.class);
-
- // If tuple explicitly has UnpackConfig with emitter but not UNPACK
mode, that's an error
- if (unpackConfig != null &&
!StringUtils.isBlank(unpackConfig.getEmitter())
- && parseMode != ParseMode.UNPACK) {
- throw new TikaConfigException(
- "FetchEmitTuple has UnpackConfig with emitter '" +
unpackConfig.getEmitter() +
- "' but ParseMode is " + parseMode + ". " +
- "To extract embedded bytes, set ParseMode.UNPACK in the
ParseContext.");
- }
- }
-
protected void initializeResources() throws TikaException, IOException,
SAXException {
TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
@@ -634,84 +520,50 @@ public class PipesServer implements AutoCloseable {
private ConfigStore createConfigStore(PipesConfig pipesConfig,
TikaPluginManager tikaPluginManager) throws TikaException {
String configStoreType = pipesConfig.getConfigStoreType();
String configStoreParams = pipesConfig.getConfigStoreParams();
-
+
if (configStoreType == null || "memory".equals(configStoreType)) {
// Use default in-memory store (no persistence)
return null;
}
-
+
ExtensionConfig storeConfig = new ExtensionConfig(
configStoreType, configStoreType, configStoreParams);
-
+
return ConfigStoreFactory.createConfigStore(
tikaPluginManager,
configStoreType,
storeConfig);
}
-
- private void write(PROCESSING_STATUS processingStatus, PipesResult
pipesResult) {
+ private void writeFinished(PipesResult pipesResult) {
try {
- byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
- write(processingStatus, bytes);
+ protocolIO.writeFinished(pipesResult);
+ } catch (ShutDownReceivedException e) {
+ handleShutDown();
} catch (IOException e) {
LOG.error("problem writing emit data (forking process shutdown?)",
e);
- exit(1);
+ exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
}
}
private void writeIntermediate(Metadata metadata) {
try {
- byte[] bytes = JsonPipesIpc.toBytes(metadata);
- write(INTERMEDIATE_RESULT, bytes);
+ protocolIO.writeIntermediate(metadata);
+ } catch (ShutDownReceivedException e) {
+ handleShutDown();
} catch (IOException e) {
LOG.error("problem writing intermediate data (forking process
shutdown?)", e);
- exit(1);
- }
- }
-
- private void awaitAck() throws IOException {
- int b = input.read();
- if (b == ACK.getByte()) {
- return;
- }
- LOG.error("pipesClientId={}: expected ACK but got byte={}",
pipesClientId, HexFormat.of().formatHex(new byte[]{ (byte) b}));
- throw new IOException("Wasn't expecting byte=" +
HexFormat.of().formatHex(new byte[]{ (byte) b}));
- }
-
- private void writeNoAck(byte b) {
- try {
- output.write(b);
- output.flush();
- } catch (IOException e) {
- LOG.error("problem writing data (forking process shutdown?)", e);
- exit(1);
+ exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
}
}
- private void write(byte b) {
+ private void handleShutDown() {
+ LOG.info("pipesClientId={}: received SHUT_DOWN, shutting down
gracefully", pipesClientId);
try {
- output.write(b);
- output.flush();
- awaitAck();
- } catch (IOException e) {
- LOG.error("pipesClientId={}: problem writing data (forking process
shutdown?)", pipesClientId, e);
- exit(1);
- }
- }
-
-
- private void write(PROCESSING_STATUS status, byte[] bytes) {
- try {
- write(status.getByte());
- int len = bytes.length;
- output.writeInt(len);
- output.write(bytes);
- output.flush();
- awaitAck();
- } catch (IOException e) {
- LOG.error("problem writing data (forking process shutdown?)", e);
- exit(1);
+ close();
+ } catch (Exception e) {
+ //swallow
}
+ exit(0);
}
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ServerProtocolIO.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ServerProtocolIO.java
new file mode 100644
index 0000000000..3d71f87457
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ServerProtocolIO.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.server;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.FetchEmitTuple;
+import org.apache.tika.pipes.api.ParseMode;
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.pipes.core.extractor.UnpackConfig;
+import org.apache.tika.pipes.core.protocol.PipesMessage;
+import org.apache.tika.pipes.core.protocol.PipesMessageType;
+import org.apache.tika.pipes.core.protocol.ShutDownReceivedException;
+import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
+import org.apache.tika.utils.ExceptionUtils;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Centralizes protocol I/O operations shared by {@link PipesServer} and
+ * {@link ConnectionHandler}.
+ * <p>
+ * This class handles the pure protocol mechanics — serialization, framing,
+ * and ACK exchange. It does <b>not</b> make lifecycle decisions (exit vs.
+ * return, close connection vs. shut down JVM). Callers are responsible for
+ * catching exceptions and responding according to their own lifecycle policy.
+ */
+public class ServerProtocolIO {
+
+ private final DataInputStream input;
+ private final DataOutputStream output;
+
+ public ServerProtocolIO(DataInputStream input, DataOutputStream output) {
+ this.input = input;
+ this.output = output;
+ }
+
+ /**
+ * Writes a FINISHED message with the serialized result and waits for ACK.
+ *
+ * @throws ShutDownReceivedException if SHUT_DOWN is received instead of ACK
+ * @throws IOException on serialization or I/O errors
+ */
+ public void writeFinished(PipesResult pipesResult) throws IOException {
+ byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
+ PipesMessage.finished(bytes).write(output);
+ awaitAck();
+ }
+
+ /**
+ * Writes an INTERMEDIATE_RESULT message with the serialized metadata and waits for ACK.
+ *
+ * @throws ShutDownReceivedException if SHUT_DOWN is received instead of ACK
+ * @throws IOException on serialization or I/O errors
+ */
+ public void writeIntermediate(Metadata metadata) throws IOException {
+ byte[] bytes = JsonPipesIpc.toBytes(metadata);
+ PipesMessage.intermediateResult(bytes).write(output);
+ awaitAck();
+ }
+
+ /**
+ * Writes a crash message (OOM, TIMEOUT, or UNSPECIFIED_CRASH) with the
+ * serialized stack trace and waits for ACK.
+ *
+ * @throws IOException on serialization, I/O, or unexpected ACK response
+ */
+ public void writeCrash(PipesMessageType crashType, Throwable t) throws
IOException {
+ String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
+ byte[] bytes = JsonPipesIpc.toBytes(msg);
+ PipesMessage.crash(crashType, bytes).write(output);
+ awaitAck();
+ }
+
+ /**
+ * Reads a framed message and verifies it is an ACK.
+ *
+ * @throws ShutDownReceivedException if the message is SHUT_DOWN
+ * @throws IOException if the message is any other non-ACK type, or on I/O error
+ */
+ public void awaitAck() throws IOException {
+ PipesMessage msg = PipesMessage.read(input);
+ if (msg.type() == PipesMessageType.ACK) {
+ return;
+ }
+ if (msg.type() == PipesMessageType.SHUT_DOWN) {
+ throw new ShutDownReceivedException();
+ }
+ throw new IOException("Expected ACK but got " + msg.type());
+ }
+
+ /**
+ * Validates that a FetchEmitTuple's configuration is consistent.
+ * <p>
+ * If the tuple has an UnpackConfig with an emitter but ParseMode is not UNPACK,
+ * that's a configuration error.
+ */
+ public static void validateFetchEmitTuple(FetchEmitTuple fetchEmitTuple)
+ throws TikaConfigException {
+ ParseContext requestContext = fetchEmitTuple.getParseContext();
+ if (requestContext == null) {
+ return;
+ }
+ UnpackConfig unpackConfig = requestContext.get(UnpackConfig.class);
+ ParseMode parseMode = requestContext.get(ParseMode.class);
+
+ if (unpackConfig != null &&
!StringUtils.isBlank(unpackConfig.getEmitter())
+ && parseMode != ParseMode.UNPACK) {
+ throw new TikaConfigException(
+ "FetchEmitTuple has UnpackConfig with emitter '" +
unpackConfig.getEmitter() +
+ "' but ParseMode is " + parseMode + ". " +
+ "To extract embedded bytes, set ParseMode.UNPACK
in the ParseContext.");
+ }
+ }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/protocol/PipesMessageTest.java
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/protocol/PipesMessageTest.java
new file mode 100644
index 0000000000..372cf49bee
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/protocol/PipesMessageTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+
+class PipesMessageTest {
+
+ @Test
+ void testRoundTripEmptyPayload() throws IOException {
+ for (PipesMessageType type : new PipesMessageType[]{
+ PipesMessageType.PING, PipesMessageType.ACK,
+ PipesMessageType.READY, PipesMessageType.SHUT_DOWN}) {
+ PipesMessage original = new PipesMessage(type, new byte[0]);
+ PipesMessage roundTripped = roundTrip(original);
+ assertEquals(type, roundTripped.type());
+ assertEquals(0, roundTripped.payload().length);
+ }
+ }
+
+ @Test
+ void testRoundTripWithPayload() throws IOException {
+ byte[] payload = "hello world".getBytes(StandardCharsets.UTF_8);
+ PipesMessage original = PipesMessage.finished(payload);
+ PipesMessage roundTripped = roundTrip(original);
+ assertEquals(PipesMessageType.FINISHED, roundTripped.type());
+ assertArrayEquals(payload, roundTripped.payload());
+ }
+
+ @Test
+ void testRoundTripAllTypes() throws IOException {
+ byte[] payload = "test".getBytes(StandardCharsets.UTF_8);
+ for (PipesMessageType type : PipesMessageType.values()) {
+ PipesMessage original = new PipesMessage(type, payload);
+ PipesMessage roundTripped = roundTrip(original);
+ assertEquals(type, roundTripped.type());
+ assertArrayEquals(payload, roundTripped.payload());
+ }
+ }
+
+ @Test
+ void testWorkingMessageRoundTrip() throws IOException {
+ PipesMessage original = PipesMessage.working(42L);
+ PipesMessage roundTripped = roundTrip(original);
+ assertEquals(PipesMessageType.WORKING, roundTripped.type());
+ assertEquals(42L, roundTripped.progressCounter());
+ }
+
+ @Test
+ void testDesyncDetectionBadMagic() {
+ byte[] bad = new byte[]{0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00};
+ assertThrows(ProtocolDesyncException.class, () ->
+ PipesMessage.read(new DataInputStream(new
ByteArrayInputStream(bad))));
+ }
+
+ @Test
+ void testDesyncDetectionPartialMagic() {
+ // First byte correct, second wrong
+ byte[] bad = new byte[]{0x54, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00};
+ assertThrows(ProtocolDesyncException.class, () ->
+ PipesMessage.read(new DataInputStream(new
ByteArrayInputStream(bad))));
+ }
+
+ @Test
+ void testEofBeforeMagic() {
+ byte[] empty = new byte[0];
+ assertThrows(EOFException.class, () ->
+ PipesMessage.read(new DataInputStream(new
ByteArrayInputStream(empty))));
+ }
+
+ @Test
+ void testEofAfterFirstMagicByte() {
+ byte[] partial = new byte[]{0x54};
+ assertThrows(EOFException.class, () ->
+ PipesMessage.read(new DataInputStream(new
ByteArrayInputStream(partial))));
+ }
+
+ @Test
+ void testNegativePayloadLength() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.write(PipesMessage.MAGIC_0);
+ dos.write(PipesMessage.MAGIC_1);
+ dos.write(PipesMessageType.FINISHED.getByte());
+ dos.writeInt(-1); // negative length
+ dos.flush();
+
+ assertThrows(IOException.class, () ->
+ PipesMessage.read(new DataInputStream(new
ByteArrayInputStream(baos.toByteArray()))));
+ }
+
+ @Test
+ void testOversizedPayloadRejection() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.write(PipesMessage.MAGIC_0);
+ dos.write(PipesMessage.MAGIC_1);
+ dos.write(PipesMessageType.FINISHED.getByte());
+ dos.writeInt(PipesMessage.MAX_PAYLOAD_BYTES + 1);
+ dos.flush();
+
+ assertThrows(IOException.class, () ->
+ PipesMessage.read(new DataInputStream(new
ByteArrayInputStream(baos.toByteArray()))));
+ }
+
+ @Test
+ void testRequiresAckAssertions() {
+ assertFalse(PipesMessageType.PING.requiresAck());
+ assertFalse(PipesMessageType.ACK.requiresAck());
+ assertFalse(PipesMessageType.NEW_REQUEST.requiresAck());
+ assertFalse(PipesMessageType.SHUT_DOWN.requiresAck());
+ assertFalse(PipesMessageType.READY.requiresAck());
+ assertFalse(PipesMessageType.WORKING.requiresAck());
+
+ assertTrue(PipesMessageType.STARTUP_FAILED.requiresAck());
+ assertTrue(PipesMessageType.INTERMEDIATE_RESULT.requiresAck());
+ assertTrue(PipesMessageType.FINISHED.requiresAck());
+ assertTrue(PipesMessageType.OOM.requiresAck());
+ assertTrue(PipesMessageType.TIMEOUT.requiresAck());
+ assertTrue(PipesMessageType.UNSPECIFIED_CRASH.requiresAck());
+ }
+
+ @Test
+ void testGetByteAndLookupInverse() {
+ for (PipesMessageType type : PipesMessageType.values()) {
+ byte b = type.getByte();
+ PipesMessageType looked = PipesMessageType.lookup(b);
+ assertEquals(type, looked, "lookup(getByte()) failed for " + type);
+ }
+ }
+
+ @Test
+ void testLookupUnknownByte() {
+ assertThrows(IllegalArgumentException.class, () ->
PipesMessageType.lookup(0xFF));
+ assertThrows(IllegalArgumentException.class, () ->
PipesMessageType.lookup(0x00));
+ }
+
+ @Test
+ void testExitCodes() {
+ assertTrue(PipesMessageType.OOM.getExitCode().isPresent());
+ assertEquals(18, PipesMessageType.OOM.getExitCode().getAsInt());
+
+ assertTrue(PipesMessageType.TIMEOUT.getExitCode().isPresent());
+ assertEquals(17, PipesMessageType.TIMEOUT.getExitCode().getAsInt());
+
+
assertTrue(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().isPresent());
+ assertEquals(19,
PipesMessageType.UNSPECIFIED_CRASH.getExitCode().getAsInt());
+
+ assertFalse(PipesMessageType.PING.getExitCode().isPresent());
+ assertFalse(PipesMessageType.FINISHED.getExitCode().isPresent());
+ assertFalse(PipesMessageType.READY.getExitCode().isPresent());
+ }
+
+ @Test
+ void testConvenienceFactories() throws IOException {
+ assertEquals(PipesMessageType.PING,
roundTrip(PipesMessage.ping()).type());
+ assertEquals(PipesMessageType.ACK,
roundTrip(PipesMessage.ack()).type());
+ assertEquals(PipesMessageType.READY,
roundTrip(PipesMessage.ready()).type());
+ assertEquals(PipesMessageType.SHUT_DOWN,
roundTrip(PipesMessage.shutDown()).type());
+
+ byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+ assertEquals(PipesMessageType.NEW_REQUEST,
roundTrip(PipesMessage.newRequest(data)).type());
+ assertEquals(PipesMessageType.FINISHED,
roundTrip(PipesMessage.finished(data)).type());
+ assertEquals(PipesMessageType.INTERMEDIATE_RESULT,
roundTrip(PipesMessage.intermediateResult(data)).type());
+ assertEquals(PipesMessageType.STARTUP_FAILED,
roundTrip(PipesMessage.startupFailed(data)).type());
+ assertEquals(PipesMessageType.OOM,
roundTrip(PipesMessage.crash(PipesMessageType.OOM, data)).type());
+ }
+
+ private PipesMessage roundTrip(PipesMessage msg) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ msg.write(new DataOutputStream(baos));
+ return PipesMessage.read(new DataInputStream(new
ByteArrayInputStream(baos.toByteArray())));
+ }
+}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 5cef444687..a84edf5275 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -294,7 +294,8 @@ public class PipesClientTest {
Assertions.assertNotNull(pipesResult.message(), "Should have error
message");
assertTrue(pipesResult.message().contains("exit code") ||
pipesResult.message().contains("JVM") ||
- pipesResult.message().contains("Process failed"),
+ pipesResult.message().contains("Process failed") ||
+ pipesResult.message().contains("couldn't connect
to server"),
"Error message should indicate process failure: " +
pipesResult.message());
}
}
@@ -353,7 +354,9 @@ public class PipesClientTest {
// Should have error message about the crash
Assertions.assertNotNull(pipesResult.message(), "Should have error
message");
assertTrue(pipesResult.message().contains("problem reading
response") |
- pipesResult.message().contains("SocketException"),
+ pipesResult.message().contains("SocketException") |
+ pipesResult.message().contains("EOFException") |
+ pipesResult.message().contains("Stream closed"),
"Error message should mention the detection crash: " +
pipesResult.message());
// Note: Because crash happens during pre-parse (before
intermediate result is sent),
@@ -779,6 +782,60 @@ public class PipesClientTest {
"User filter should take priority over CONTENT_ONLY filter");
}
+ @Test
+ public void testRecoveryAfterServerCrash(@TempDir Path tmp) throws
Exception {
+ // Test that after a server crash (System.exit), the client can recover
+ // and successfully process the next document.
+ // This exercises the full crash → restart → reconnect path.
+ Path inputDir = tmp.resolve("input");
+ Files.createDirectories(inputDir);
+
+ // Create a mock file that will crash the server
+ String crashContent = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
"<mock>" +
+ "<metadata action=\"add\" name=\"dc:creator\">Crash
Test</metadata>" +
+ "<write element=\"p\">content before crash</write>" +
+ "<system_exit/>" + "</mock>";
+ String crashFile = "mock-crash.xml";
+ Files.write(inputDir.resolve(crashFile),
crashContent.getBytes(StandardCharsets.UTF_8));
+
+ // Create a normal mock file
+ String normalContent = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
"<mock>" +
+ "<metadata action=\"add\" name=\"dc:creator\">Normal
Author</metadata>" +
+ "<write element=\"p\">normal content</write>" +
+ "</mock>";
+ String normalFile = "mock-normal.xml";
+ Files.write(inputDir.resolve(normalFile),
normalContent.getBytes(StandardCharsets.UTF_8));
+
+ Path tikaConfigPath =
PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir,
tmp.resolve("output"));
+ TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
+
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
+ // First: process the crashing file — server should die
+ PipesResult crashResult = pipesClient.process(
+ new FetchEmitTuple(crashFile, new FetchKey(fetcherName,
crashFile),
+ new EmitKey(), new Metadata(), new ParseContext(),
+ FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+
+ assertTrue(crashResult.isProcessCrash(),
+ "Crash file should result in process crash, got: " +
crashResult.status());
+
+ // Second: process the normal file — client should restart server and succeed
+ PipesResult normalResult = pipesClient.process(
+ new FetchEmitTuple(normalFile, new FetchKey(fetcherName,
normalFile),
+ new EmitKey(), new Metadata(), new ParseContext(),
+ FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+
+ assertTrue(normalResult.isSuccess(),
+ "Normal file should succeed after crash recovery, got: " +
normalResult.status() +
+ " message: " + normalResult.message());
+
Assertions.assertNotNull(normalResult.emitData().getMetadataList());
+ assertEquals(1, normalResult.emitData().getMetadataList().size());
+ Metadata metadata =
normalResult.emitData().getMetadataList().get(0);
+ assertEquals("Normal Author", metadata.get("dc:creator"));
+ }
+ }
+
@Test
public void testConcatenateMode(@TempDir Path tmp) throws Exception {
// Test that CONCATENATE mode returns a single metadata object with
content