This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 30b5b26396 TIKA-4628 -- improve pipesClient+pipesServer ipc: critical 
socket.setTcpNoDelay(true) and migrate to pure jackson serialization (#2546)
30b5b26396 is described below

commit 30b5b26396496a2f3d314f204e6322b3049a7085
Author: Tim Allison <[email protected]>
AuthorDate: Fri Jan 23 15:17:37 2026 -0500

    TIKA-4628 -- improve pipesClient+pipesServer ipc: critical 
socket.setTcpNoDelay(true) and migrate to pure jackson serialization (#2546)
    
    * TIKA-4628 -- improve pipesClient+pipesServer ipc: critical 
socket.setTcpNoDelay(true) and migrate to pure jackson serialization
---
 .../ParsingEmbeddedDocumentExtractor.java          |  14 +++
 .../java/org/apache/tika/parser/ParseRecord.java   | 120 +++++++++++++++++++++
 .../apache/tika/parser/RecursiveParserWrapper.java |  12 +++
 .../sax/AbstractRecursiveParserWrapperHandler.java |   2 +
 .../tika/pipes/api/emitter/AbstractEmitter.java    |   7 +-
 .../pipes/api/emitter/AbstractStreamEmitter.java   |   7 +-
 .../apache/tika/pipes/api/emitter/EmitData.java    |   6 +-
 tika-pipes/tika-pipes-core/pom.xml                 |   4 +
 .../org/apache/tika/pipes/core/PipesClient.java    |  46 ++++----
 .../tika/pipes/core/emitter/EmitDataImpl.java      |  32 +++---
 .../core/serialization/EmitDataDeserializer.java   |  75 +++++++++++++
 .../core/serialization/EmitDataSerializer.java     |  45 ++++++++
 .../pipes/core/serialization/JsonPipesIpc.java     |  88 +++++++++++++++
 .../serialization/PipesResultDeserializer.java     |  65 +++++++++++
 .../core/serialization/PipesResultSerializer.java  |  46 ++++++++
 .../tika/pipes/core/server/ParseHandler.java       |  32 +++---
 .../apache/tika/pipes/core/server/PipesServer.java |  49 +++------
 .../apache/tika/pipes/core/MockPassbackFilter.java |  52 +++++++++
 .../apache/tika/pipes/core/PassbackFilterTest.java |  24 +----
 .../apache/tika/pipes/core/PipesClientTest.java    |  24 +++--
 .../tika/pipes/emitter/jdbc/JDBCEmitter.java       |   6 +-
 tika-serialization/pom.xml                         |   5 +
 .../apache/tika/config/loader/FrameworkConfig.java |  11 +-
 .../config/loader/TikaObjectMapperFactory.java     |  18 +++-
 .../org/apache/tika/serialization/TikaModule.java  |   8 +-
 .../serdes/ParseContextDeserializer.java           |  15 ++-
 .../serdes/ParseContextSerializer.java             |   7 +-
 .../apache/tika/serialization/SmileFormatTest.java | 110 +++++++++++++++++++
 28 files changed, 800 insertions(+), 130 deletions(-)

diff --git 
a/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
 
b/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
index b6112cf09f..4b1e406183 100644
--- 
a/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
+++ 
b/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
@@ -61,11 +61,19 @@ public class ParsingEmbeddedDocumentExtractor implements 
EmbeddedDocumentExtract
 
     @Override
     public boolean shouldParseEmbedded(Metadata metadata) {
+        // Check ParseRecord for depth/count limits first
+        ParseRecord parseRecord = context.get(ParseRecord.class);
+        if (parseRecord != null && !parseRecord.shouldParseEmbedded()) {
+            return false;
+        }
+
+        // Then check DocumentSelector for content-based filtering
         DocumentSelector selector = context.get(DocumentSelector.class);
         if (selector != null) {
             return selector.select(metadata);
         }
 
+        // Then check FilenameFilter
         FilenameFilter filter = context.get(FilenameFilter.class);
         if (filter != null) {
             String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
@@ -81,6 +89,12 @@ public class ParsingEmbeddedDocumentExtractor implements 
EmbeddedDocumentExtract
     public void parseEmbedded(
             TikaInputStream tis, ContentHandler handler, Metadata metadata, 
ParseContext parseContext, boolean outputHtml)
             throws SAXException, IOException {
+        // Increment embedded count for tracking
+        ParseRecord parseRecord = context.get(ParseRecord.class);
+        if (parseRecord != null) {
+            parseRecord.incrementEmbeddedCount();
+        }
+
         if (outputHtml) {
             AttributesImpl attributes = new AttributesImpl();
             attributes.addAttribute("", "class", "class", "CDATA", 
"package-entry");
diff --git a/tika-core/src/main/java/org/apache/tika/parser/ParseRecord.java 
b/tika-core/src/main/java/org/apache/tika/parser/ParseRecord.java
index ca0edc567c..9204118b08 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/ParseRecord.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/ParseRecord.java
@@ -27,6 +27,10 @@ import org.apache.tika.metadata.Metadata;
  * Use this class to store exceptions, warnings and other information
  * during the parse.  This information is added to the parent's metadata
  * after the parse by the {@link CompositeParser}.
+ * <p>
+ * This class also tracks embedded document processing limits (depth and count)
+ * which can be configured via {@link #setMaxEmbeddedDepth(int)} and
+ * {@link #setMaxEmbeddedCount(int)}.
  */
 public class ParseRecord {
 
@@ -51,6 +55,13 @@ public class ParseRecord {
 
     private boolean writeLimitReached = false;
 
+    // Embedded document tracking
+    private int embeddedCount = 0;
+    private int maxEmbeddedDepth = -1;
+    private int maxEmbeddedCount = -1;
+    private boolean embeddedDepthLimitReached = false;
+    private boolean embeddedCountLimitReached = false;
+
     void beforeParse() {
         depth++;
     }
@@ -111,4 +122,113 @@ public class ParseRecord {
     public List<Metadata> getMetadataList() {
         return metadataList;
     }
+
+    /**
+     * Checks whether an embedded document should be parsed based on 
configured limits.
+     * This should be called before parsing each embedded document.
+     * <p>
+     * If this returns false, the caller should skip parsing the embedded 
document
+     * and the appropriate limit flag will be set.
+     * <p>
+     * Note: The count limit is a hard stop (once hit, no more embedded docs 
are parsed).
+     * The depth limit only affects documents at that depth - sibling 
documents at
+     * shallower depths will still be parsed.
+     *
+     * @return true if the embedded document should be parsed, false if limits 
are exceeded
+     */
+    public boolean shouldParseEmbedded() {
+        // Count limit is a hard stop - once we've hit max, no more embedded 
parsing
+        if (embeddedCountLimitReached) {
+            return false;
+        }
+        if (maxEmbeddedCount >= 0 && embeddedCount >= maxEmbeddedCount) {
+            embeddedCountLimitReached = true;
+            return false;
+        }
+
+        // Depth limit only applies to current depth - siblings at shallower 
levels
+        // can still be parsed. The flag is set for reporting purposes.
+        // depth is 1-indexed (main doc is depth 1), so embedded depth limit 
of N
+        // means we allow parsing up to depth N+1
+        if (maxEmbeddedDepth >= 0 && depth > maxEmbeddedDepth) {
+            embeddedDepthLimitReached = true;
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Increments the embedded document count. Should be called when an 
embedded
+     * document is about to be parsed.
+     */
+    public void incrementEmbeddedCount() {
+        embeddedCount++;
+    }
+
+    /**
+     * Gets the current count of embedded documents processed.
+     *
+     * @return the embedded document count
+     */
+    public int getEmbeddedCount() {
+        return embeddedCount;
+    }
+
+    /**
+     * Sets the maximum depth for parsing embedded documents.
+     * A value of -1 means unlimited (the default).
+     * A value of 0 means no embedded documents will be parsed.
+     * A value of 1 means only first-level embedded documents will be parsed, 
etc.
+     *
+     * @param maxEmbeddedDepth the maximum embedded depth, or -1 for unlimited
+     */
+    public void setMaxEmbeddedDepth(int maxEmbeddedDepth) {
+        this.maxEmbeddedDepth = maxEmbeddedDepth;
+    }
+
+    /**
+     * Gets the maximum depth for parsing embedded documents.
+     *
+     * @return the maximum embedded depth, or -1 if unlimited
+     */
+    public int getMaxEmbeddedDepth() {
+        return maxEmbeddedDepth;
+    }
+
+    /**
+     * Sets the maximum number of embedded documents to parse.
+     * A value of -1 means unlimited (the default).
+     *
+     * @param maxEmbeddedCount the maximum embedded count, or -1 for unlimited
+     */
+    public void setMaxEmbeddedCount(int maxEmbeddedCount) {
+        this.maxEmbeddedCount = maxEmbeddedCount;
+    }
+
+    /**
+     * Gets the maximum number of embedded documents to parse.
+     *
+     * @return the maximum embedded count, or -1 if unlimited
+     */
+    public int getMaxEmbeddedCount() {
+        return maxEmbeddedCount;
+    }
+
+    /**
+     * Returns whether the embedded depth limit was reached during parsing.
+     *
+     * @return true if the depth limit was reached
+     */
+    public boolean isEmbeddedDepthLimitReached() {
+        return embeddedDepthLimitReached;
+    }
+
+    /**
+     * Returns whether the embedded count limit was reached during parsing.
+     *
+     * @return true if the count limit was reached
+     */
+    public boolean isEmbeddedCountLimitReached() {
+        return embeddedCountLimitReached;
+    }
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java 
b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
index 07159dba01..f774e8efe6 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
@@ -227,6 +227,18 @@ public class RecursiveParserWrapper extends 
ParserDecorator {
             if 
(parserState.recursiveParserWrapperHandler.hasHitMaximumEmbeddedResources()) {
                 return;
             }
+
+            // Also check ParseRecord limits if available
+            ParseRecord parseRecord = context.get(ParseRecord.class);
+            if (parseRecord != null && !parseRecord.shouldParseEmbedded()) {
+                return;
+            }
+
+            // Increment embedded count in ParseRecord
+            if (parseRecord != null) {
+                parseRecord.incrementEmbeddedCount();
+            }
+
             // Work out what this thing is
             String objectName = getResourceName(metadata, 
parserState.unknownCount);
             String objectLocation = this.location + objectName;
diff --git 
a/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
 
b/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
index ea4efedf6b..5e6a9fb2ce 100644
--- 
a/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
+++ 
b/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
@@ -37,6 +37,8 @@ public abstract class AbstractRecursiveParserWrapperHandler 
extends DefaultHandl
 
     public final static Property EMBEDDED_RESOURCE_LIMIT_REACHED = 
Property.internalBoolean(
             TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX + 
"embedded_resource_limit_reached");
+    public final static Property EMBEDDED_DEPTH_LIMIT_REACHED = 
Property.internalBoolean(
+            TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX + 
"embedded_depth_limit_reached");
     private static final int MAX_DEPTH = 100;
     private final ContentHandlerFactory contentHandlerFactory;
     private final int maxEmbeddedResources;
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
index 324d16ace9..250b3df9d9 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
@@ -19,6 +19,7 @@ package org.apache.tika.pipes.api.emitter;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.tika.parser.ParseContext;
 import org.apache.tika.plugins.AbstractTikaExtension;
 import org.apache.tika.plugins.ExtensionConfig;
 
@@ -31,7 +32,11 @@ public abstract class AbstractEmitter extends 
AbstractTikaExtension implements E
     @Override
     public void emit(List<? extends EmitData> emitData) throws IOException {
         for (EmitData item : emitData) {
-            emit(item.getEmitKey(), item.getMetadataList(), 
item.getParseContext());
+            ParseContext parseContext = item.getParseContext();
+            if (parseContext == null) {
+                parseContext = new ParseContext();
+            }
+            emit(item.getEmitKey(), item.getMetadataList(), parseContext);
         }
     }
 }
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
index 892180d8c7..98727205f1 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
@@ -19,6 +19,7 @@ package org.apache.tika.pipes.api.emitter;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.tika.parser.ParseContext;
 import org.apache.tika.plugins.AbstractTikaExtension;
 import org.apache.tika.plugins.ExtensionConfig;
 
@@ -31,7 +32,11 @@ public abstract class AbstractStreamEmitter extends 
AbstractTikaExtension implem
     @Override
     public void emit(List<? extends EmitData> emitData) throws IOException {
         for (EmitData item : emitData) {
-            emit(item.getEmitKey(), item.getMetadataList(), 
item.getParseContext());
+            ParseContext parseContext = item.getParseContext();
+            if (parseContext == null) {
+                parseContext = new ParseContext();
+            }
+            emit(item.getEmitKey(), item.getMetadataList(), parseContext);
         }
     }
 }
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
index 6af852a63b..ec6266ed23 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
@@ -30,6 +30,10 @@ public interface EmitData {
 
     long getEstimatedSizeBytes();
 
+    /**
+     * Gets the ParseContext. This is not serialized over IPC - it's restored
+     * by PipesClient after deserialization from the original FetchEmitTuple.
+     * May return null if not set.
+     */
     ParseContext getParseContext();
-
 }
diff --git a/tika-pipes/tika-pipes-core/pom.xml 
b/tika-pipes/tika-pipes-core/pom.xml
index 8e4ce2e4ef..446b9bca74 100644
--- a/tika-pipes/tika-pipes-core/pom.xml
+++ b/tika-pipes/tika-pipes-core/pom.xml
@@ -61,6 +61,10 @@
       <artifactId>tika-serialization</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-smile</artifactId>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>tika-pipes-iterator-commons</artifactId>
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 053856bbeb..5c175208ab 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -24,13 +24,14 @@ import static 
org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
 import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
 import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.READY;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -49,7 +50,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +62,7 @@ import org.apache.tika.pipes.api.FetchEmitTuple;
 import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
 import org.apache.tika.pipes.core.server.IntermediateResult;
 import org.apache.tika.pipes.core.server.PipesServer;
 import org.apache.tika.utils.ExceptionUtils;
@@ -254,14 +255,7 @@ public class PipesClient implements Closeable {
 
     private void writeTask(FetchEmitTuple t) throws IOException {
         LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}", 
pipesClientId, t.getId());
-        UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream
-                .builder()
-                .get();
-        try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-            objectOutputStream.writeObject(t);
-        }
-
-        byte[] bytes = bos.toByteArray();
+        byte[] bytes = JsonPipesIpc.toBytes(t);
         serverTuple.output.write(COMMANDS.NEW_REQUEST.getByte());
         serverTuple.output.writeInt(bytes.length);
         serverTuple.output.write(bytes);
@@ -312,7 +306,12 @@ public class PipesClient implements Closeable {
                         lastUpdate = Instant.now();
                         break;
                     case FINISHED:
-                        return readResult(PipesResult.class);
+                        PipesResult result = readResult(PipesResult.class);
+                        // Restore ParseContext from original FetchEmitTuple 
(not serialized back from server)
+                        if (result.emitData() instanceof EmitDataImpl 
emitDataImpl) {
+                            emitDataImpl.setParseContext(t.getParseContext());
+                        }
+                        return result;
                 }
             } catch (SocketTimeoutException e) {
                 LOG.warn("clientId={}: Socket timeout exception while waiting 
for server", pipesClientId, e);
@@ -419,21 +418,13 @@ public class PipesClient implements Closeable {
         return status;
     }
 
-    private <T> T readResult(Class<? extends T> clazz) throws IOException {
+    private <T> T readResult(Class<T> clazz) throws IOException {
         int len = serverTuple.input.readInt();
         byte[] bytes = new byte[len];
         serverTuple.input.readFully(bytes);
 
         writeAck();
-
-        try (ObjectInputStream objectInputStream = new 
ObjectInputStream(UnsynchronizedByteArrayInputStream
-                .builder()
-                .setByteArray(bytes)
-                .get())) {
-            return clazz.cast(objectInputStream.readObject());
-        } catch (ClassNotFoundException e) {
-            throw new IOException(e);
-        }
+        return JsonPipesIpc.fromBytes(bytes, clazz);
     }
 
     private void writeAck() throws IOException {
@@ -443,7 +434,9 @@ public class PipesClient implements Closeable {
 
 
     private void restart() throws InterruptedException, IOException, 
TimeoutException {
-        ServerSocket serverSocket = new ServerSocket(0, 50, 
InetAddress.getLoopbackAddress());
+        ServerSocket serverSocket = new ServerSocket();
+        serverSocket.setReuseAddress(true);
+        serverSocket.bind(new 
InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 50);
         int port = serverSocket.getLocalPort();
         if (serverTuple != null && serverTuple.process != null) {
             int oldPort = serverTuple.serverSocket.getLocalPort();
@@ -494,9 +487,10 @@ public class PipesClient implements Closeable {
             }
         }
         socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
-        socket.setTcpNoDelay(true); // Disable Nagle's algorithm to avoid 
~40ms delays on small writes
-        serverTuple = new ServerTuple(process, serverSocket, socket, new 
DataInputStream(socket.getInputStream()),
-                new DataOutputStream(socket.getOutputStream()), tmpDir);
+        socket.setTcpNoDelay(true);
+        serverTuple = new ServerTuple(process, serverSocket, socket,
+                new DataInputStream(new 
BufferedInputStream(socket.getInputStream())),
+                new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream())), tmpDir);
         waitForStartup();
     }
 
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
index 1aee991f11..930d594918 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.tika.pipes.core.emitter;
 
-import java.io.Serializable;
 import java.util.List;
 
 import org.apache.tika.metadata.Metadata;
@@ -24,31 +23,23 @@ import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.emitter.EmitData;
 import org.apache.tika.utils.StringUtils;
 
-public class EmitDataImpl implements Serializable, EmitData {
-    /**
-     * Serial version UID
-     */
-    private static final long serialVersionUID = -3861669115439125268L;
+public class EmitDataImpl implements EmitData {
 
     private final String emitKey;
     private final List<Metadata> metadataList;
     private final String containerStackTrace;
-    private ParseContext parseContext = null;
+    // ParseContext is not serialized - it's set by PipesClient after 
deserialization
+    private ParseContext parseContext;
 
     public EmitDataImpl(String emitKey, List<Metadata> metadataList) {
         this(emitKey, metadataList, StringUtils.EMPTY);
     }
 
     public EmitDataImpl(String emitKey, List<Metadata> metadataList, String 
containerStackTrace) {
-        this(emitKey, metadataList, containerStackTrace, new ParseContext());
-    }
-
-    public EmitDataImpl(String emitKey, List<Metadata> metadataList, String 
containerStackTrace, ParseContext parseContext) {
         this.emitKey = emitKey;
         this.metadataList = metadataList;
         this.containerStackTrace = (containerStackTrace == null) ? 
StringUtils.EMPTY :
                 containerStackTrace;
-        this.parseContext = parseContext;
     }
 
     public String getEmitKey() {
@@ -67,14 +58,23 @@ public class EmitDataImpl implements Serializable, EmitData 
{
         return estimateSizeInBytes(getEmitKey(), getMetadataList(), 
containerStackTrace);
     }
 
-    public void setParseContext(ParseContext parseContext) {
-        this.parseContext = parseContext;
-    }
-
+    /**
+     * Gets the ParseContext. This is not serialized - it's set by PipesClient
+     * after deserialization from the original FetchEmitTuple.
+     */
+    @Override
     public ParseContext getParseContext() {
         return parseContext;
     }
 
+    /**
+     * Sets the ParseContext. Called by PipesClient after deserialization
+     * to restore the ParseContext from the original FetchEmitTuple.
+     */
+    public void setParseContext(ParseContext parseContext) {
+        this.parseContext = parseContext;
+    }
+
     private static long estimateSizeInBytes(String id, List<Metadata> 
metadataList,
                                             String containerStackTrace) {
         long sz = 36 + id.length() * 2;
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataDeserializer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataDeserializer.java
new file mode 100644
index 0000000000..8d8c4e303d
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataDeserializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import static 
org.apache.tika.pipes.core.serialization.EmitDataSerializer.CONTAINER_STACK_TRACE;
+import static 
org.apache.tika.pipes.core.serialization.EmitDataSerializer.EMIT_KEY;
+import static 
org.apache.tika.pipes.core.serialization.EmitDataSerializer.METADATA_LIST;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+import org.apache.tika.utils.StringUtils;
+
+public class EmitDataDeserializer extends JsonDeserializer<EmitDataImpl> {
+
+    @Override
+    public EmitDataImpl deserialize(JsonParser jsonParser, 
DeserializationContext deserializationContext) throws IOException {
+        JsonNode root = jsonParser.readValueAsTree();
+        ObjectMapper mapper = (ObjectMapper) jsonParser.getCodec();
+
+        String emitKey = readString(EMIT_KEY, root, null, true);
+        List<Metadata> metadataList = readMetadataList(root, mapper);
+        String containerStackTrace = readString(CONTAINER_STACK_TRACE, root, 
StringUtils.EMPTY, false);
+
+        // ParseContext is NOT deserialized - it's restored by PipesClient 
from the original FetchEmitTuple
+        return new EmitDataImpl(emitKey, metadataList, containerStackTrace);
+    }
+
+    private static List<Metadata> readMetadataList(JsonNode root, ObjectMapper 
mapper) throws IOException {
+        JsonNode metadataListNode = root.get(METADATA_LIST);
+        if (metadataListNode == null || !metadataListNode.isArray()) {
+            return new ArrayList<>();
+        }
+        List<Metadata> metadataList = new ArrayList<>();
+        for (JsonNode metadataNode : metadataListNode) {
+            Metadata metadata = mapper.treeToValue(metadataNode, 
Metadata.class);
+            metadataList.add(metadata);
+        }
+        return metadataList;
+    }
+
+    private static String readString(String key, JsonNode root, String 
defaultVal, boolean required) throws IOException {
+        JsonNode node = root.get(key);
+        if (node == null) {
+            if (required) {
+                throw new IOException("Required field missing: " + key);
+            }
+            return defaultVal;
+        }
+        return node.asText();
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataSerializer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataSerializer.java
new file mode 100644
index 0000000000..5687088e55
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import org.apache.tika.pipes.api.emitter.EmitData;
+import org.apache.tika.utils.StringUtils;
+
+public class EmitDataSerializer extends JsonSerializer<EmitData> {
+
+    public static final String EMIT_KEY = "emitKey";
+    public static final String METADATA_LIST = "metadataList";
+    public static final String CONTAINER_STACK_TRACE = "containerStackTrace";
+
+    @Override
+    public void serialize(EmitData emitData, JsonGenerator jsonGenerator, 
SerializerProvider serializerProvider) throws IOException {
+        jsonGenerator.writeStartObject();
+        jsonGenerator.writeStringField(EMIT_KEY, emitData.getEmitKey());
+        jsonGenerator.writeObjectField(METADATA_LIST, 
emitData.getMetadataList());
+        if (!StringUtils.isBlank(emitData.getContainerStackTrace())) {
+            jsonGenerator.writeStringField(CONTAINER_STACK_TRACE, 
emitData.getContainerStackTrace());
+        }
+        // ParseContext is NOT serialized - it's restored by PipesClient from 
the original FetchEmitTuple
+        jsonGenerator.writeEndObject();
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonPipesIpc.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonPipesIpc.java
new file mode 100644
index 0000000000..30cb4c6b6a
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonPipesIpc.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+
+import org.apache.tika.config.loader.TikaObjectMapperFactory;
+import org.apache.tika.pipes.api.FetchEmitTuple;
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.pipes.api.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+
+/**
+ * Binary serialization/deserialization for IPC communication between 
PipesClient and PipesServer.
+ * <p>
+ * Uses Jackson's Smile binary format for efficient serialization. Smile is a 
binary JSON format
+ * that is more compact and faster to parse than text JSON, while maintaining 
full compatibility
+ * with the Jackson data binding API.
+ */
+public class JsonPipesIpc {
+
+    private static final ObjectMapper OBJECT_MAPPER;
+
+    static {
+        // Use SmileFactory for binary format - more compact and faster than 
text JSON
+        SmileFactory smileFactory = new SmileFactory();
+
+        // Configure stream constraints for large content (e.g., 30MB+ 
documents)
+        // Default Jackson limit is 20MB which is too small for IPC with large 
documents
+        StreamReadConstraints constraints = StreamReadConstraints.builder()
+                .maxStringLength(Integer.MAX_VALUE)
+                .build();
+        smileFactory.setStreamReadConstraints(constraints);
+
+        // Create mapper with Smile factory and register TikaModule for 
Metadata/ParseContext serializers
+        OBJECT_MAPPER = TikaObjectMapperFactory.createMapper(smileFactory);
+
+        // Add pipes-specific serializers
+        SimpleModule pipesModule = new SimpleModule();
+        pipesModule.addSerializer(FetchEmitTuple.class, new 
FetchEmitTupleSerializer());
+        pipesModule.addDeserializer(FetchEmitTuple.class, new 
FetchEmitTupleDeserializer());
+        pipesModule.addSerializer(EmitData.class, new EmitDataSerializer());
+        pipesModule.addDeserializer(EmitDataImpl.class, new 
EmitDataDeserializer());
+        pipesModule.addSerializer(PipesResult.class, new 
PipesResultSerializer());
+        pipesModule.addDeserializer(PipesResult.class, new 
PipesResultDeserializer());
+        OBJECT_MAPPER.registerModule(pipesModule);
+    }
+
+    /**
+     * Serialize an object to Smile binary format bytes.
+     */
+    public static byte[] toBytes(Object obj) throws IOException {
+        return OBJECT_MAPPER.writeValueAsBytes(obj);
+    }
+
+    /**
+     * Deserialize Smile binary format bytes to an object.
+     */
+    public static <T> T fromBytes(byte[] bytes, Class<T> clazz) throws 
IOException {
+        return OBJECT_MAPPER.readValue(bytes, clazz);
+    }
+
+    /**
+     * Get the configured ObjectMapper for direct use if needed.
+     */
+    public static ObjectMapper getMapper() {
+        return OBJECT_MAPPER;
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultDeserializer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultDeserializer.java
new file mode 100644
index 0000000000..72e27ddc61
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultDeserializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import static 
org.apache.tika.pipes.core.serialization.PipesResultSerializer.EMIT_DATA;
+import static 
org.apache.tika.pipes.core.serialization.PipesResultSerializer.MESSAGE;
+import static 
org.apache.tika.pipes.core.serialization.PipesResultSerializer.STATUS;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+
+public class PipesResultDeserializer extends JsonDeserializer<PipesResult> {
+
+    @Override
+    public PipesResult deserialize(JsonParser jsonParser, 
DeserializationContext deserializationContext) throws IOException {
+        JsonNode root = jsonParser.readValueAsTree();
+        ObjectMapper mapper = (ObjectMapper) jsonParser.getCodec();
+
+        String statusStr = readString(STATUS, root, null, true);
+        PipesResult.RESULT_STATUS status = 
PipesResult.RESULT_STATUS.valueOf(statusStr);
+
+        EmitDataImpl emitData = null;
+        JsonNode emitDataNode = root.get(EMIT_DATA);
+        if (emitDataNode != null && !emitDataNode.isNull()) {
+            emitData = mapper.treeToValue(emitDataNode, EmitDataImpl.class);
+        }
+
+        String message = readString(MESSAGE, root, null, false);
+
+        return new PipesResult(status, emitData, message);
+    }
+
+    private static String readString(String key, JsonNode root, String 
defaultVal, boolean required) throws IOException {
+        JsonNode node = root.get(key);
+        if (node == null || node.isNull()) {
+            if (required) {
+                throw new IOException("Required field missing: " + key);
+            }
+            return defaultVal;
+        }
+        return node.asText();
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultSerializer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultSerializer.java
new file mode 100644
index 0000000000..ebf01a5a47
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.utils.StringUtils;
+
+public class PipesResultSerializer extends JsonSerializer<PipesResult> {
+
+    public static final String STATUS = "status";
+    public static final String EMIT_DATA = "emitData";
+    public static final String MESSAGE = "message";
+
+    @Override
+    public void serialize(PipesResult pipesResult, JsonGenerator 
jsonGenerator, SerializerProvider serializerProvider) throws IOException {
+        jsonGenerator.writeStartObject();
+        jsonGenerator.writeStringField(STATUS, pipesResult.status().name());
+        if (pipesResult.emitData() != null) {
+            jsonGenerator.writeObjectField(EMIT_DATA, pipesResult.emitData());
+        }
+        if (!StringUtils.isBlank(pipesResult.message())) {
+            jsonGenerator.writeStringField(MESSAGE, pipesResult.message());
+        }
+        jsonGenerator.writeEndObject();
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ParseHandler.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ParseHandler.java
index 4330bedd99..b6d31d0f00 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ParseHandler.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ParseHandler.java
@@ -35,7 +35,6 @@ import org.apache.tika.digest.SkipContainerDocumentDigest;
 import org.apache.tika.exception.EncryptedDocumentException;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.WriteLimitReachedException;
-import org.apache.tika.extractor.DocumentSelector;
 import org.apache.tika.extractor.EmbeddedDocumentBytesHandler;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
@@ -43,10 +42,12 @@ import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.mime.MediaType;
 import org.apache.tika.parser.AutoDetectParser;
 import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.ParseRecord;
 import org.apache.tika.parser.RecursiveParserWrapper;
 import org.apache.tika.pipes.api.FetchEmitTuple;
 import org.apache.tika.pipes.api.ParseMode;
 import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig;
+import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.sax.ContentHandlerFactory;
 import org.apache.tika.sax.RecursiveParserWrapperHandler;
@@ -214,18 +215,18 @@ class ParseHandler {
         if (contentHandlerFactory instanceof BasicContentHandlerFactory) {
             maxEmbedded = ((BasicContentHandlerFactory) 
contentHandlerFactory).getMaxEmbeddedResources();
         }
-        final int finalMaxEmbedded = maxEmbedded;
-        parseContext.set(DocumentSelector.class, new DocumentSelector() {
-            int embedded = 0;
 
-            @Override
-            public boolean select(Metadata metadata) {
-                if (finalMaxEmbedded < 0) {
-                    return true;
-                }
-                return embedded++ < finalMaxEmbedded;
-            }
-        });
+        // Configure ParseRecord for embedded document limits
+        // ParseRecord is created by CompositeParser if not present, but we 
configure it here
+        // to set the embedded count limit before parsing starts
+        ParseRecord parseRecord = parseContext.get(ParseRecord.class);
+        if (parseRecord == null) {
+            parseRecord = new ParseRecord();
+            parseContext.set(ParseRecord.class, parseRecord);
+        }
+        if (maxEmbedded >= 0) {
+            parseRecord.setMaxEmbeddedCount(maxEmbedded);
+        }
 
         String containerException = null;
         long start = System.currentTimeMillis();
@@ -259,6 +260,13 @@ class ParseHandler {
             if (writeLimitReached) {
                 metadata.set(TikaCoreProperties.WRITE_LIMIT_REACHED, true);
             }
+            // Set limit reached flags from ParseRecord
+            if (parseRecord.isEmbeddedCountLimitReached()) {
+                
metadata.set(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_LIMIT_REACHED,
 true);
+            }
+            if (parseRecord.isEmbeddedDepthLimitReached()) {
+                
metadata.set(AbstractRecursiveParserWrapperHandler.EMBEDDED_DEPTH_LIMIT_REACHED,
 true);
+            }
             if (LOG.isTraceEnabled()) {
                 LOG.trace("timer -- parse only time: {} ms", 
System.currentTimeMillis() - start);
             }
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index a97e8557a9..6e52ceb4d8 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -22,12 +22,11 @@ import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.IN
 import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM;
 import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.TIMEOUT;
 
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -47,8 +46,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
-import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
@@ -74,6 +71,7 @@ import org.apache.tika.pipes.core.config.ConfigStore;
 import org.apache.tika.pipes.core.config.ConfigStoreFactory;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.fetcher.FetcherManager;
+import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
 import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.plugins.TikaPluginManager;
 import org.apache.tika.sax.ContentHandlerFactory;
@@ -167,8 +165,8 @@ public class PipesServer implements AutoCloseable {
             socket.connect(new 
InetSocketAddress(InetAddress.getLoopbackAddress(), port), 
PipesClient.SOCKET_CONNECT_TIMEOUT_MS);
             socket.setTcpNoDelay(true); // Disable Nagle's algorithm to avoid 
~40ms delays on small writes
 
-            DataInputStream dis = new DataInputStream(socket.getInputStream());
-            DataOutputStream dos = new 
DataOutputStream(socket.getOutputStream());
+            DataInputStream dis = new DataInputStream(new 
BufferedInputStream(socket.getInputStream()));
+            DataOutputStream dos = new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
         try {
             TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath);
             TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
@@ -176,6 +174,7 @@ public class PipesServer implements AutoCloseable {
 
             // Set socket timeout from config after loading PipesConfig
             socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
+            socket.setTcpNoDelay(true); // NOTE(review): redundant — tcpNoDelay is already set immediately after connect() above
 
             MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters();
             ContentHandlerFactory contentHandlerFactory = 
tikaLoader.loadContentHandlerFactory();
@@ -410,10 +409,9 @@ public class PipesServer implements AutoCloseable {
     private void handleCrash(PROCESSING_STATUS processingStatus, String id, 
Throwable t) {
         LOG.error("{}: {}", processingStatus, id, t);
         String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-            oos.writeObject(msg);
-            write(processingStatus, bos.toByteArray());
+        try {
+            byte[] bytes = JsonPipesIpc.toBytes(msg);
+            write(processingStatus, bytes);
             awaitAck();
         } catch (IOException e) {
             //swallow
@@ -444,19 +442,12 @@ public class PipesServer implements AutoCloseable {
             int length = input.readInt();
             byte[] bytes = new byte[length];
             input.readFully(bytes);
-
-            try (ObjectInputStream objectInputStream = new ObjectInputStream(
-                    
UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) {
-                return (FetchEmitTuple) objectInputStream.readObject();
-            }
+            return JsonPipesIpc.fromBytes(bytes, FetchEmitTuple.class);
         } catch (IOException e) {
-            LOG.error("problem reading tuple", e);
-            exit(1);
-        } catch (ClassNotFoundException e) {
-            LOG.error("can't find class?!", e);
-            exit(1);
+            LOG.error("problem reading/deserializing FetchEmitTuple", e);
+            handleCrash(PROCESSING_STATUS.UNSPECIFIED_CRASH, "unknown", e);
         }
-        //unreachable, no?!
+        // presumed unreachable: handleCrash is expected to exit the process — confirm it exits even when its own write throws IOException
         return null;
     }
 
@@ -518,11 +509,8 @@ public class PipesServer implements AutoCloseable {
 
     private void write(PROCESSING_STATUS processingStatus, PipesResult 
pipesResult) {
         try {
-            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-            try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-                objectOutputStream.writeObject(pipesResult);
-            }
-            write(processingStatus, bos.toByteArray());
+            byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
+            write(processingStatus, bytes);
         } catch (IOException e) {
             LOG.error("problem writing emit data (forking process shutdown?)", 
e);
             exit(1);
@@ -531,11 +519,8 @@ public class PipesServer implements AutoCloseable {
 
     private void writeIntermediate(Metadata metadata) {
         try {
-            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-            try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-                objectOutputStream.writeObject(metadata);
-            }
-            write(INTERMEDIATE_RESULT, bos.toByteArray());
+            byte[] bytes = JsonPipesIpc.toBytes(metadata);
+            write(INTERMEDIATE_RESULT, bytes);
         } catch (IOException e) {
             LOG.error("problem writing intermediate data (forking process 
shutdown?)", e);
             exit(1);
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/MockPassbackFilter.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/MockPassbackFilter.java
new file mode 100644
index 0000000000..b3bb3a7889
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/MockPassbackFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.tika.config.TikaComponent;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Mock PassbackFilter for testing. Removes items without RESOURCE_NAME_KEY
+ * and uppercases the RESOURCE_NAME_KEY values.
+ */
+@TikaComponent(contextKey = PassbackFilter.class)
+public class MockPassbackFilter extends PassbackFilter {
+
+    public MockPassbackFilter() {
+        // Required for Jackson deserialization
+    }
+
+    @Override
+    public void filter(List<Metadata> metadataList) throws TikaException {
+        // Remove items without RESOURCE_NAME_KEY and transform remaining ones
+        metadataList.removeIf(m -> 
StringUtils.isBlank(m.get(TikaCoreProperties.RESOURCE_NAME_KEY)));
+        for (Metadata m : metadataList) {
+            String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+            // Clear all fields and only keep RESOURCE_NAME_KEY (uppercased)
+            for (String name : m.names()) {
+                m.remove(name);
+            }
+            m.set(TikaCoreProperties.RESOURCE_NAME_KEY, 
val.toUpperCase(Locale.ROOT));
+        }
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
index a3a55bb372..853136855d 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
@@ -23,14 +23,12 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Locale;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.config.loader.TikaJsonConfig;
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
@@ -39,7 +37,6 @@ import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.api.fetcher.FetchKey;
 import org.apache.tika.serialization.JsonMetadataList;
-import org.apache.tika.utils.StringUtils;
 
 public class PassbackFilterTest {
 
@@ -63,7 +60,9 @@ public class PassbackFilterTest {
         init(tmpDir);
         String emitFileBase = "blah";
         ParseContext parseContext = new ParseContext();
-        parseContext.set(PassbackFilter.class, new MyPassbackFilter());
+        // Use JSON config approach for Jackson serialization compatibility
+        // Don't resolve here - let PipesServer resolve on its side
+        parseContext.setJsonConfig("mock-passback-filter", "{}");
         PipesResult pipesResult = pipesClient.process(
                 new FetchEmitTuple(testPdfFile, new FetchKey(fetcherId, 
testPdfFile),
                         new EmitKey(emitterId, emitFileBase), new Metadata(), 
parseContext,
@@ -96,21 +95,4 @@ public class PassbackFilterTest {
                 .get(0)
                 .get(Metadata.CONTENT_LENGTH));
     }
-
-    private static class MyPassbackFilter extends PassbackFilter {
-        @Override
-        public void filter(List<Metadata> metadataList) throws TikaException {
-            // Remove items without RESOURCE_NAME_KEY and transform remaining 
ones
-            metadataList.removeIf(m -> 
StringUtils.isBlank(m.get(TikaCoreProperties.RESOURCE_NAME_KEY)));
-            for (Metadata m : metadataList) {
-                String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY);
-                // Clear all fields and only keep RESOURCE_NAME_KEY 
(uppercased)
-                for (String name : m.names()) {
-                    m.remove(name);
-                }
-                m.set(TikaCoreProperties.RESOURCE_NAME_KEY, 
val.toUpperCase(Locale.ROOT));
-            }
-        }
-    }
-
 }
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 1cba2622ac..ee18f7a369 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.List;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -32,10 +31,8 @@ import org.apache.tika.config.TikaTaskTimeout;
 import org.apache.tika.config.loader.TikaJsonConfig;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.filter.AttachmentCountingListFilter;
 import org.apache.tika.metadata.filter.CompositeMetadataFilter;
 import org.apache.tika.metadata.filter.MetadataFilter;
-import org.apache.tika.metadata.filter.MockUpperCaseFilter;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.FetchEmitTuple;
 import org.apache.tika.pipes.api.PipesResult;
@@ -74,8 +71,11 @@ public class PipesClientTest {
     @Test
     public void testMetadataFilter(@TempDir Path tmp) throws Exception {
         ParseContext parseContext = new ParseContext();
-        MetadataFilter metadataFilter = new 
CompositeMetadataFilter(List.of(new MockUpperCaseFilter()));
-        parseContext.set(MetadataFilter.class, metadataFilter);
+        // Use JSON config approach for Jackson serialization compatibility
+        // Don't resolve here - let PipesServer resolve on its side
+        parseContext.setJsonConfig("metadata-filters", """
+            ["mock-upper-case-filter"]
+        """);
         PipesClient pipesClient = init(tmp, testDoc);
         PipesResult pipesResult = pipesClient.process(
                 new FetchEmitTuple(testDoc, new FetchKey(fetcherName, testDoc),
@@ -89,8 +89,11 @@ public class PipesClientTest {
     @Test
     public void testMetadataListFilter(@TempDir Path tmp) throws Exception {
         ParseContext parseContext = new ParseContext();
-        MetadataFilter metadataFilter = new 
CompositeMetadataFilter(List.of(new AttachmentCountingListFilter()));
-        parseContext.set(MetadataFilter.class, metadataFilter);
+        // Use JSON config approach for Jackson serialization compatibility
+        // Don't resolve here - let PipesServer resolve on its side
+        parseContext.setJsonConfig("metadata-filters", """
+            ["attachment-counting-list-filter"]
+        """);
 
         String testFile = "mock-embedded.xml";
 
@@ -175,8 +178,11 @@ public class PipesClientTest {
         //I did both manually during development, but unit tests are better. :D
         ParseContext parseContext = new ParseContext();
         parseContext.set(TikaTaskTimeout.class, new TikaTaskTimeout(1000));
-        MetadataFilter metadataFilter = new 
CompositeMetadataFilter(List.of(new AttachmentCountingListFilter()));
-        parseContext.set(MetadataFilter.class, metadataFilter);
+        // Use JSON config approach for Jackson serialization compatibility
+        // Don't resolve here - let PipesServer resolve on its side
+        parseContext.setJsonConfig("metadata-filters", """
+            ["attachment-counting-list-filter"]
+        """);
 
         String testFile = "mock-timeout-10s.xml";
         PipesClient pipesClient = init(tmp, testFile);
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index b1898d5914..86ea09edee 100644
--- 
a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -183,7 +183,11 @@ public class JDBCEmitter extends AbstractEmitter 
implements Closeable {
     @Override
     public void emit(List<? extends EmitData> emitData) throws IOException {
         for (EmitData d : emitData) {
-            emit(d.getEmitKey(), d.getMetadataList(), d.getParseContext());
+            ParseContext parseContext = d.getParseContext();
+            if (parseContext == null) {
+                parseContext = new ParseContext();
+            }
+            emit(d.getEmitKey(), d.getMetadataList(), parseContext);
         }
     }
 
diff --git a/tika-serialization/pom.xml b/tika-serialization/pom.xml
index 45964fcea1..192ebb2b1a 100644
--- a/tika-serialization/pom.xml
+++ b/tika-serialization/pom.xml
@@ -74,6 +74,11 @@
       <artifactId>junit-jupiter</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-smile</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
diff --git 
a/tika-serialization/src/main/java/org/apache/tika/config/loader/FrameworkConfig.java
 
b/tika-serialization/src/main/java/org/apache/tika/config/loader/FrameworkConfig.java
index 34952ee396..9da1df550e 100644
--- 
a/tika-serialization/src/main/java/org/apache/tika/config/loader/FrameworkConfig.java
+++ 
b/tika-serialization/src/main/java/org/apache/tika/config/loader/FrameworkConfig.java
@@ -42,6 +42,11 @@ public class FrameworkConfig {
     private static final String MIME_INCLUDE_KEY = "_mime-include";
     private static final String MIME_EXCLUDE_KEY = "_mime-exclude";
 
+    // Plain JSON mapper for converting JsonNodes to JSON strings.
+    // This is needed because the main mapper may use a binary format (e.g., 
Smile)
+    // which doesn't support writeValueAsString().
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
     private final ParserDecoration decoration;
     private final JsonConfig componentConfigJson;
     private final JsonNode componentConfigNode;
@@ -64,7 +69,8 @@ public class FrameworkConfig {
     public static FrameworkConfig extract(JsonNode configNode,
                                            ObjectMapper objectMapper) throws 
IOException {
         if (configNode == null || !configNode.isObject()) {
-            String jsonString = objectMapper.writeValueAsString(configNode);
+            // Use plain JSON mapper since the main mapper may be binary 
(Smile)
+            String jsonString = JSON_MAPPER.writeValueAsString(configNode);
             JsonConfig jsonConfig = () -> jsonString;
             return new FrameworkConfig(null, jsonConfig, configNode);
         }
@@ -81,7 +87,8 @@ public class FrameworkConfig {
         }
 
         // Remaining fields are component-specific config
-        String jsonString = objectMapper.writeValueAsString(objNode);
+        // Use plain JSON mapper since the main mapper may be binary (Smile)
+        String jsonString = JSON_MAPPER.writeValueAsString(objNode);
         JsonConfig componentConfigJson = () -> jsonString;
 
         return new FrameworkConfig(decoration, componentConfigJson, objNode);
diff --git 
a/tika-serialization/src/main/java/org/apache/tika/config/loader/TikaObjectMapperFactory.java
 
b/tika-serialization/src/main/java/org/apache/tika/config/loader/TikaObjectMapperFactory.java
index b8b7e4389d..e65eee07b9 100644
--- 
a/tika-serialization/src/main/java/org/apache/tika/config/loader/TikaObjectMapperFactory.java
+++ 
b/tika-serialization/src/main/java/org/apache/tika/config/loader/TikaObjectMapperFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.config.loader;
 
+import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -67,7 +68,22 @@ public class TikaObjectMapperFactory {
      * @return configured ObjectMapper
      */
     public static ObjectMapper createMapper() {
-        ObjectMapper mapper = new ObjectMapper();
+        return createMapper(null);
+    }
+
+    /**
+     * Creates an ObjectMapper configured for Tika serialization with a custom 
JsonFactory.
+     * <p>
+     * This can be used to create mappers for binary formats like Smile:
+     * <pre>
+     * ObjectMapper smileMapper = TikaObjectMapperFactory.createMapper(new 
SmileFactory());
+     * </pre>
+     *
+     * @param factory the JsonFactory to use, or null for default JSON
+     * @return configured ObjectMapper
+     */
+    public static ObjectMapper createMapper(JsonFactory factory) {
+        ObjectMapper mapper = (factory != null) ? new ObjectMapper(factory) : 
new ObjectMapper();
 
         // Allow comments in JSON config files (// and /* */ style)
         mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
diff --git 
a/tika-serialization/src/main/java/org/apache/tika/serialization/TikaModule.java
 
b/tika-serialization/src/main/java/org/apache/tika/serialization/TikaModule.java
index 8ad0d588ff..31a95d9e4d 100644
--- 
a/tika-serialization/src/main/java/org/apache/tika/serialization/TikaModule.java
+++ 
b/tika-serialization/src/main/java/org/apache/tika/serialization/TikaModule.java
@@ -90,6 +90,11 @@ public class TikaModule extends SimpleModule {
 
     private static ObjectMapper sharedMapper;
 
+    // Plain JSON mapper for converting JsonNodes to JSON strings.
+    // This is needed because the main mapper may use a binary format (e.g., 
Smile)
+    // which doesn't support writeValueAsString().
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
     /**
      * Interfaces that use compact format serialization.
      * Types implementing these interfaces will be serialized as:
@@ -324,7 +329,8 @@ public class TikaModule extends SimpleModule {
                     // Try JsonConfig constructor first (works for any 
component)
                     Constructor<?> jsonConfigCtor = 
findJsonConfigConstructor(clazz);
                     if (jsonConfigCtor != null) {
-                        String json = mapper.writeValueAsString(cleanedConfig);
+                        // Use plain JSON mapper since the main mapper may be 
binary (Smile)
+                        String json = 
JSON_MAPPER.writeValueAsString(cleanedConfig);
                         instance = jsonConfigCtor.newInstance((JsonConfig) () 
-> json);
                     } else {
                         // Fall back to no-arg constructor + Jackson bean 
deserialization
diff --git 
a/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextDeserializer.java
 
b/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextDeserializer.java
index 2dcf296104..2e2b4dec7d 100644
--- 
a/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextDeserializer.java
+++ 
b/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextDeserializer.java
@@ -58,6 +58,11 @@ public class ParseContextDeserializer extends 
JsonDeserializer<ParseContext> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ParseContextDeserializer.class);
 
+    // Plain JSON mapper for converting JsonNodes to JSON strings.
+    // This is needed because the main mapper may use a binary format (e.g., 
Smile)
+    // which doesn't support writeValueAsString().
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
     @Override
     public ParseContext deserialize(JsonParser jsonParser, 
DeserializationContext ctxt)
             throws IOException {
@@ -100,7 +105,8 @@ public class ParseContextDeserializer extends 
JsonDeserializer<ParseContext> {
                 deserializeTypedObjects(value, parseContext, mapper);
             } else {
                 // Store as JSON config for lazy resolution
-                String json = mapper.writeValueAsString(value);
+                // Use plain JSON mapper since the main mapper may be binary 
(Smile)
+                String json = JSON_MAPPER.writeValueAsString(value);
                 parseContext.setJsonConfig(name, json);
             }
         }
@@ -143,8 +149,8 @@ public class ParseContextDeserializer extends 
JsonDeserializer<ParseContext> {
                 } catch (ClassNotFoundException e) {
                     LOG.warn("Could not find class for typed component '{}', 
storing as JSON config",
                             componentName);
-                    // Fall back to storing as JSON config
-                    parseContext.setJsonConfig(componentName, 
mapper.writeValueAsString(configNode));
+                    // Fall back to storing as JSON config (use plain JSON 
mapper)
+                    parseContext.setJsonConfig(componentName, 
JSON_MAPPER.writeValueAsString(configNode));
                     continue;
                 }
             }
@@ -161,7 +167,8 @@ public class ParseContextDeserializer extends 
JsonDeserializer<ParseContext> {
             } catch (Exception e) {
                 LOG.warn("Failed to deserialize typed component '{}' as {}, 
storing as JSON config",
                         componentName, configClass.getName(), e);
-                parseContext.setJsonConfig(componentName, 
mapper.writeValueAsString(configNode));
+                // Use plain JSON mapper since main mapper may be binary 
(Smile)
+                parseContext.setJsonConfig(componentName, 
JSON_MAPPER.writeValueAsString(configNode));
             }
         }
     }
diff --git 
a/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextSerializer.java
 
b/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextSerializer.java
index e2545d4033..cf73ad8129 100644
--- 
a/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextSerializer.java
+++ 
b/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextSerializer.java
@@ -91,7 +91,9 @@ public class ParseContextSerializer extends 
JsonSerializer<ParseContext> {
                 hasTypedObjects = true;
             }
             gen.writeFieldName(keyName);
-            gen.writeRawValue(PLAIN_MAPPER.writeValueAsString(value));
+            // Use writeTree instead of writeRawValue for binary format 
support (e.g., Smile)
+            // and stricter validation (fails early if value can't be 
serialized)
+            gen.writeTree(PLAIN_MAPPER.valueToTree(value));
         }
 
         if (hasTypedObjects) {
@@ -102,7 +104,8 @@ public class ParseContextSerializer extends 
JsonSerializer<ParseContext> {
         Map<String, JsonConfig> jsonConfigs = parseContext.getJsonConfigs();
         for (Map.Entry<String, JsonConfig> entry : jsonConfigs.entrySet()) {
             gen.writeFieldName(entry.getKey());
-            gen.writeRawValue(entry.getValue().json());
+            // Parse the JSON string into a tree for binary format support
+            gen.writeTree(PLAIN_MAPPER.readTree(entry.getValue().json()));
         }
 
         gen.writeEndObject();
diff --git 
a/tika-serialization/src/test/java/org/apache/tika/serialization/SmileFormatTest.java
 
b/tika-serialization/src/test/java/org/apache/tika/serialization/SmileFormatTest.java
new file mode 100644
index 0000000000..74836f2526
--- /dev/null
+++ 
b/tika-serialization/src/test/java/org/apache/tika/serialization/SmileFormatTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.config.loader.FrameworkConfig;
+import org.apache.tika.config.loader.TikaObjectMapperFactory;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+
+/**
+ * Tests that serialization works correctly with Jackson's Smile binary format.
+ * <p>
+ * Smile is a binary JSON format that doesn't support certain text-based 
operations
+ * like writeValueAsString(). These tests ensure our serializers handle this 
correctly.
+ */
+public class SmileFormatTest {
+
+    private ObjectMapper smileMapper;
+
+    @BeforeEach
+    public void setUp() {
+        SmileFactory smileFactory = new SmileFactory();
+        smileMapper = TikaObjectMapperFactory.createMapper(smileFactory);
+    }
+
+    @Test
+    public void testMetadataRoundTripWithSmile() throws IOException {
+        Metadata original = new Metadata();
+        original.set("title", "Test Document");
+        original.set("author", "Test Author");
+        original.add("keywords", "test");
+        original.add("keywords", "smile");
+
+        // Serialize to Smile binary format
+        byte[] bytes = smileMapper.writeValueAsBytes(original);
+
+        // Deserialize from Smile binary format
+        Metadata deserialized = smileMapper.readValue(bytes, Metadata.class);
+
+        assertEquals("Test Document", deserialized.get("title"));
+        assertEquals("Test Author", deserialized.get("author"));
+        String[] keywords = deserialized.getValues("keywords");
+        assertEquals(2, keywords.length);
+    }
+
+    @Test
+    public void testParseContextRoundTripWithSmile() throws IOException {
+        ParseContext original = new ParseContext();
+        original.setJsonConfig("test-config", "{\"key\": \"value\"}");
+
+        // Serialize to Smile binary format
+        byte[] bytes = smileMapper.writeValueAsBytes(original);
+
+        // Deserialize from Smile binary format
+        ParseContext deserialized = smileMapper.readValue(bytes, 
ParseContext.class);
+
+        assertNotNull(deserialized.getJsonConfig("test-config"));
+        // Compare parsed JSON to avoid whitespace differences
+        ObjectMapper jsonMapper = new ObjectMapper();
+        JsonNode originalNode = jsonMapper.readTree("{\"key\": \"value\"}");
+        JsonNode deserializedNode = 
jsonMapper.readTree(deserialized.getJsonConfig("test-config").json());
+        assertEquals(originalNode, deserializedNode);
+    }
+
+    @Test
+    public void testFrameworkConfigExtractWithSmile() throws IOException {
+        // Create a config node using the Smile mapper
+        String json = "{\"option1\": \"value1\", \"option2\": 42}";
+        ObjectMapper jsonMapper = new ObjectMapper();
+        JsonNode configNode = jsonMapper.readTree(json);
+
+        // Extract framework config - this should work even when passed a 
Smile mapper
+        // because FrameworkConfig uses a plain JSON mapper internally for 
writeValueAsString
+        FrameworkConfig frameworkConfig = FrameworkConfig.extract(configNode, 
smileMapper);
+
+        assertNotNull(frameworkConfig.getComponentConfigJson());
+        // The JSON string should be valid
+        String componentJson = frameworkConfig.getComponentConfigJson().json();
+        assertNotNull(componentJson);
+        // Parse it back to verify it's valid JSON
+        JsonNode parsed = jsonMapper.readTree(componentJson);
+        assertEquals("value1", parsed.get("option1").asText());
+        assertEquals(42, parsed.get("option2").asInt());
+    }
+}

Reply via email to