This is an automated email from the ASF dual-hosted git repository. tballison pushed a commit to branch fix/pipesclient-closeconnection-race in repository https://gitbox.apache.org/repos/asf/tika.git
commit fa29494333671e5197746988ba0834836778f523 Author: tallison <[email protected]> AuthorDate: Thu May 28 17:08:09 2026 -0400 fix race condition in PipesClient --- .../org/apache/tika/pipes/core/PipesClient.java | 68 ++++++++++++++++------ 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index be1b1e6034..c69362ba8b 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -76,7 +76,11 @@ public class PipesClient implements Closeable { private final boolean ownsServerManager; private final int pipesClientId; - private ConnectionTuple connectionTuple; + // volatile + connectionLock: closeConnection() can be called concurrently by the + // in-flight parse thread (timeout/crash paths in waitForServer) and the thread + // calling close(). The lock lets one thread atomically claim and null the tuple. + private final Object connectionLock = new Object(); + private volatile ConnectionTuple connectionTuple; private int filesProcessed = 0; /** @@ -118,7 +122,11 @@ public class PipesClient implements Closeable { } private boolean ping() { - if (connectionTuple == null) { + // Snapshot the volatile once: a concurrent closeConnection() can null the + // field at any point, but the local reference stays valid (close() unblocks + // us by closing the socket, surfacing as IOException below - not an NPE). + ConnectionTuple tuple = connectionTuple; + if (tuple == null) { return false; } // Check if server process is still running @@ -126,8 +134,8 @@ public class PipesClient implements Closeable { return false; } try { - PipesMessage.ping().write(connectionTuple.output); - PipesMessage response = PipesMessage.read(connectionTuple.input); + PipesMessage.ping().write(tuple.output); + PipesMessage response = PipesMessage.read(tuple.input); if (response.type() == PipesMessageType.PING) { return true; } @@ -158,20 +166,27 @@ public class PipesClient implements Closeable { * Server lifecycle is managed by PipesParser. */ private void closeConnection() throws InterruptedException { - if (connectionTuple == null) { + // Atomically claim the tuple and null the field so concurrent callers + // (parse thread vs. close() thread) don't deref a field another thread + // has already nulled. Whoever loses the race sees a null tuple and bails. + ConnectionTuple tuple; + synchronized (connectionLock) { + tuple = connectionTuple; + connectionTuple = null; + } + if (tuple == null) { return; } LOG.debug("pipesClientId={}: closing connection", pipesClientId); try { - PipesMessage.shutDown().write(connectionTuple.output); + PipesMessage.shutDown().write(tuple.output); } catch (IOException e) { // swallow } List<IOException> exceptions = new ArrayList<>(); - tryToClose(connectionTuple.input, exceptions); - tryToClose(connectionTuple.output, exceptions); - tryToClose(connectionTuple.socket, exceptions); - connectionTuple = null; + tryToClose(tuple.input, exceptions); + tryToClose(tuple.output, exceptions); + tryToClose(tuple.socket, exceptions); } private void tryToClose(Closeable closeable, List<IOException> exceptions) { @@ -292,20 +307,33 @@ public class PipesClient implements Closeable { // Connect to server Socket socket = serverManager.connect((int) pipesConfig.getSocketTimeoutMs()); - connectionTuple = new ConnectionTuple(socket, - new DataInputStream(new BufferedInputStream(socket.getInputStream())), - new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()))); + synchronized (connectionLock) { + connectionTuple = new ConnectionTuple(socket, + new DataInputStream(new BufferedInputStream(socket.getInputStream())), + new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()))); + } waitForStartup(); } private void writeTask(FetchEmitTuple t) throws IOException { + ConnectionTuple tuple = connectionTuple; + if (tuple == null) { + throw new IOException("connection closed"); + } LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}", pipesClientId, t.getId()); byte[] bytes = JsonPipesIpc.toBytes(t); - PipesMessage.newRequest(bytes).write(connectionTuple.output); + PipesMessage.newRequest(bytes).write(tuple.output); } private PipesResult waitForServer(FetchEmitTuple t, IntermediateResult intermediateResult) throws InterruptedException { + // Snapshot the volatile once; a concurrent close() may null the field, but the + // local stays valid and its blocking read unblocks via socket close (IOException). + ConnectionTuple tuple = connectionTuple; + if (tuple == null) { + return buildFatalResult(t.getId(), t.getEmitKey(), UNSPECIFIED_CRASH, + intermediateResult.get()); + } TimeoutLimits limits = TimeoutLimits.get(t.getParseContext()); long progressTimeoutMillis = limits.getProgressTimeoutMillis(); long totalTaskTimeoutMillis = limits.getTotalTaskTimeoutMillis(); @@ -337,12 +365,12 @@ public class PipesClient implements Closeable { intermediateResult.get()); } try { - PipesMessage msg = PipesMessage.read(connectionTuple.input); + PipesMessage msg = PipesMessage.read(tuple.input); LOG.trace("clientId={}: received message type={} id={}", pipesClientId, msg.type(), t.getId()); // Send ACK only for messages that require it if (msg.type().requiresAck()) { - PipesMessage.ack().write(connectionTuple.output); + PipesMessage.ack().write(tuple.output); } switch (msg.type()) { @@ -429,12 +457,16 @@ public class PipesClient implements Closeable { } private void waitForStartup() throws IOException { - PipesMessage msg = PipesMessage.read(connectionTuple.input); + ConnectionTuple tuple = connectionTuple; + if (tuple == null) { + throw new IOException("connection closed"); + } + PipesMessage msg = PipesMessage.read(tuple.input); if (msg.type() == PipesMessageType.READY) { LOG.info("clientId={}: server successfully started", pipesClientId); } else if (msg.type() == PipesMessageType.STARTUP_FAILED) { // Send ACK for startup failure - PipesMessage.ack().write(connectionTuple.output); + PipesMessage.ack().write(tuple.output); String errorMsg = new String(msg.payload(), StandardCharsets.UTF_8); LOG.error("clientId={}: Server failed to start: {}", pipesClientId, errorMsg); throw new ServerInitializationException(errorMsg);
