This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 3eba834f5  TIKA-4556 - fix pipes client/server protocol bug
3eba834f5 is described below

commit 3eba834f507cf3d6e721a421bf51c3110744bdce
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 9 12:07:27 2025 -0500

     TIKA-4556 - fix pipes client/server protocol bug
---
 .../org/apache/tika/pipes/core/PipesClient.java    | 20 ++++----
 .../apache/tika/pipes/core/server/PipesServer.java | 57 +++++++++++++++++-----
 .../apache/tika/pipes/core/PipesClientTest.java    | 50 +++++++++++++++++++
 3 files changed, 104 insertions(+), 23 deletions(-)

diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index cd3ff0152..fb89ade96 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -140,6 +140,7 @@ public class PipesClient implements Closeable {
         if (serverTuple == null) {
             return;
         }
+        LOG.debug("pipesClientId={}: shutting down server", pipesClientId);
         try {
             serverTuple.output.write(COMMANDS.SHUT_DOWN.getByte());
             serverTuple.output.flush();
@@ -233,8 +234,7 @@ public class PipesClient implements Closeable {
     }
 
     private void writeTask(FetchEmitTuple t) throws IOException {
-        long start = System.currentTimeMillis();
-
+        LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}", 
pipesClientId, t.getId());
         UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream
                 .builder()
                 .get();
@@ -247,9 +247,6 @@ public class PipesClient implements Closeable {
         serverTuple.output.writeInt(bytes.length);
         serverTuple.output.write(bytes);
         serverTuple.output.flush();
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("pipesClientId={}: timer -- write tuple: {} ms", 
pipesClientId, System.currentTimeMillis() - start);
-        }
     }
 
     private PipesResult waitForServer(FetchEmitTuple t, IntermediateResult 
intermediateResult) throws InterruptedException {
@@ -425,14 +422,15 @@ public class PipesClient implements Closeable {
 
 
     private void restart() throws InterruptedException, IOException, 
TimeoutException {
+        ServerSocket serverSocket = new ServerSocket(0, 50, 
InetAddress.getLoopbackAddress());
+        int port = serverSocket.getLocalPort();
         if (serverTuple != null && serverTuple.process != null) {
+            int oldPort = serverTuple.serverSocket.getLocalPort();
             shutItAllDown();
-            LOG.info("pipesClientId={}: restarting process", pipesClientId);
+            LOG.info("pipesClientId={}: restarting process on port={} (old 
port was {})", pipesClientId, port, oldPort);
         } else {
-            LOG.info("pipesClientId={}: starting process", pipesClientId);
+            LOG.info("pipesClientId={}: starting process on port={}", 
pipesClientId, port);
         }
-        ServerSocket serverSocket = new ServerSocket(0, 50, 
InetAddress.getLoopbackAddress());
-        int port = serverSocket.getLocalPort();
         Path tmpDir = Files.createTempDirectory("pipes-server-" + 
pipesClientId + "-");
         ProcessBuilder pb = new ProcessBuilder(getCommandline(port, tmpDir));
         pb.inheritIO();
@@ -485,7 +483,7 @@ public class PipesClient implements Closeable {
         int b = serverTuple.input.read();
         writeAck();
         if (b == READY.getByte()) {
-            LOG.debug("got ready byte");
+            LOG.debug("pipesClientId={}: server ready", pipesClientId);
         } else if (b == FINISHED.getByte()) {
             int len = serverTuple.input.readInt();
             byte[] bytes = new byte[len];
@@ -537,7 +535,7 @@ public class PipesClient implements Closeable {
             if (arg.equals("-XX:+ExitOnOutOfMemoryError") || 
arg.equals("-XX:+CrashOnOutOfMemoryError")) {
                 hasExitOnOOM = true;
             }
-            if (arg.startsWith("-Dlog4j.configuration")) {
+            if (arg.startsWith("-Dlog4j.configuration") || 
arg.startsWith("-Dlog4j2.configuration")) {
                 hasLog4j = true;
             }
             if (arg.startsWith("-Xloggc:")) {
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 99e403ce3..949dcfea4 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -86,6 +86,7 @@ public class PipesServer implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(PipesServer.class);
 
     private final long heartbeatIntervalMs;
+    private final String pipesClientId;
 
     //this has to be some number not close to 0-3
     //it looks like the server crashes with exit value 3 on uncaught OOM, for 
example
@@ -151,6 +152,8 @@ public class PipesServer implements AutoCloseable {
     private final PipesWorker.EMIT_STRATEGY emitStrategy;
 
     public static PipesServer load(int port, Path tikaConfigPath) throws 
Exception {
+            String pipesClientId = System.getProperty("pipesClientId", 
"unknown");
+            LOG.debug("pipesClientId={}: connecting to client on port={}", 
pipesClientId, port);
             Socket socket = new Socket();
             socket.connect(new 
InetSocketAddress(InetAddress.getLoopbackAddress(), port), 
PipesClient.SOCKET_CONNECT_TIMEOUT_MS);
 
@@ -165,8 +168,9 @@ public class PipesServer implements AutoCloseable {
             socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
 
             MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters();
-            PipesServer pipesServer = new PipesServer(tikaLoader, pipesConfig, 
socket, dis, dos, metadataFilter);
+            PipesServer pipesServer = new PipesServer(pipesClientId, 
tikaLoader, pipesConfig, socket, dis, dos, metadataFilter);
             pipesServer.initializeResources();
+            LOG.debug("pipesClientId={}: PipesServer loaded and ready", 
pipesClientId);
             return pipesServer;
         } catch (Exception e) {
             LOG.error("Failed to start up", e);
@@ -196,10 +200,11 @@ public class PipesServer implements AutoCloseable {
         }
     }
 
-    public PipesServer(TikaLoader tikaLoader, PipesConfig pipesConfig, Socket 
socket, DataInputStream in,
+    public PipesServer(String pipesClientId, TikaLoader tikaLoader, 
PipesConfig pipesConfig, Socket socket, DataInputStream in,
                        DataOutputStream out, MetadataFilter metadataFilter) 
throws TikaConfigException,
             IOException {
 
+        this.pipesClientId = pipesClientId;
         this.tikaLoader = tikaLoader;
         this.pipesConfig = pipesConfig;
         this.socket = socket;
@@ -234,32 +239,51 @@ public class PipesServer implements AutoCloseable {
     public static void main(String[] args) throws Exception {
         int port = Integer.parseInt(args[0]);
         Path tikaConfig = Paths.get(args[1]);
-        LOG.debug("starting pipes server on port={} with tikaConfig={}", port, 
tikaConfig);
+        String pipesClientId = System.getProperty("pipesClientId", "unknown");
+        LOG.debug("pipesClientId={}: starting pipes server on port={}", 
pipesClientId, port);
         try (PipesServer server = PipesServer.load(port, tikaConfig)) {
-            LOG.debug("successfully started pipes server");
             server.mainLoop();
         } catch (Throwable t) {
-            LOG.error("crashed", t);
+            LOG.error("pipesClientId={}: crashed", pipesClientId, t);
             throw t;
         } finally {
-            LOG.info("server shutting down");
+            LOG.info("pipesClientId={}: server shutting down", pipesClientId);
         }
     }
 
     public void mainLoop() {
         write(PROCESSING_STATUS.READY.getByte());
+        LOG.debug("pipesClientId={}: sent READY, entering main loop", 
pipesClientId);
         ArrayBlockingQueue<Metadata> intermediateResult = new 
ArrayBlockingQueue<>(1);
 
-        LOG.trace("processing requests");
         //main loop
         try {
             long start = System.currentTimeMillis();
             while (true) {
                 int request = input.read();
+                LOG.trace("pipesClientId={}: received command byte={}", 
pipesClientId, HexFormat.of().formatHex(new byte[]{(byte)request}));
                 if (request == -1) {
                     LOG.warn("received -1 from client; shutting down");
                     exit(0);
-                } else if (request == PipesClient.COMMANDS.PING.getByte()) {
+                }
+
+                // Validate that we received a command byte, not a status/ACK 
byte
+                if (request == PipesClient.COMMANDS.ACK.getByte()) {
+                    String msg = String.format(Locale.ROOT,
+                            "pipesClientId=%s: PROTOCOL ERROR - Received ACK 
(byte=0x%02x) when expecting a command. " +
+                            "This indicates a protocol synchronization issue 
where the server missed consuming an ACK. " +
+                            "Valid commands are: PING(0x%02x), 
NEW_REQUEST(0x%02x), SHUT_DOWN(0x%02x). " +
+                            "This is likely a bug in the server's message 
handling - check that all status messages " +
+                            "that trigger client ACKs are properly awaiting 
those ACKs.",
+                            pipesClientId, (byte)request,
+                            PipesClient.COMMANDS.PING.getByte(),
+                            PipesClient.COMMANDS.NEW_REQUEST.getByte(),
+                            PipesClient.COMMANDS.SHUT_DOWN.getByte());
+                    LOG.error(msg);
+                    throw new IllegalStateException(msg);
+                }
+
+                if (request == PipesClient.COMMANDS.PING.getByte()) {
                     writeNoAck(PipesClient.COMMANDS.PING.getByte());
                 } else if (request == 
PipesClient.COMMANDS.NEW_REQUEST.getByte()) {
                     intermediateResult.clear();
@@ -284,8 +308,16 @@ public class PipesServer implements AutoCloseable {
                     }
                     System.exit(0);
                 } else {
-                    LOG.error("Unexpected request byte={}", 
HexFormat.of().formatHex(new byte[]{(byte)request}));
-                    throw new IllegalStateException("Unexpected request");
+                    String msg = String.format(Locale.ROOT,
+                            "pipesClientId=%s: Unexpected byte 0x%02x in 
command position. " +
+                            "Expected one of: PING(0x%02x), ACK(0x%02x), 
NEW_REQUEST(0x%02x), SHUT_DOWN(0x%02x)",
+                            pipesClientId, (byte)request,
+                            PipesClient.COMMANDS.PING.getByte(),
+                            PipesClient.COMMANDS.ACK.getByte(),
+                            PipesClient.COMMANDS.NEW_REQUEST.getByte(),
+                            PipesClient.COMMANDS.SHUT_DOWN.getByte());
+                    LOG.error(msg);
+                    throw new IllegalStateException(msg);
                 }
                 output.flush();
             }
@@ -346,7 +378,7 @@ public class PipesServer implements AutoCloseable {
             long elapsed = System.currentTimeMillis() - start.toEpochMilli();
             if (elapsed > mockProgressCounter * heartbeatIntervalMs) {
                 LOG.debug("still processing: {}", mockProgressCounter);
-                output.write(PROCESSING_STATUS.WORKING.getByte());
+                write(PROCESSING_STATUS.WORKING.getByte());
                 output.writeLong(mockProgressCounter++);
                 output.flush();
             }
@@ -484,6 +516,7 @@ public class PipesServer implements AutoCloseable {
         if (b == ACK.getByte()) {
             return;
         }
+        LOG.error("pipesClientId={}: expected ACK but got byte={}", 
pipesClientId, HexFormat.of().formatHex(new byte[]{ (byte) b}));
         throw new IOException("Wasn't expecting byte=" + 
HexFormat.of().formatHex(new byte[]{ (byte) b}));
     }
 
@@ -503,7 +536,7 @@ public class PipesServer implements AutoCloseable {
             output.flush();
             awaitAck();
         } catch (IOException e) {
-            LOG.error("problem writing data (forking process shutdown?)", e);
+            LOG.error("pipesClientId={}: problem writing data (forking process 
shutdown?)", pipesClientId, e);
             exit(1);
         }
     }
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index b482200e1..e821e1c60 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -551,4 +551,54 @@ public class PipesClientTest {
                     "Error message should mention the missing emitter");
         }
     }
+
+    @Test
+    public void testHeartbeatProtocol(@TempDir Path tmp) throws Exception {
+        // Test that heartbeat protocol works correctly and doesn't cause 
protocol errors
+        // This test exercises the WORKING status messages during long-running 
operations
+        // to ensure the server properly awaits ACKs after sending heartbeats
+
+        Path inputDir = tmp.resolve("input");
+        Files.createDirectories(inputDir);
+
+        // Create a mock file with 2 second delay to trigger multiple 
heartbeats
+        String mockContent = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
+                "<mock>" +
+                "<metadata action=\"add\" name=\"dc:creator\">Heartbeat 
Test</metadata>" +
+                "<write element=\"p\">Testing heartbeat protocol 
synchronization</write>" +
+                "<fakeload millis=\"2000\" cpu=\"1\" mb=\"10\"/>" +
+                "</mock>";
+        String testFile = "mock-heartbeat-test.xml";
+        Files.write(inputDir.resolve(testFile), 
mockContent.getBytes(StandardCharsets.UTF_8));
+
+        // Create config with very short heartbeat interval (100ms) to ensure 
heartbeats are sent
+        Path tikaConfigPath = 
PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir, 
tmp.resolve("output"));
+        String configContent = Files.readString(tikaConfigPath, 
StandardCharsets.UTF_8);
+
+        // Modify config to add very short heartbeat interval
+        configContent = configContent.replace(
+                "\"pipes\": {",
+                "\"pipes\": {\n    \"heartbeatIntervalMs\": 100,"
+        );
+        Files.writeString(tikaConfigPath, configContent, 
StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
+        PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, 
tikaConfigPath);
+
+        try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+            // Process file - should complete successfully despite multiple 
heartbeats
+            PipesResult pipesResult = pipesClient.process(
+                    new FetchEmitTuple(testFile, new FetchKey(fetcherName, 
testFile),
+                            new EmitKey(), new Metadata(), new ParseContext(),
+                            FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+
+            // Verify successful completion
+            assertTrue(pipesResult.isSuccess(),
+                    "Processing should succeed even with heartbeat messages. 
Got status: " + pipesResult.status());
+            Assertions.assertNotNull(pipesResult.emitData().getMetadataList());
+            assertEquals(1, pipesResult.emitData().getMetadataList().size());
+            Metadata metadata = 
pipesResult.emitData().getMetadataList().get(0);
+            assertEquals("Heartbeat Test", metadata.get("dc:creator"));
+        }
+    }
 }

Reply via email to