This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4556-fix-flaky in repository https://gitbox.apache.org/repos/asf/tika.git
commit 1a52dbe857e2ea688b927d868e92bbc55d3cbd58 Author: tallison <[email protected]> AuthorDate: Tue Dec 9 09:14:31 2025 -0500 TIKA-4556 - fix flaky restart --- .../org/apache/tika/pipes/core/PipesClient.java | 41 +++++++++++++++++++--- .../apache/tika/pipes/core/server/PipesServer.java | 20 +++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index cd3ff0152..55c43fdff 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -138,22 +138,28 @@ public class PipesClient implements Closeable { private void shutItAllDown() throws InterruptedException { if (serverTuple == null) { + LOG.trace("shutItAllDown: serverTuple is null, nothing to shut down"); return; } + LOG.trace("shutItAllDown: sending SHUT_DOWN command"); try { serverTuple.output.write(COMMANDS.SHUT_DOWN.getByte()); serverTuple.output.flush(); } catch (IOException e) { //swallow + LOG.trace("shutItAllDown: IOException while sending SHUT_DOWN (process may already be dead)", e); } + LOG.trace("shutItAllDown: closing streams and sockets"); List<IOException> exceptions = new ArrayList<>(); tryToClose(serverTuple.input, exceptions); tryToClose(serverTuple.output, exceptions); tryToClose(serverTuple.socket, exceptions); tryToClose(serverTuple.serverSocket, exceptions); + LOG.trace("shutItAllDown: destroying process forcibly"); destroyForcibly(); deleteDir(serverTuple.tmpDir); + LOG.trace("shutItAllDown: setting serverTuple to null"); serverTuple = null; } @@ -163,9 +169,18 @@ public class PipesClient implements Closeable { return; } try { + if (closeable instanceof Socket socket) { + LOG.trace("tryToClose: closing Socket localAddr={}, remoteAddr={}", + socket.getLocalSocketAddress(), socket.getRemoteSocketAddress()); + } else if (closeable instanceof ServerSocket serverSocket) { + LOG.trace("tryToClose: closing ServerSocket on port={}", serverSocket.getLocalPort()); + } else { + LOG.trace("tryToClose: closing {}", closeable.getClass().getSimpleName()); + } closeable.close(); } catch (IOException e) { exceptions.add(e); + LOG.trace("tryToClose: IOException while closing {}", closeable.getClass().getSimpleName(), e); } } @@ -235,6 +250,7 @@ public class PipesClient implements Closeable { private void writeTask(FetchEmitTuple t) throws IOException { long start = System.currentTimeMillis(); + LOG.trace("writeTask: serializing FetchEmitTuple for id={}", t.getId()); UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream .builder() .get(); @@ -243,10 +259,12 @@ public class PipesClient implements Closeable { } byte[] bytes = bos.toByteArray(); + LOG.trace("writeTask: sending NEW_REQUEST command for id={}", t.getId()); serverTuple.output.write(COMMANDS.NEW_REQUEST.getByte()); serverTuple.output.writeInt(bytes.length); serverTuple.output.write(bytes); serverTuple.output.flush(); + LOG.trace("writeTask: NEW_REQUEST sent for id={}", t.getId()); if (LOG.isTraceEnabled()) { LOG.trace("pipesClientId={}: timer -- write tuple: {} ms", pipesClientId, System.currentTimeMillis() - start); } @@ -419,20 +437,27 @@ public class PipesClient implements Closeable { } private void writeAck() throws IOException { + LOG.trace("writeAck: sending ACK"); serverTuple.output.write(ACK.getByte()); serverTuple.output.flush(); + LOG.trace("writeAck: ACK sent and flushed"); } private void restart() throws InterruptedException, IOException, TimeoutException { + LOG.trace("restart: method called"); + ServerSocket serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress()); + int port = serverSocket.getLocalPort(); + LOG.trace("restart: new ServerSocket created on port={}, localAddr={}", port, serverSocket.getLocalSocketAddress()); if (serverTuple != null && serverTuple.process != null) { + int oldPort = serverTuple.serverSocket.getLocalPort(); + LOG.trace("restart: shutting down old server (was on port={})", oldPort); shutItAllDown(); - LOG.info("pipesClientId={}: restarting process", pipesClientId); + LOG.info("pipesClientId={}: restarting process on port={} (old port was {})", pipesClientId, port, oldPort); } else { - LOG.info("pipesClientId={}: starting process", pipesClientId); + LOG.info("pipesClientId={}: starting process on port={}", pipesClientId, port); } - ServerSocket serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress()); - int port = serverSocket.getLocalPort(); + LOG.trace("restart: creating temp dir and starting new process"); Path tmpDir = Files.createTempDirectory("pipes-server-" + pipesClientId + "-"); ProcessBuilder pb = new ProcessBuilder(getCommandline(port, tmpDir)); pb.inheritIO(); @@ -475,15 +500,23 @@ public class PipesClient implements Closeable { } } socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs()); + LOG.trace("restart: socket accepted from remoteAddr={}, localAddr={}, port={}", + socket.getRemoteSocketAddress(), socket.getLocalSocketAddress(), port); serverTuple = new ServerTuple(process, serverSocket, socket, new DataInputStream(socket.getInputStream()), new DataOutputStream(socket.getOutputStream()), tmpDir); + LOG.trace("restart: ServerTuple created (serverSocket port={}, socket local={}, socket remote={}), waiting for startup", + serverSocket.getLocalPort(), socket.getLocalSocketAddress(), socket.getRemoteSocketAddress()); waitForStartup(); + LOG.trace("restart: startup complete"); } private void waitForStartup() throws IOException { //wait for ready byte + LOG.trace("waitForStartup: about to read first byte from server"); int b = serverTuple.input.read(); + LOG.trace("waitForStartup: read byte={}, about to send ACK", HexFormat.of().formatHex(new byte[]{ (byte) b })); writeAck(); + LOG.trace("waitForStartup: ACK sent"); if (b == READY.getByte()) { LOG.debug("got ready byte"); } else if (b == FINISHED.getByte()) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java index 99e403ce3..ba3d83cb8 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java @@ -151,11 +151,15 @@ public class PipesServer implements AutoCloseable { private final PipesWorker.EMIT_STRATEGY emitStrategy; public static PipesServer load(int port, Path tikaConfigPath) throws Exception { + LOG.trace("load: connecting to client on port={}", port); Socket socket = new Socket(); socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), port), PipesClient.SOCKET_CONNECT_TIMEOUT_MS); + LOG.trace("load: connected to client port={}, localAddr={}, remoteAddr={}", + port, socket.getLocalSocketAddress(), socket.getRemoteSocketAddress()); DataInputStream dis = new DataInputStream(socket.getInputStream()); DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); + LOG.trace("load: streams created, loading Tika configuration from {}", tikaConfigPath); try { TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath); TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig(); @@ -165,8 +169,11 @@ public class PipesServer implements AutoCloseable { socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs()); MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters(); + LOG.trace("load: creating PipesServer instance"); PipesServer pipesServer = new PipesServer(tikaLoader, pipesConfig, socket, dis, dos, metadataFilter); + LOG.trace("load: initializing resources"); pipesServer.initializeResources(); + LOG.trace("load: PipesServer successfully loaded and initialized"); return pipesServer; } catch (Exception e) { LOG.error("Failed to start up", e); @@ -235,8 +242,10 @@ public class PipesServer implements AutoCloseable { int port = Integer.parseInt(args[0]); Path tikaConfig = Paths.get(args[1]); LOG.debug("starting pipes server on port={} with tikaConfig={}", port, tikaConfig); + LOG.trace("main: about to call PipesServer.load()"); try (PipesServer server = PipesServer.load(port, tikaConfig)) { LOG.debug("successfully started pipes server"); + LOG.trace("main: about to enter mainLoop()"); server.mainLoop(); } catch (Throwable t) { LOG.error("crashed", t); @@ -247,7 +256,9 @@ public class PipesServer implements AutoCloseable { } public void mainLoop() { + LOG.trace("mainLoop: about to send READY"); write(PROCESSING_STATUS.READY.getByte()); + LOG.trace("mainLoop: READY sent and ACK received, entering main loop"); ArrayBlockingQueue<Metadata> intermediateResult = new ArrayBlockingQueue<>(1); LOG.trace("processing requests"); @@ -255,7 +266,9 @@ public class PipesServer implements AutoCloseable { try { long start = System.currentTimeMillis(); while (true) { + LOG.trace("mainLoop: about to read command byte"); int request = input.read(); + LOG.trace("mainLoop: read command byte={}", HexFormat.of().formatHex(new byte[]{(byte)request})); if (request == -1) { LOG.warn("received -1 from client; shutting down"); exit(0); @@ -480,10 +493,14 @@ public class PipesServer implements AutoCloseable { } private void awaitAck() throws IOException { + LOG.trace("awaitAck: about to read ACK"); int b = input.read(); + LOG.trace("awaitAck: read byte={}", HexFormat.of().formatHex(new byte[]{ (byte) b})); if (b == ACK.getByte()) { + LOG.trace("awaitAck: successfully received ACK"); return; } + LOG.error("awaitAck: expected ACK but got byte={}", HexFormat.of().formatHex(new byte[]{ (byte) b})); throw new IOException("Wasn't expecting byte=" + HexFormat.of().formatHex(new byte[]{ (byte) b})); } @@ -499,9 +516,12 @@ public class PipesServer implements AutoCloseable { private void write(byte b) { try { + LOG.trace("write: about to write byte={}", HexFormat.of().formatHex(new byte[]{ b })); output.write(b); output.flush(); + LOG.trace("write: byte written and flushed, awaiting ACK"); awaitAck(); + LOG.trace("write: ACK received"); } catch (IOException e) { LOG.error("problem writing data (forking process shutdown?)", e); exit(1);
