This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 3eba834f5 TIKA-4556 - fix pipes client/server protocol bug
3eba834f5 is described below
commit 3eba834f507cf3d6e721a421bf51c3110744bdce
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 9 12:07:27 2025 -0500
TIKA-4556 - fix pipes client/server protocol bug
---
.../org/apache/tika/pipes/core/PipesClient.java | 20 ++++----
.../apache/tika/pipes/core/server/PipesServer.java | 57 +++++++++++++++++-----
.../apache/tika/pipes/core/PipesClientTest.java | 50 +++++++++++++++++++
3 files changed, 104 insertions(+), 23 deletions(-)
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index cd3ff0152..fb89ade96 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -140,6 +140,7 @@ public class PipesClient implements Closeable {
if (serverTuple == null) {
return;
}
+ LOG.debug("pipesClientId={}: shutting down server", pipesClientId);
try {
serverTuple.output.write(COMMANDS.SHUT_DOWN.getByte());
serverTuple.output.flush();
@@ -233,8 +234,7 @@ public class PipesClient implements Closeable {
}
private void writeTask(FetchEmitTuple t) throws IOException {
- long start = System.currentTimeMillis();
-
+ LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}",
pipesClientId, t.getId());
UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream
.builder()
.get();
@@ -247,9 +247,6 @@ public class PipesClient implements Closeable {
serverTuple.output.writeInt(bytes.length);
serverTuple.output.write(bytes);
serverTuple.output.flush();
- if (LOG.isTraceEnabled()) {
- LOG.trace("pipesClientId={}: timer -- write tuple: {} ms",
pipesClientId, System.currentTimeMillis() - start);
- }
}
private PipesResult waitForServer(FetchEmitTuple t, IntermediateResult
intermediateResult) throws InterruptedException {
@@ -425,14 +422,15 @@ public class PipesClient implements Closeable {
private void restart() throws InterruptedException, IOException,
TimeoutException {
+ ServerSocket serverSocket = new ServerSocket(0, 50,
InetAddress.getLoopbackAddress());
+ int port = serverSocket.getLocalPort();
if (serverTuple != null && serverTuple.process != null) {
+ int oldPort = serverTuple.serverSocket.getLocalPort();
shutItAllDown();
- LOG.info("pipesClientId={}: restarting process", pipesClientId);
+ LOG.info("pipesClientId={}: restarting process on port={} (old
port was {})", pipesClientId, port, oldPort);
} else {
- LOG.info("pipesClientId={}: starting process", pipesClientId);
+ LOG.info("pipesClientId={}: starting process on port={}",
pipesClientId, port);
}
- ServerSocket serverSocket = new ServerSocket(0, 50,
InetAddress.getLoopbackAddress());
- int port = serverSocket.getLocalPort();
Path tmpDir = Files.createTempDirectory("pipes-server-" +
pipesClientId + "-");
ProcessBuilder pb = new ProcessBuilder(getCommandline(port, tmpDir));
pb.inheritIO();
@@ -485,7 +483,7 @@ public class PipesClient implements Closeable {
int b = serverTuple.input.read();
writeAck();
if (b == READY.getByte()) {
- LOG.debug("got ready byte");
+ LOG.debug("pipesClientId={}: server ready", pipesClientId);
} else if (b == FINISHED.getByte()) {
int len = serverTuple.input.readInt();
byte[] bytes = new byte[len];
@@ -537,7 +535,7 @@ public class PipesClient implements Closeable {
if (arg.equals("-XX:+ExitOnOutOfMemoryError") ||
arg.equals("-XX:+CrashOnOutOfMemoryError")) {
hasExitOnOOM = true;
}
- if (arg.startsWith("-Dlog4j.configuration")) {
+ if (arg.startsWith("-Dlog4j.configuration") ||
arg.startsWith("-Dlog4j2.configuration")) {
hasLog4j = true;
}
if (arg.startsWith("-Xloggc:")) {
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 99e403ce3..949dcfea4 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -86,6 +86,7 @@ public class PipesServer implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(PipesServer.class);
private final long heartbeatIntervalMs;
+ private final String pipesClientId;
//this has to be some number not close to 0-3
//it looks like the server crashes with exit value 3 on uncaught OOM, for
example
@@ -151,6 +152,8 @@ public class PipesServer implements AutoCloseable {
private final PipesWorker.EMIT_STRATEGY emitStrategy;
public static PipesServer load(int port, Path tikaConfigPath) throws
Exception {
+ String pipesClientId = System.getProperty("pipesClientId",
"unknown");
+ LOG.debug("pipesClientId={}: connecting to client on port={}",
pipesClientId, port);
Socket socket = new Socket();
socket.connect(new
InetSocketAddress(InetAddress.getLoopbackAddress(), port),
PipesClient.SOCKET_CONNECT_TIMEOUT_MS);
@@ -165,8 +168,9 @@ public class PipesServer implements AutoCloseable {
socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters();
- PipesServer pipesServer = new PipesServer(tikaLoader, pipesConfig,
socket, dis, dos, metadataFilter);
+ PipesServer pipesServer = new PipesServer(pipesClientId,
tikaLoader, pipesConfig, socket, dis, dos, metadataFilter);
pipesServer.initializeResources();
+ LOG.debug("pipesClientId={}: PipesServer loaded and ready",
pipesClientId);
return pipesServer;
} catch (Exception e) {
LOG.error("Failed to start up", e);
@@ -196,10 +200,11 @@ public class PipesServer implements AutoCloseable {
}
}
- public PipesServer(TikaLoader tikaLoader, PipesConfig pipesConfig, Socket
socket, DataInputStream in,
+ public PipesServer(String pipesClientId, TikaLoader tikaLoader,
PipesConfig pipesConfig, Socket socket, DataInputStream in,
DataOutputStream out, MetadataFilter metadataFilter)
throws TikaConfigException,
IOException {
+ this.pipesClientId = pipesClientId;
this.tikaLoader = tikaLoader;
this.pipesConfig = pipesConfig;
this.socket = socket;
@@ -234,32 +239,51 @@ public class PipesServer implements AutoCloseable {
public static void main(String[] args) throws Exception {
int port = Integer.parseInt(args[0]);
Path tikaConfig = Paths.get(args[1]);
- LOG.debug("starting pipes server on port={} with tikaConfig={}", port,
tikaConfig);
+ String pipesClientId = System.getProperty("pipesClientId", "unknown");
+ LOG.debug("pipesClientId={}: starting pipes server on port={}",
pipesClientId, port);
try (PipesServer server = PipesServer.load(port, tikaConfig)) {
- LOG.debug("successfully started pipes server");
server.mainLoop();
} catch (Throwable t) {
- LOG.error("crashed", t);
+ LOG.error("pipesClientId={}: crashed", pipesClientId, t);
throw t;
} finally {
- LOG.info("server shutting down");
+ LOG.info("pipesClientId={}: server shutting down", pipesClientId);
}
}
public void mainLoop() {
write(PROCESSING_STATUS.READY.getByte());
+ LOG.debug("pipesClientId={}: sent READY, entering main loop",
pipesClientId);
ArrayBlockingQueue<Metadata> intermediateResult = new
ArrayBlockingQueue<>(1);
- LOG.trace("processing requests");
//main loop
try {
long start = System.currentTimeMillis();
while (true) {
int request = input.read();
+ LOG.trace("pipesClientId={}: received command byte={}",
pipesClientId, HexFormat.of().formatHex(new byte[]{(byte)request}));
if (request == -1) {
LOG.warn("received -1 from client; shutting down");
exit(0);
- } else if (request == PipesClient.COMMANDS.PING.getByte()) {
+ }
+
+ // Validate that we received a command byte, not a status/ACK
byte
+ if (request == PipesClient.COMMANDS.ACK.getByte()) {
+ String msg = String.format(Locale.ROOT,
+ "pipesClientId=%s: PROTOCOL ERROR - Received ACK
(byte=0x%02x) when expecting a command. " +
+ "This indicates a protocol synchronization issue
where the server missed consuming an ACK. " +
+ "Valid commands are: PING(0x%02x),
NEW_REQUEST(0x%02x), SHUT_DOWN(0x%02x). " +
+ "This is likely a bug in the server's message
handling - check that all status messages " +
+ "that trigger client ACKs are properly awaiting
those ACKs.",
+ pipesClientId, (byte)request,
+ PipesClient.COMMANDS.PING.getByte(),
+ PipesClient.COMMANDS.NEW_REQUEST.getByte(),
+ PipesClient.COMMANDS.SHUT_DOWN.getByte());
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+
+ if (request == PipesClient.COMMANDS.PING.getByte()) {
writeNoAck(PipesClient.COMMANDS.PING.getByte());
} else if (request ==
PipesClient.COMMANDS.NEW_REQUEST.getByte()) {
intermediateResult.clear();
@@ -284,8 +308,16 @@ public class PipesServer implements AutoCloseable {
}
System.exit(0);
} else {
- LOG.error("Unexpected request byte={}",
HexFormat.of().formatHex(new byte[]{(byte)request}));
- throw new IllegalStateException("Unexpected request");
+ String msg = String.format(Locale.ROOT,
+ "pipesClientId=%s: Unexpected byte 0x%02x in
command position. " +
+ "Expected one of: PING(0x%02x), ACK(0x%02x),
NEW_REQUEST(0x%02x), SHUT_DOWN(0x%02x)",
+ pipesClientId, (byte)request,
+ PipesClient.COMMANDS.PING.getByte(),
+ PipesClient.COMMANDS.ACK.getByte(),
+ PipesClient.COMMANDS.NEW_REQUEST.getByte(),
+ PipesClient.COMMANDS.SHUT_DOWN.getByte());
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
}
output.flush();
}
@@ -346,7 +378,7 @@ public class PipesServer implements AutoCloseable {
long elapsed = System.currentTimeMillis() - start.toEpochMilli();
if (elapsed > mockProgressCounter * heartbeatIntervalMs) {
LOG.debug("still processing: {}", mockProgressCounter);
- output.write(PROCESSING_STATUS.WORKING.getByte());
+ write(PROCESSING_STATUS.WORKING.getByte());
output.writeLong(mockProgressCounter++);
output.flush();
}
@@ -484,6 +516,7 @@ public class PipesServer implements AutoCloseable {
if (b == ACK.getByte()) {
return;
}
+ LOG.error("pipesClientId={}: expected ACK but got byte={}",
pipesClientId, HexFormat.of().formatHex(new byte[]{ (byte) b}));
throw new IOException("Wasn't expecting byte=" +
HexFormat.of().formatHex(new byte[]{ (byte) b}));
}
@@ -503,7 +536,7 @@ public class PipesServer implements AutoCloseable {
output.flush();
awaitAck();
} catch (IOException e) {
- LOG.error("problem writing data (forking process shutdown?)", e);
+ LOG.error("pipesClientId={}: problem writing data (forking process
shutdown?)", pipesClientId, e);
exit(1);
}
}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index b482200e1..e821e1c60 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -551,4 +551,54 @@ public class PipesClientTest {
"Error message should mention the missing emitter");
}
}
+
+    @Test
+    public void testHeartbeatProtocol(@TempDir Path tmp) throws Exception {
+        // Regression test for TIKA-4556: each WORKING heartbeat must have its ACK consumed by the
+        // server; a missed ACK is later read where a command byte is expected and breaks the protocol.
+        // A short heartbeat interval plus a slow mock parse should force several heartbeats.
+
+        Path inputDir = tmp.resolve("input");
+        Files.createDirectories(inputDir);
+
+        // Mock document whose 2-second fakeload spans many 100ms heartbeat intervals
+        String mockContent = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
+                "<mock>" +
+                "<metadata action=\"add\" name=\"dc:creator\">Heartbeat Test</metadata>" +
+                "<write element=\"p\">Testing heartbeat protocol synchronization</write>" +
+                "<fakeload millis=\"2000\" cpu=\"1\" mb=\"10\"/>" +
+                "</mock>";
+        String testFile = "mock-heartbeat-test.xml";
+        Files.write(inputDir.resolve(testFile), mockContent.getBytes(StandardCharsets.UTF_8));
+
+        // Start from the standard file-system fetcher config, then shorten the heartbeat to 100ms
+        Path tikaConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir, tmp.resolve("output"));
+        String configContent = Files.readString(tikaConfigPath, StandardCharsets.UTF_8);
+
+        // replace() is a silent no-op when the anchor is missing; fail instead of passing vacuously
+        assertTrue(configContent.contains("\"pipes\": {"), "test config has no \"pipes\": { section to patch");
+        configContent = configContent.replace(
+                "\"pipes\": {",
+                "\"pipes\": {\n \"heartbeatIntervalMs\": 100,");
+        Files.writeString(tikaConfigPath, configContent, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
+        PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath);
+
+        try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+            // The parse must complete normally even though heartbeats precede the final result
+            PipesResult pipesResult = pipesClient.process(
+                    new FetchEmitTuple(testFile, new FetchKey(fetcherName, testFile),
+                            new EmitKey(), new Metadata(), new ParseContext(),
+                            FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+
+            // Verify successful completion and the metadata declared in the mock document
+            assertTrue(pipesResult.isSuccess(),
+                    "Processing should succeed even with heartbeat messages. Got status: " + pipesResult.status());
+            Assertions.assertNotNull(pipesResult.emitData().getMetadataList());
+            assertEquals(1, pipesResult.emitData().getMetadataList().size());
+            Metadata metadata = pipesResult.emitData().getMetadataList().get(0);
+            assertEquals("Heartbeat Test", metadata.get("dc:creator"));
+        }
+    }
}