This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4556-fix-flaky
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 1a52dbe857e2ea688b927d868e92bbc55d3cbd58
Author: tallison <[email protected]>
AuthorDate: Tue Dec 9 09:14:31 2025 -0500

    TIKA-4556 - fix flaky restart
---
 .../org/apache/tika/pipes/core/PipesClient.java    | 41 +++++++++++++++++++---
 .../apache/tika/pipes/core/server/PipesServer.java | 20 +++++++++++
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index cd3ff0152..55c43fdff 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -138,22 +138,28 @@ public class PipesClient implements Closeable {
 
     private void shutItAllDown() throws InterruptedException {
         if (serverTuple == null) {
+            LOG.trace("shutItAllDown: serverTuple is null, nothing to shut 
down");
             return;
         }
+        LOG.trace("shutItAllDown: sending SHUT_DOWN command");
         try {
             serverTuple.output.write(COMMANDS.SHUT_DOWN.getByte());
             serverTuple.output.flush();
         } catch (IOException e) {
             //swallow
+            LOG.trace("shutItAllDown: IOException while sending SHUT_DOWN 
(process may already be dead)", e);
         }
+        LOG.trace("shutItAllDown: closing streams and sockets");
         List<IOException> exceptions = new ArrayList<>();
         tryToClose(serverTuple.input, exceptions);
         tryToClose(serverTuple.output, exceptions);
         tryToClose(serverTuple.socket, exceptions);
         tryToClose(serverTuple.serverSocket, exceptions);
+        LOG.trace("shutItAllDown: destroying process forcibly");
         destroyForcibly();
 
         deleteDir(serverTuple.tmpDir);
+        LOG.trace("shutItAllDown: setting serverTuple to null");
         serverTuple = null;
 
     }
@@ -163,9 +169,18 @@ public class PipesClient implements Closeable {
             return;
         }
         try {
+            if (closeable instanceof Socket socket) {
+                LOG.trace("tryToClose: closing Socket localAddr={}, 
remoteAddr={}",
+                        socket.getLocalSocketAddress(), 
socket.getRemoteSocketAddress());
+            } else if (closeable instanceof ServerSocket serverSocket) {
+                LOG.trace("tryToClose: closing ServerSocket on port={}", 
serverSocket.getLocalPort());
+            } else {
+                LOG.trace("tryToClose: closing {}", 
closeable.getClass().getSimpleName());
+            }
             closeable.close();
         } catch (IOException e) {
             exceptions.add(e);
+            LOG.trace("tryToClose: IOException while closing {}", 
closeable.getClass().getSimpleName(), e);
         }
     }
 
@@ -235,6 +250,7 @@ public class PipesClient implements Closeable {
     private void writeTask(FetchEmitTuple t) throws IOException {
         long start = System.currentTimeMillis();
 
+        LOG.trace("writeTask: serializing FetchEmitTuple for id={}", 
t.getId());
         UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream
                 .builder()
                 .get();
@@ -243,10 +259,12 @@ public class PipesClient implements Closeable {
         }
 
         byte[] bytes = bos.toByteArray();
+        LOG.trace("writeTask: sending NEW_REQUEST command for id={}", 
t.getId());
         serverTuple.output.write(COMMANDS.NEW_REQUEST.getByte());
         serverTuple.output.writeInt(bytes.length);
         serverTuple.output.write(bytes);
         serverTuple.output.flush();
+        LOG.trace("writeTask: NEW_REQUEST sent for id={}", t.getId());
         if (LOG.isTraceEnabled()) {
             LOG.trace("pipesClientId={}: timer -- write tuple: {} ms", 
pipesClientId, System.currentTimeMillis() - start);
         }
@@ -419,20 +437,27 @@ public class PipesClient implements Closeable {
     }
 
     private void writeAck() throws IOException {
+        LOG.trace("writeAck: sending ACK");
         serverTuple.output.write(ACK.getByte());
         serverTuple.output.flush();
+        LOG.trace("writeAck: ACK sent and flushed");
     }
 
 
     private void restart() throws InterruptedException, IOException, 
TimeoutException {
+        LOG.trace("restart: method called");
+        ServerSocket serverSocket = new ServerSocket(0, 50, 
InetAddress.getLoopbackAddress());
+        int port = serverSocket.getLocalPort();
+        LOG.trace("restart: new ServerSocket created on port={}, 
localAddr={}", port, serverSocket.getLocalSocketAddress());
         if (serverTuple != null && serverTuple.process != null) {
+            int oldPort = serverTuple.serverSocket.getLocalPort();
+            LOG.trace("restart: shutting down old server (was on port={})", 
oldPort);
             shutItAllDown();
-            LOG.info("pipesClientId={}: restarting process", pipesClientId);
+            LOG.info("pipesClientId={}: restarting process on port={} (old 
port was {})", pipesClientId, port, oldPort);
         } else {
-            LOG.info("pipesClientId={}: starting process", pipesClientId);
+            LOG.info("pipesClientId={}: starting process on port={}", 
pipesClientId, port);
         }
-        ServerSocket serverSocket = new ServerSocket(0, 50, 
InetAddress.getLoopbackAddress());
-        int port = serverSocket.getLocalPort();
+        LOG.trace("restart: creating temp dir and starting new process");
         Path tmpDir = Files.createTempDirectory("pipes-server-" + 
pipesClientId + "-");
         ProcessBuilder pb = new ProcessBuilder(getCommandline(port, tmpDir));
         pb.inheritIO();
@@ -475,15 +500,23 @@ public class PipesClient implements Closeable {
             }
         }
         socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
+        LOG.trace("restart: socket accepted from remoteAddr={}, localAddr={}, 
port={}",
+                socket.getRemoteSocketAddress(), 
socket.getLocalSocketAddress(), port);
         serverTuple = new ServerTuple(process, serverSocket, socket, new 
DataInputStream(socket.getInputStream()),
                 new DataOutputStream(socket.getOutputStream()), tmpDir);
+        LOG.trace("restart: ServerTuple created (serverSocket port={}, socket 
local={}, socket remote={}), waiting for startup",
+                serverSocket.getLocalPort(), socket.getLocalSocketAddress(), 
socket.getRemoteSocketAddress());
         waitForStartup();
+        LOG.trace("restart: startup complete");
     }
 
     private void waitForStartup() throws IOException {
         //wait for ready byte
+        LOG.trace("waitForStartup: about to read first byte from server");
         int b = serverTuple.input.read();
+        LOG.trace("waitForStartup: read byte={}, about to send ACK", 
HexFormat.of().formatHex(new byte[]{ (byte) b }));
         writeAck();
+        LOG.trace("waitForStartup: ACK sent");
         if (b == READY.getByte()) {
             LOG.debug("got ready byte");
         } else if (b == FINISHED.getByte()) {
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 99e403ce3..ba3d83cb8 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -151,11 +151,15 @@ public class PipesServer implements AutoCloseable {
     private final PipesWorker.EMIT_STRATEGY emitStrategy;
 
     public static PipesServer load(int port, Path tikaConfigPath) throws 
Exception {
+            LOG.trace("load: connecting to client on port={}", port);
             Socket socket = new Socket();
             socket.connect(new 
InetSocketAddress(InetAddress.getLoopbackAddress(), port), 
PipesClient.SOCKET_CONNECT_TIMEOUT_MS);
+            LOG.trace("load: connected to client port={}, localAddr={}, 
remoteAddr={}",
+                    port, socket.getLocalSocketAddress(), 
socket.getRemoteSocketAddress());
 
             DataInputStream dis = new DataInputStream(socket.getInputStream());
             DataOutputStream dos = new 
DataOutputStream(socket.getOutputStream());
+        LOG.trace("load: streams created, loading Tika configuration from {}", 
tikaConfigPath);
         try {
             TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath);
             TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
@@ -165,8 +169,11 @@ public class PipesServer implements AutoCloseable {
             socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
 
             MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters();
+            LOG.trace("load: creating PipesServer instance");
             PipesServer pipesServer = new PipesServer(tikaLoader, pipesConfig, 
socket, dis, dos, metadataFilter);
+            LOG.trace("load: initializing resources");
             pipesServer.initializeResources();
+            LOG.trace("load: PipesServer successfully loaded and initialized");
             return pipesServer;
         } catch (Exception e) {
             LOG.error("Failed to start up", e);
@@ -235,8 +242,10 @@ public class PipesServer implements AutoCloseable {
         int port = Integer.parseInt(args[0]);
         Path tikaConfig = Paths.get(args[1]);
         LOG.debug("starting pipes server on port={} with tikaConfig={}", port, 
tikaConfig);
+        LOG.trace("main: about to call PipesServer.load()");
         try (PipesServer server = PipesServer.load(port, tikaConfig)) {
             LOG.debug("successfully started pipes server");
+            LOG.trace("main: about to enter mainLoop()");
             server.mainLoop();
         } catch (Throwable t) {
             LOG.error("crashed", t);
@@ -247,7 +256,9 @@ public class PipesServer implements AutoCloseable {
     }
 
     public void mainLoop() {
+        LOG.trace("mainLoop: about to send READY");
         write(PROCESSING_STATUS.READY.getByte());
+        LOG.trace("mainLoop: READY sent and ACK received, entering main loop");
         ArrayBlockingQueue<Metadata> intermediateResult = new 
ArrayBlockingQueue<>(1);
 
         LOG.trace("processing requests");
@@ -255,7 +266,9 @@ public class PipesServer implements AutoCloseable {
         try {
             long start = System.currentTimeMillis();
             while (true) {
+                LOG.trace("mainLoop: about to read command byte");
                 int request = input.read();
+                LOG.trace("mainLoop: read command byte={}", 
HexFormat.of().formatHex(new byte[]{(byte)request}));
                 if (request == -1) {
                     LOG.warn("received -1 from client; shutting down");
                     exit(0);
@@ -480,10 +493,14 @@ public class PipesServer implements AutoCloseable {
     }
 
     private void awaitAck() throws IOException {
+        LOG.trace("awaitAck: about to read ACK");
         int b = input.read();
+        LOG.trace("awaitAck: read byte={}", HexFormat.of().formatHex(new 
byte[]{ (byte) b}));
         if (b == ACK.getByte()) {
+            LOG.trace("awaitAck: successfully received ACK");
             return;
         }
+        LOG.error("awaitAck: expected ACK but got byte={}", 
HexFormat.of().formatHex(new byte[]{ (byte) b}));
         throw new IOException("Wasn't expecting byte=" + 
HexFormat.of().formatHex(new byte[]{ (byte) b}));
     }
 
@@ -499,9 +516,12 @@ public class PipesServer implements AutoCloseable {
 
     private void write(byte b) {
         try {
+            LOG.trace("write: about to write byte={}", 
HexFormat.of().formatHex(new byte[]{ b }));
             output.write(b);
             output.flush();
+            LOG.trace("write: byte written and flushed, awaiting ACK");
             awaitAck();
+            LOG.trace("write: ACK received");
         } catch (IOException e) {
             LOG.error("problem writing data (forking process shutdown?)", e);
             exit(1);

Reply via email to