This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 30b5b26396 TIKA-4628 -- improve pipesClient+pipesServer ipc: critical
socket.setTcpNoDelay(true) and migrate to pure jackson serialization (#2546)
30b5b26396 is described below
commit 30b5b26396496a2f3d314f204e6322b3049a7085
Author: Tim Allison <[email protected]>
AuthorDate: Fri Jan 23 15:17:37 2026 -0500
TIKA-4628 -- improve pipesClient+pipesServer ipc: critical
socket.setTcpNoDelay(true) and migrate to pure jackson serialization (#2546)
* TIKA-4628 -- improve pipesClient+pipesServer ipc: critical
socket.setTcpNoDelay(true) and migrate to pure jackson serialization
---
.../ParsingEmbeddedDocumentExtractor.java | 14 +++
.../java/org/apache/tika/parser/ParseRecord.java | 120 +++++++++++++++++++++
.../apache/tika/parser/RecursiveParserWrapper.java | 12 +++
.../sax/AbstractRecursiveParserWrapperHandler.java | 2 +
.../tika/pipes/api/emitter/AbstractEmitter.java | 7 +-
.../pipes/api/emitter/AbstractStreamEmitter.java | 7 +-
.../apache/tika/pipes/api/emitter/EmitData.java | 6 +-
tika-pipes/tika-pipes-core/pom.xml | 4 +
.../org/apache/tika/pipes/core/PipesClient.java | 46 ++++----
.../tika/pipes/core/emitter/EmitDataImpl.java | 32 +++---
.../core/serialization/EmitDataDeserializer.java | 75 +++++++++++++
.../core/serialization/EmitDataSerializer.java | 45 ++++++++
.../pipes/core/serialization/JsonPipesIpc.java | 88 +++++++++++++++
.../serialization/PipesResultDeserializer.java | 65 +++++++++++
.../core/serialization/PipesResultSerializer.java | 46 ++++++++
.../tika/pipes/core/server/ParseHandler.java | 32 +++---
.../apache/tika/pipes/core/server/PipesServer.java | 49 +++------
.../apache/tika/pipes/core/MockPassbackFilter.java | 52 +++++++++
.../apache/tika/pipes/core/PassbackFilterTest.java | 24 +----
.../apache/tika/pipes/core/PipesClientTest.java | 24 +++--
.../tika/pipes/emitter/jdbc/JDBCEmitter.java | 6 +-
tika-serialization/pom.xml | 5 +
.../apache/tika/config/loader/FrameworkConfig.java | 11 +-
.../config/loader/TikaObjectMapperFactory.java | 18 +++-
.../org/apache/tika/serialization/TikaModule.java | 8 +-
.../serdes/ParseContextDeserializer.java | 15 ++-
.../serdes/ParseContextSerializer.java | 7 +-
.../apache/tika/serialization/SmileFormatTest.java | 110 +++++++++++++++++++
28 files changed, 800 insertions(+), 130 deletions(-)
diff --git
a/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
b/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
index b6112cf09f..4b1e406183 100644
---
a/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
+++
b/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
@@ -61,11 +61,19 @@ public class ParsingEmbeddedDocumentExtractor implements
EmbeddedDocumentExtract
@Override
public boolean shouldParseEmbedded(Metadata metadata) {
+ // Check ParseRecord for depth/count limits first
+ ParseRecord parseRecord = context.get(ParseRecord.class);
+ if (parseRecord != null && !parseRecord.shouldParseEmbedded()) {
+ return false;
+ }
+
+ // Then check DocumentSelector for content-based filtering
DocumentSelector selector = context.get(DocumentSelector.class);
if (selector != null) {
return selector.select(metadata);
}
+ // Then check FilenameFilter
FilenameFilter filter = context.get(FilenameFilter.class);
if (filter != null) {
String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
@@ -81,6 +89,12 @@ public class ParsingEmbeddedDocumentExtractor implements
EmbeddedDocumentExtract
public void parseEmbedded(
TikaInputStream tis, ContentHandler handler, Metadata metadata,
ParseContext parseContext, boolean outputHtml)
throws SAXException, IOException {
+ // Increment embedded count for tracking
+ ParseRecord parseRecord = context.get(ParseRecord.class);
+ if (parseRecord != null) {
+ parseRecord.incrementEmbeddedCount();
+ }
+
if (outputHtml) {
AttributesImpl attributes = new AttributesImpl();
attributes.addAttribute("", "class", "class", "CDATA",
"package-entry");
diff --git a/tika-core/src/main/java/org/apache/tika/parser/ParseRecord.java
b/tika-core/src/main/java/org/apache/tika/parser/ParseRecord.java
index ca0edc567c..9204118b08 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/ParseRecord.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/ParseRecord.java
@@ -27,6 +27,10 @@ import org.apache.tika.metadata.Metadata;
* Use this class to store exceptions, warnings and other information
* during the parse. This information is added to the parent's metadata
* after the parse by the {@link CompositeParser}.
+ * <p>
+ * This class also tracks embedded document processing limits (depth and count)
+ * which can be configured via {@link #setMaxEmbeddedDepth(int)} and
+ * {@link #setMaxEmbeddedCount(int)}.
*/
public class ParseRecord {
@@ -51,6 +55,13 @@ public class ParseRecord {
private boolean writeLimitReached = false;
+ // Embedded document tracking
+ private int embeddedCount = 0;
+ private int maxEmbeddedDepth = -1;
+ private int maxEmbeddedCount = -1;
+ private boolean embeddedDepthLimitReached = false;
+ private boolean embeddedCountLimitReached = false;
+
void beforeParse() {
depth++;
}
@@ -111,4 +122,113 @@ public class ParseRecord {
public List<Metadata> getMetadataList() {
return metadataList;
}
+
+ /**
+ * Checks whether an embedded document should be parsed based on
configured limits.
+ * This should be called before parsing each embedded document.
+ * <p>
+ * If this returns false, the caller should skip parsing the embedded
document
+ * and the appropriate limit flag will be set.
+ * <p>
+ * Note: The count limit is a hard stop (once hit, no more embedded docs
are parsed).
+ * The depth limit only affects documents at that depth - sibling
documents at
+ * shallower depths will still be parsed.
+ *
+ * @return true if the embedded document should be parsed, false if limits
are exceeded
+ */
+ public boolean shouldParseEmbedded() {
+ // Count limit is a hard stop - once we've hit max, no more embedded
parsing
+ if (embeddedCountLimitReached) {
+ return false;
+ }
+ if (maxEmbeddedCount >= 0 && embeddedCount >= maxEmbeddedCount) {
+ embeddedCountLimitReached = true;
+ return false;
+ }
+
+ // Depth limit only applies to current depth - siblings at shallower
levels
+ // can still be parsed. The flag is set for reporting purposes.
+ // depth is 1-indexed (main doc is depth 1), so embedded depth limit
of N
+ // means we allow parsing up to depth N+1
+ if (maxEmbeddedDepth >= 0 && depth > maxEmbeddedDepth) {
+ embeddedDepthLimitReached = true;
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Increments the embedded document count. Should be called when an
embedded
+ * document is about to be parsed.
+ */
+ public void incrementEmbeddedCount() {
+ embeddedCount++;
+ }
+
+ /**
+ * Gets the current count of embedded documents processed.
+ *
+ * @return the embedded document count
+ */
+ public int getEmbeddedCount() {
+ return embeddedCount;
+ }
+
+ /**
+ * Sets the maximum depth for parsing embedded documents.
+ * A value of -1 means unlimited (the default).
+ * A value of 0 means no embedded documents will be parsed.
+ * A value of 1 means only first-level embedded documents will be parsed,
etc.
+ *
+ * @param maxEmbeddedDepth the maximum embedded depth, or -1 for unlimited
+ */
+ public void setMaxEmbeddedDepth(int maxEmbeddedDepth) {
+ this.maxEmbeddedDepth = maxEmbeddedDepth;
+ }
+
+ /**
+ * Gets the maximum depth for parsing embedded documents.
+ *
+ * @return the maximum embedded depth, or -1 if unlimited
+ */
+ public int getMaxEmbeddedDepth() {
+ return maxEmbeddedDepth;
+ }
+
+ /**
+ * Sets the maximum number of embedded documents to parse.
+ * A value of -1 means unlimited (the default).
+ *
+ * @param maxEmbeddedCount the maximum embedded count, or -1 for unlimited
+ */
+ public void setMaxEmbeddedCount(int maxEmbeddedCount) {
+ this.maxEmbeddedCount = maxEmbeddedCount;
+ }
+
+ /**
+ * Gets the maximum number of embedded documents to parse.
+ *
+ * @return the maximum embedded count, or -1 if unlimited
+ */
+ public int getMaxEmbeddedCount() {
+ return maxEmbeddedCount;
+ }
+
+ /**
+ * Returns whether the embedded depth limit was reached during parsing.
+ *
+ * @return true if the depth limit was reached
+ */
+ public boolean isEmbeddedDepthLimitReached() {
+ return embeddedDepthLimitReached;
+ }
+
+ /**
+ * Returns whether the embedded count limit was reached during parsing.
+ *
+ * @return true if the count limit was reached
+ */
+ public boolean isEmbeddedCountLimitReached() {
+ return embeddedCountLimitReached;
+ }
}
diff --git
a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
index 07159dba01..f774e8efe6 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
@@ -227,6 +227,18 @@ public class RecursiveParserWrapper extends
ParserDecorator {
if
(parserState.recursiveParserWrapperHandler.hasHitMaximumEmbeddedResources()) {
return;
}
+
+ // Also check ParseRecord limits if available
+ ParseRecord parseRecord = context.get(ParseRecord.class);
+ if (parseRecord != null && !parseRecord.shouldParseEmbedded()) {
+ return;
+ }
+
+ // Increment embedded count in ParseRecord
+ if (parseRecord != null) {
+ parseRecord.incrementEmbeddedCount();
+ }
+
// Work out what this thing is
String objectName = getResourceName(metadata,
parserState.unknownCount);
String objectLocation = this.location + objectName;
diff --git
a/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
b/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
index ea4efedf6b..5e6a9fb2ce 100644
---
a/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
+++
b/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
@@ -37,6 +37,8 @@ public abstract class AbstractRecursiveParserWrapperHandler
extends DefaultHandl
public final static Property EMBEDDED_RESOURCE_LIMIT_REACHED =
Property.internalBoolean(
TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX +
"embedded_resource_limit_reached");
+ public final static Property EMBEDDED_DEPTH_LIMIT_REACHED =
Property.internalBoolean(
+ TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX +
"embedded_depth_limit_reached");
private static final int MAX_DEPTH = 100;
private final ContentHandlerFactory contentHandlerFactory;
private final int maxEmbeddedResources;
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
index 324d16ace9..250b3df9d9 100644
---
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
@@ -19,6 +19,7 @@ package org.apache.tika.pipes.api.emitter;
import java.io.IOException;
import java.util.List;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.plugins.AbstractTikaExtension;
import org.apache.tika.plugins.ExtensionConfig;
@@ -31,7 +32,11 @@ public abstract class AbstractEmitter extends
AbstractTikaExtension implements E
@Override
public void emit(List<? extends EmitData> emitData) throws IOException {
for (EmitData item : emitData) {
- emit(item.getEmitKey(), item.getMetadataList(),
item.getParseContext());
+ ParseContext parseContext = item.getParseContext();
+ if (parseContext == null) {
+ parseContext = new ParseContext();
+ }
+ emit(item.getEmitKey(), item.getMetadataList(), parseContext);
}
}
}
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
index 892180d8c7..98727205f1 100644
---
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
@@ -19,6 +19,7 @@ package org.apache.tika.pipes.api.emitter;
import java.io.IOException;
import java.util.List;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.plugins.AbstractTikaExtension;
import org.apache.tika.plugins.ExtensionConfig;
@@ -31,7 +32,11 @@ public abstract class AbstractStreamEmitter extends
AbstractTikaExtension implem
@Override
public void emit(List<? extends EmitData> emitData) throws IOException {
for (EmitData item : emitData) {
- emit(item.getEmitKey(), item.getMetadataList(),
item.getParseContext());
+ ParseContext parseContext = item.getParseContext();
+ if (parseContext == null) {
+ parseContext = new ParseContext();
+ }
+ emit(item.getEmitKey(), item.getMetadataList(), parseContext);
}
}
}
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
index 6af852a63b..ec6266ed23 100644
---
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
@@ -30,6 +30,10 @@ public interface EmitData {
long getEstimatedSizeBytes();
+ /**
+ * Gets the ParseContext. This is not serialized over IPC - it's restored
+ * by PipesClient after deserialization from the original FetchEmitTuple.
+ * May return null if not set.
+ */
ParseContext getParseContext();
-
}
diff --git a/tika-pipes/tika-pipes-core/pom.xml
b/tika-pipes/tika-pipes-core/pom.xml
index 8e4ce2e4ef..446b9bca74 100644
--- a/tika-pipes/tika-pipes-core/pom.xml
+++ b/tika-pipes/tika-pipes-core/pom.xml
@@ -61,6 +61,10 @@
<artifactId>tika-serialization</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-smile</artifactId>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>tika-pipes-iterator-commons</artifactId>
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 053856bbeb..5c175208ab 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -24,13 +24,14 @@ import static
org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.READY;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -49,7 +50,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +62,7 @@ import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
import org.apache.tika.pipes.core.server.IntermediateResult;
import org.apache.tika.pipes.core.server.PipesServer;
import org.apache.tika.utils.ExceptionUtils;
@@ -254,14 +255,7 @@ public class PipesClient implements Closeable {
private void writeTask(FetchEmitTuple t) throws IOException {
LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}",
pipesClientId, t.getId());
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream
- .builder()
- .get();
- try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
- objectOutputStream.writeObject(t);
- }
-
- byte[] bytes = bos.toByteArray();
+ byte[] bytes = JsonPipesIpc.toBytes(t);
serverTuple.output.write(COMMANDS.NEW_REQUEST.getByte());
serverTuple.output.writeInt(bytes.length);
serverTuple.output.write(bytes);
@@ -312,7 +306,12 @@ public class PipesClient implements Closeable {
lastUpdate = Instant.now();
break;
case FINISHED:
- return readResult(PipesResult.class);
+ PipesResult result = readResult(PipesResult.class);
+ // Restore ParseContext from original FetchEmitTuple
(not serialized back from server)
+ if (result.emitData() instanceof EmitDataImpl
emitDataImpl) {
+ emitDataImpl.setParseContext(t.getParseContext());
+ }
+ return result;
}
} catch (SocketTimeoutException e) {
LOG.warn("clientId={}: Socket timeout exception while waiting
for server", pipesClientId, e);
@@ -419,21 +418,13 @@ public class PipesClient implements Closeable {
return status;
}
- private <T> T readResult(Class<? extends T> clazz) throws IOException {
+ private <T> T readResult(Class<T> clazz) throws IOException {
int len = serverTuple.input.readInt();
byte[] bytes = new byte[len];
serverTuple.input.readFully(bytes);
writeAck();
-
- try (ObjectInputStream objectInputStream = new
ObjectInputStream(UnsynchronizedByteArrayInputStream
- .builder()
- .setByteArray(bytes)
- .get())) {
- return clazz.cast(objectInputStream.readObject());
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
+ return JsonPipesIpc.fromBytes(bytes, clazz);
}
private void writeAck() throws IOException {
@@ -443,7 +434,9 @@ public class PipesClient implements Closeable {
private void restart() throws InterruptedException, IOException,
TimeoutException {
- ServerSocket serverSocket = new ServerSocket(0, 50,
InetAddress.getLoopbackAddress());
+ ServerSocket serverSocket = new ServerSocket();
+ serverSocket.setReuseAddress(true);
+ serverSocket.bind(new
InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 50);
int port = serverSocket.getLocalPort();
if (serverTuple != null && serverTuple.process != null) {
int oldPort = serverTuple.serverSocket.getLocalPort();
@@ -494,9 +487,10 @@ public class PipesClient implements Closeable {
}
}
socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
- socket.setTcpNoDelay(true); // Disable Nagle's algorithm to avoid
~40ms delays on small writes
- serverTuple = new ServerTuple(process, serverSocket, socket, new
DataInputStream(socket.getInputStream()),
- new DataOutputStream(socket.getOutputStream()), tmpDir);
+ socket.setTcpNoDelay(true);
+ serverTuple = new ServerTuple(process, serverSocket, socket,
+ new DataInputStream(new
BufferedInputStream(socket.getInputStream())),
+ new DataOutputStream(new
BufferedOutputStream(socket.getOutputStream())), tmpDir);
waitForStartup();
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
index 1aee991f11..930d594918 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.tika.pipes.core.emitter;
-import java.io.Serializable;
import java.util.List;
import org.apache.tika.metadata.Metadata;
@@ -24,31 +23,23 @@ import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.emitter.EmitData;
import org.apache.tika.utils.StringUtils;
-public class EmitDataImpl implements Serializable, EmitData {
- /**
- * Serial version UID
- */
- private static final long serialVersionUID = -3861669115439125268L;
+public class EmitDataImpl implements EmitData {
private final String emitKey;
private final List<Metadata> metadataList;
private final String containerStackTrace;
- private ParseContext parseContext = null;
+ // ParseContext is not serialized - it's set by PipesClient after
deserialization
+ private ParseContext parseContext;
public EmitDataImpl(String emitKey, List<Metadata> metadataList) {
this(emitKey, metadataList, StringUtils.EMPTY);
}
public EmitDataImpl(String emitKey, List<Metadata> metadataList, String
containerStackTrace) {
- this(emitKey, metadataList, containerStackTrace, new ParseContext());
- }
-
- public EmitDataImpl(String emitKey, List<Metadata> metadataList, String
containerStackTrace, ParseContext parseContext) {
this.emitKey = emitKey;
this.metadataList = metadataList;
this.containerStackTrace = (containerStackTrace == null) ?
StringUtils.EMPTY :
containerStackTrace;
- this.parseContext = parseContext;
}
public String getEmitKey() {
@@ -67,14 +58,23 @@ public class EmitDataImpl implements Serializable, EmitData
{
return estimateSizeInBytes(getEmitKey(), getMetadataList(),
containerStackTrace);
}
- public void setParseContext(ParseContext parseContext) {
- this.parseContext = parseContext;
- }
-
+ /**
+ * Gets the ParseContext. This is not serialized - it's set by PipesClient
+ * after deserialization from the original FetchEmitTuple.
+ */
+ @Override
public ParseContext getParseContext() {
return parseContext;
}
+ /**
+ * Sets the ParseContext. Called by PipesClient after deserialization
+ * to restore the ParseContext from the original FetchEmitTuple.
+ */
+ public void setParseContext(ParseContext parseContext) {
+ this.parseContext = parseContext;
+ }
+
private static long estimateSizeInBytes(String id, List<Metadata>
metadataList,
String containerStackTrace) {
long sz = 36 + id.length() * 2;
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataDeserializer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataDeserializer.java
new file mode 100644
index 0000000000..8d8c4e303d
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataDeserializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import static
org.apache.tika.pipes.core.serialization.EmitDataSerializer.CONTAINER_STACK_TRACE;
+import static
org.apache.tika.pipes.core.serialization.EmitDataSerializer.EMIT_KEY;
+import static
org.apache.tika.pipes.core.serialization.EmitDataSerializer.METADATA_LIST;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+import org.apache.tika.utils.StringUtils;
+
+public class EmitDataDeserializer extends JsonDeserializer<EmitDataImpl> {
+
+ @Override
+ public EmitDataImpl deserialize(JsonParser jsonParser,
DeserializationContext deserializationContext) throws IOException {
+ JsonNode root = jsonParser.readValueAsTree();
+ ObjectMapper mapper = (ObjectMapper) jsonParser.getCodec();
+
+ String emitKey = readString(EMIT_KEY, root, null, true);
+ List<Metadata> metadataList = readMetadataList(root, mapper);
+ String containerStackTrace = readString(CONTAINER_STACK_TRACE, root,
StringUtils.EMPTY, false);
+
+ // ParseContext is NOT deserialized - it's restored by PipesClient
from the original FetchEmitTuple
+ return new EmitDataImpl(emitKey, metadataList, containerStackTrace);
+ }
+
+ private static List<Metadata> readMetadataList(JsonNode root, ObjectMapper
mapper) throws IOException {
+ JsonNode metadataListNode = root.get(METADATA_LIST);
+ if (metadataListNode == null || !metadataListNode.isArray()) {
+ return new ArrayList<>();
+ }
+ List<Metadata> metadataList = new ArrayList<>();
+ for (JsonNode metadataNode : metadataListNode) {
+ Metadata metadata = mapper.treeToValue(metadataNode,
Metadata.class);
+ metadataList.add(metadata);
+ }
+ return metadataList;
+ }
+
+ private static String readString(String key, JsonNode root, String
defaultVal, boolean required) throws IOException {
+ JsonNode node = root.get(key);
+ if (node == null) {
+ if (required) {
+ throw new IOException("Required field missing: " + key);
+ }
+ return defaultVal;
+ }
+ return node.asText();
+ }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataSerializer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataSerializer.java
new file mode 100644
index 0000000000..5687088e55
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/EmitDataSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import org.apache.tika.pipes.api.emitter.EmitData;
+import org.apache.tika.utils.StringUtils;
+
+public class EmitDataSerializer extends JsonSerializer<EmitData> {
+
+ public static final String EMIT_KEY = "emitKey";
+ public static final String METADATA_LIST = "metadataList";
+ public static final String CONTAINER_STACK_TRACE = "containerStackTrace";
+
+ @Override
+ public void serialize(EmitData emitData, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeStringField(EMIT_KEY, emitData.getEmitKey());
+ jsonGenerator.writeObjectField(METADATA_LIST,
emitData.getMetadataList());
+ if (!StringUtils.isBlank(emitData.getContainerStackTrace())) {
+ jsonGenerator.writeStringField(CONTAINER_STACK_TRACE,
emitData.getContainerStackTrace());
+ }
+ // ParseContext is NOT serialized - it's restored by PipesClient from
the original FetchEmitTuple
+ jsonGenerator.writeEndObject();
+ }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonPipesIpc.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonPipesIpc.java
new file mode 100644
index 0000000000..30cb4c6b6a
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonPipesIpc.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+
+import org.apache.tika.config.loader.TikaObjectMapperFactory;
+import org.apache.tika.pipes.api.FetchEmitTuple;
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.pipes.api.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+
+/**
+ * Binary serialization/deserialization for IPC communication between
PipesClient and PipesServer.
+ * <p>
+ * Uses Jackson's Smile binary format for efficient serialization. Smile is a
binary JSON format
+ * that is more compact and faster to parse than text JSON, while maintaining
full compatibility
+ * with the Jackson data binding API.
+ */
+public class JsonPipesIpc {
+
+ private static final ObjectMapper OBJECT_MAPPER;
+
+ static {
+ // Use SmileFactory for binary format - more compact and faster than
text JSON
+ SmileFactory smileFactory = new SmileFactory();
+
+ // Configure stream constraints for large content (e.g., 30MB+
documents)
+ // Default Jackson limit is 20MB which is too small for IPC with large
documents
+ StreamReadConstraints constraints = StreamReadConstraints.builder()
+ .maxStringLength(Integer.MAX_VALUE)
+ .build();
+ smileFactory.setStreamReadConstraints(constraints);
+
+ // Create mapper with Smile factory and register TikaModule for
Metadata/ParseContext serializers
+ OBJECT_MAPPER = TikaObjectMapperFactory.createMapper(smileFactory);
+
+ // Add pipes-specific serializers
+ SimpleModule pipesModule = new SimpleModule();
+ pipesModule.addSerializer(FetchEmitTuple.class, new
FetchEmitTupleSerializer());
+ pipesModule.addDeserializer(FetchEmitTuple.class, new
FetchEmitTupleDeserializer());
+ pipesModule.addSerializer(EmitData.class, new EmitDataSerializer());
+ pipesModule.addDeserializer(EmitDataImpl.class, new
EmitDataDeserializer());
+ pipesModule.addSerializer(PipesResult.class, new
PipesResultSerializer());
+ pipesModule.addDeserializer(PipesResult.class, new
PipesResultDeserializer());
+ OBJECT_MAPPER.registerModule(pipesModule);
+ }
+
+ /**
+ * Serialize an object to Smile binary format bytes.
+ */
+ public static byte[] toBytes(Object obj) throws IOException {
+ return OBJECT_MAPPER.writeValueAsBytes(obj);
+ }
+
+ /**
+ * Deserialize Smile binary format bytes to an object.
+ */
+ public static <T> T fromBytes(byte[] bytes, Class<T> clazz) throws
IOException {
+ return OBJECT_MAPPER.readValue(bytes, clazz);
+ }
+
+ /**
+ * Get the configured ObjectMapper for direct use if needed.
+ */
+ public static ObjectMapper getMapper() {
+ return OBJECT_MAPPER;
+ }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultDeserializer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultDeserializer.java
new file mode 100644
index 0000000000..72e27ddc61
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultDeserializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import static
org.apache.tika.pipes.core.serialization.PipesResultSerializer.EMIT_DATA;
+import static
org.apache.tika.pipes.core.serialization.PipesResultSerializer.MESSAGE;
+import static
org.apache.tika.pipes.core.serialization.PipesResultSerializer.STATUS;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+
+public class PipesResultDeserializer extends JsonDeserializer<PipesResult> {
+
+ @Override
+ public PipesResult deserialize(JsonParser jsonParser,
DeserializationContext deserializationContext) throws IOException {
+ JsonNode root = jsonParser.readValueAsTree();
+ ObjectMapper mapper = (ObjectMapper) jsonParser.getCodec();
+
+ String statusStr = readString(STATUS, root, null, true);
+ PipesResult.RESULT_STATUS status =
PipesResult.RESULT_STATUS.valueOf(statusStr);
+
+ EmitDataImpl emitData = null;
+ JsonNode emitDataNode = root.get(EMIT_DATA);
+ if (emitDataNode != null && !emitDataNode.isNull()) {
+ emitData = mapper.treeToValue(emitDataNode, EmitDataImpl.class);
+ }
+
+ String message = readString(MESSAGE, root, null, false);
+
+ return new PipesResult(status, emitData, message);
+ }
+
+ private static String readString(String key, JsonNode root, String
defaultVal, boolean required) throws IOException {
+ JsonNode node = root.get(key);
+ if (node == null || node.isNull()) {
+ if (required) {
+ throw new IOException("Required field missing: " + key);
+ }
+ return defaultVal;
+ }
+ return node.asText();
+ }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultSerializer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultSerializer.java
new file mode 100644
index 0000000000..ebf01a5a47
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/PipesResultSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.utils.StringUtils;
+
+public class PipesResultSerializer extends JsonSerializer<PipesResult> {
+
+ public static final String STATUS = "status";
+ public static final String EMIT_DATA = "emitData";
+ public static final String MESSAGE = "message";
+
+ @Override
+ public void serialize(PipesResult pipesResult, JsonGenerator
jsonGenerator, SerializerProvider serializerProvider) throws IOException {
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeStringField(STATUS, pipesResult.status().name());
+ if (pipesResult.emitData() != null) {
+ jsonGenerator.writeObjectField(EMIT_DATA, pipesResult.emitData());
+ }
+ if (!StringUtils.isBlank(pipesResult.message())) {
+ jsonGenerator.writeStringField(MESSAGE, pipesResult.message());
+ }
+ jsonGenerator.writeEndObject();
+ }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ParseHandler.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ParseHandler.java
index 4330bedd99..b6d31d0f00 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ParseHandler.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ParseHandler.java
@@ -35,7 +35,6 @@ import org.apache.tika.digest.SkipContainerDocumentDigest;
import org.apache.tika.exception.EncryptedDocumentException;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.WriteLimitReachedException;
-import org.apache.tika.extractor.DocumentSelector;
import org.apache.tika.extractor.EmbeddedDocumentBytesHandler;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
@@ -43,10 +42,12 @@ import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.mime.MediaType;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.ParseRecord;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig;
+import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
@@ -214,18 +215,18 @@ class ParseHandler {
if (contentHandlerFactory instanceof BasicContentHandlerFactory) {
maxEmbedded = ((BasicContentHandlerFactory)
contentHandlerFactory).getMaxEmbeddedResources();
}
- final int finalMaxEmbedded = maxEmbedded;
- parseContext.set(DocumentSelector.class, new DocumentSelector() {
- int embedded = 0;
- @Override
- public boolean select(Metadata metadata) {
- if (finalMaxEmbedded < 0) {
- return true;
- }
- return embedded++ < finalMaxEmbedded;
- }
- });
+ // Configure ParseRecord for embedded document limits
+ // ParseRecord is created by CompositeParser if not present, but we
configure it here
+ // to set the embedded count limit before parsing starts
+ ParseRecord parseRecord = parseContext.get(ParseRecord.class);
+ if (parseRecord == null) {
+ parseRecord = new ParseRecord();
+ parseContext.set(ParseRecord.class, parseRecord);
+ }
+ if (maxEmbedded >= 0) {
+ parseRecord.setMaxEmbeddedCount(maxEmbedded);
+ }
String containerException = null;
long start = System.currentTimeMillis();
@@ -259,6 +260,13 @@ class ParseHandler {
if (writeLimitReached) {
metadata.set(TikaCoreProperties.WRITE_LIMIT_REACHED, true);
}
+ // Set limit reached flags from ParseRecord
+ if (parseRecord.isEmbeddedCountLimitReached()) {
+
metadata.set(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_LIMIT_REACHED,
true);
+ }
+ if (parseRecord.isEmbeddedDepthLimitReached()) {
+
metadata.set(AbstractRecursiveParserWrapperHandler.EMBEDDED_DEPTH_LIMIT_REACHED,
true);
+ }
if (LOG.isTraceEnabled()) {
LOG.trace("timer -- parse only time: {} ms",
System.currentTimeMillis() - start);
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index a97e8557a9..6e52ceb4d8 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -22,12 +22,11 @@ import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.IN
import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM;
import static
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.TIMEOUT;
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -47,8 +46,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
-import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
@@ -74,6 +71,7 @@ import org.apache.tika.pipes.core.config.ConfigStore;
import org.apache.tika.pipes.core.config.ConfigStoreFactory;
import org.apache.tika.pipes.core.emitter.EmitterManager;
import org.apache.tika.pipes.core.fetcher.FetcherManager;
+import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.plugins.TikaPluginManager;
import org.apache.tika.sax.ContentHandlerFactory;
@@ -167,8 +165,8 @@ public class PipesServer implements AutoCloseable {
socket.connect(new
InetSocketAddress(InetAddress.getLoopbackAddress(), port),
PipesClient.SOCKET_CONNECT_TIMEOUT_MS);
socket.setTcpNoDelay(true); // Disable Nagle's algorithm to avoid
~40ms delays on small writes
- DataInputStream dis = new DataInputStream(socket.getInputStream());
- DataOutputStream dos = new
DataOutputStream(socket.getOutputStream());
+ DataInputStream dis = new DataInputStream(new
BufferedInputStream(socket.getInputStream()));
+ DataOutputStream dos = new DataOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
try {
TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath);
TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
@@ -176,6 +174,7 @@ public class PipesServer implements AutoCloseable {
// Set socket timeout from config after loading PipesConfig
socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
+ socket.setTcpNoDelay(true);
MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters();
ContentHandlerFactory contentHandlerFactory =
tikaLoader.loadContentHandlerFactory();
@@ -410,10 +409,9 @@ public class PipesServer implements AutoCloseable {
private void handleCrash(PROCESSING_STATUS processingStatus, String id,
Throwable t) {
LOG.error("{}: {}", processingStatus, id, t);
String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(msg);
- write(processingStatus, bos.toByteArray());
+ try {
+ byte[] bytes = JsonPipesIpc.toBytes(msg);
+ write(processingStatus, bytes);
awaitAck();
} catch (IOException e) {
//swallow
@@ -444,19 +442,12 @@ public class PipesServer implements AutoCloseable {
int length = input.readInt();
byte[] bytes = new byte[length];
input.readFully(bytes);
-
- try (ObjectInputStream objectInputStream = new ObjectInputStream(
-
UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) {
- return (FetchEmitTuple) objectInputStream.readObject();
- }
+ return JsonPipesIpc.fromBytes(bytes, FetchEmitTuple.class);
} catch (IOException e) {
- LOG.error("problem reading tuple", e);
- exit(1);
- } catch (ClassNotFoundException e) {
- LOG.error("can't find class?!", e);
- exit(1);
+ LOG.error("problem reading/deserializing FetchEmitTuple", e);
+ handleCrash(PROCESSING_STATUS.UNSPECIFIED_CRASH, "unknown", e);
}
- //unreachable, no?!
+ //unreachable - handleCrash calls exit
return null;
}
@@ -518,11 +509,8 @@ public class PipesServer implements AutoCloseable {
private void write(PROCESSING_STATUS processingStatus, PipesResult
pipesResult) {
try {
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
- try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
- objectOutputStream.writeObject(pipesResult);
- }
- write(processingStatus, bos.toByteArray());
+ byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
+ write(processingStatus, bytes);
} catch (IOException e) {
LOG.error("problem writing emit data (forking process shutdown?)",
e);
exit(1);
@@ -531,11 +519,8 @@ public class PipesServer implements AutoCloseable {
private void writeIntermediate(Metadata metadata) {
try {
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
- try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
- objectOutputStream.writeObject(metadata);
- }
- write(INTERMEDIATE_RESULT, bos.toByteArray());
+ byte[] bytes = JsonPipesIpc.toBytes(metadata);
+ write(INTERMEDIATE_RESULT, bytes);
} catch (IOException e) {
LOG.error("problem writing intermediate data (forking process
shutdown?)", e);
exit(1);
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/MockPassbackFilter.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/MockPassbackFilter.java
new file mode 100644
index 0000000000..b3bb3a7889
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/MockPassbackFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.tika.config.TikaComponent;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Mock PassbackFilter for testing. Removes items without RESOURCE_NAME_KEY
+ * and uppercases the RESOURCE_NAME_KEY values.
+ */
+@TikaComponent(contextKey = PassbackFilter.class)
+public class MockPassbackFilter extends PassbackFilter {
+
+ public MockPassbackFilter() {
+ // Required for Jackson deserialization
+ }
+
+ @Override
+ public void filter(List<Metadata> metadataList) throws TikaException {
+ // Remove items without RESOURCE_NAME_KEY and transform remaining ones
+ metadataList.removeIf(m ->
StringUtils.isBlank(m.get(TikaCoreProperties.RESOURCE_NAME_KEY)));
+ for (Metadata m : metadataList) {
+ String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+ // Clear all fields and only keep RESOURCE_NAME_KEY (uppercased)
+ for (String name : m.names()) {
+ m.remove(name);
+ }
+ m.set(TikaCoreProperties.RESOURCE_NAME_KEY,
val.toUpperCase(Locale.ROOT));
+ }
+ }
+}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
index a3a55bb372..853136855d 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
@@ -23,14 +23,12 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
-import java.util.Locale;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.apache.tika.config.loader.TikaJsonConfig;
-import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.parser.ParseContext;
@@ -39,7 +37,6 @@ import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
import org.apache.tika.serialization.JsonMetadataList;
-import org.apache.tika.utils.StringUtils;
public class PassbackFilterTest {
@@ -63,7 +60,9 @@ public class PassbackFilterTest {
init(tmpDir);
String emitFileBase = "blah";
ParseContext parseContext = new ParseContext();
- parseContext.set(PassbackFilter.class, new MyPassbackFilter());
+ // Use JSON config approach for Jackson serialization compatibility
+ // Don't resolve here - let PipesServer resolve on its side
+ parseContext.setJsonConfig("mock-passback-filter", "{}");
PipesResult pipesResult = pipesClient.process(
new FetchEmitTuple(testPdfFile, new FetchKey(fetcherId,
testPdfFile),
new EmitKey(emitterId, emitFileBase), new Metadata(),
parseContext,
@@ -96,21 +95,4 @@ public class PassbackFilterTest {
.get(0)
.get(Metadata.CONTENT_LENGTH));
}
-
- private static class MyPassbackFilter extends PassbackFilter {
- @Override
- public void filter(List<Metadata> metadataList) throws TikaException {
- // Remove items without RESOURCE_NAME_KEY and transform remaining
ones
- metadataList.removeIf(m ->
StringUtils.isBlank(m.get(TikaCoreProperties.RESOURCE_NAME_KEY)));
- for (Metadata m : metadataList) {
- String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY);
- // Clear all fields and only keep RESOURCE_NAME_KEY
(uppercased)
- for (String name : m.names()) {
- m.remove(name);
- }
- m.set(TikaCoreProperties.RESOURCE_NAME_KEY,
val.toUpperCase(Locale.ROOT));
- }
- }
- }
-
}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 1cba2622ac..ee18f7a369 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -32,10 +31,8 @@ import org.apache.tika.config.TikaTaskTimeout;
import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.filter.AttachmentCountingListFilter;
import org.apache.tika.metadata.filter.CompositeMetadataFilter;
import org.apache.tika.metadata.filter.MetadataFilter;
-import org.apache.tika.metadata.filter.MockUpperCaseFilter;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
@@ -74,8 +71,11 @@ public class PipesClientTest {
@Test
public void testMetadataFilter(@TempDir Path tmp) throws Exception {
ParseContext parseContext = new ParseContext();
- MetadataFilter metadataFilter = new
CompositeMetadataFilter(List.of(new MockUpperCaseFilter()));
- parseContext.set(MetadataFilter.class, metadataFilter);
+ // Use JSON config approach for Jackson serialization compatibility
+ // Don't resolve here - let PipesServer resolve on its side
+ parseContext.setJsonConfig("metadata-filters", """
+ ["mock-upper-case-filter"]
+ """);
PipesClient pipesClient = init(tmp, testDoc);
PipesResult pipesResult = pipesClient.process(
new FetchEmitTuple(testDoc, new FetchKey(fetcherName, testDoc),
@@ -89,8 +89,11 @@ public class PipesClientTest {
@Test
public void testMetadataListFilter(@TempDir Path tmp) throws Exception {
ParseContext parseContext = new ParseContext();
- MetadataFilter metadataFilter = new
CompositeMetadataFilter(List.of(new AttachmentCountingListFilter()));
- parseContext.set(MetadataFilter.class, metadataFilter);
+ // Use JSON config approach for Jackson serialization compatibility
+ // Don't resolve here - let PipesServer resolve on its side
+ parseContext.setJsonConfig("metadata-filters", """
+ ["attachment-counting-list-filter"]
+ """);
String testFile = "mock-embedded.xml";
@@ -175,8 +178,11 @@ public class PipesClientTest {
//I did both manually during development, but unit tests are better. :D
ParseContext parseContext = new ParseContext();
parseContext.set(TikaTaskTimeout.class, new TikaTaskTimeout(1000));
- MetadataFilter metadataFilter = new
CompositeMetadataFilter(List.of(new AttachmentCountingListFilter()));
- parseContext.set(MetadataFilter.class, metadataFilter);
+ // Use JSON config approach for Jackson serialization compatibility
+ // Don't resolve here - let PipesServer resolve on its side
+ parseContext.setJsonConfig("metadata-filters", """
+ ["attachment-counting-list-filter"]
+ """);
String testFile = "mock-timeout-10s.xml";
PipesClient pipesClient = init(tmp, testFile);
diff --git
a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index b1898d5914..86ea09edee 100644
---
a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++
b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -183,7 +183,11 @@ public class JDBCEmitter extends AbstractEmitter
implements Closeable {
@Override
public void emit(List<? extends EmitData> emitData) throws IOException {
for (EmitData d : emitData) {
- emit(d.getEmitKey(), d.getMetadataList(), d.getParseContext());
+ ParseContext parseContext = d.getParseContext();
+ if (parseContext == null) {
+ parseContext = new ParseContext();
+ }
+ emit(d.getEmitKey(), d.getMetadataList(), parseContext);
}
}
diff --git a/tika-serialization/pom.xml b/tika-serialization/pom.xml
index 45964fcea1..192ebb2b1a 100644
--- a/tika-serialization/pom.xml
+++ b/tika-serialization/pom.xml
@@ -74,6 +74,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-smile</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
diff --git
a/tika-serialization/src/main/java/org/apache/tika/config/loader/FrameworkConfig.java
b/tika-serialization/src/main/java/org/apache/tika/config/loader/FrameworkConfig.java
index 34952ee396..9da1df550e 100644
---
a/tika-serialization/src/main/java/org/apache/tika/config/loader/FrameworkConfig.java
+++
b/tika-serialization/src/main/java/org/apache/tika/config/loader/FrameworkConfig.java
@@ -42,6 +42,11 @@ public class FrameworkConfig {
private static final String MIME_INCLUDE_KEY = "_mime-include";
private static final String MIME_EXCLUDE_KEY = "_mime-exclude";
+ // Plain JSON mapper for converting JsonNodes to JSON strings.
+ // This is needed because the main mapper may use a binary format (e.g.,
Smile)
+ // which doesn't support writeValueAsString().
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
private final ParserDecoration decoration;
private final JsonConfig componentConfigJson;
private final JsonNode componentConfigNode;
@@ -64,7 +69,8 @@ public class FrameworkConfig {
public static FrameworkConfig extract(JsonNode configNode,
ObjectMapper objectMapper) throws
IOException {
if (configNode == null || !configNode.isObject()) {
- String jsonString = objectMapper.writeValueAsString(configNode);
+ // Use plain JSON mapper since the main mapper may be binary
(Smile)
+ String jsonString = JSON_MAPPER.writeValueAsString(configNode);
JsonConfig jsonConfig = () -> jsonString;
return new FrameworkConfig(null, jsonConfig, configNode);
}
@@ -81,7 +87,8 @@ public class FrameworkConfig {
}
// Remaining fields are component-specific config
- String jsonString = objectMapper.writeValueAsString(objNode);
+ // Use plain JSON mapper since the main mapper may be binary (Smile)
+ String jsonString = JSON_MAPPER.writeValueAsString(objNode);
JsonConfig componentConfigJson = () -> jsonString;
return new FrameworkConfig(decoration, componentConfigJson, objNode);
diff --git
a/tika-serialization/src/main/java/org/apache/tika/config/loader/TikaObjectMapperFactory.java
b/tika-serialization/src/main/java/org/apache/tika/config/loader/TikaObjectMapperFactory.java
index b8b7e4389d..e65eee07b9 100644
---
a/tika-serialization/src/main/java/org/apache/tika/config/loader/TikaObjectMapperFactory.java
+++
b/tika-serialization/src/main/java/org/apache/tika/config/loader/TikaObjectMapperFactory.java
@@ -16,6 +16,7 @@
*/
package org.apache.tika.config.loader;
+import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -67,7 +68,22 @@ public class TikaObjectMapperFactory {
* @return configured ObjectMapper
*/
public static ObjectMapper createMapper() {
- ObjectMapper mapper = new ObjectMapper();
+ return createMapper(null);
+ }
+
+ /**
+ * Creates an ObjectMapper configured for Tika serialization with a custom
JsonFactory.
+ * <p>
+ * This can be used to create mappers for binary formats like Smile:
+ * <pre>
+ * ObjectMapper smileMapper = TikaObjectMapperFactory.createMapper(new
SmileFactory());
+ * </pre>
+ *
+ * @param factory the JsonFactory to use, or null for default JSON
+ * @return configured ObjectMapper
+ */
+ public static ObjectMapper createMapper(JsonFactory factory) {
+ ObjectMapper mapper = (factory != null) ? new ObjectMapper(factory) :
new ObjectMapper();
// Allow comments in JSON config files (// and /* */ style)
mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
diff --git
a/tika-serialization/src/main/java/org/apache/tika/serialization/TikaModule.java
b/tika-serialization/src/main/java/org/apache/tika/serialization/TikaModule.java
index 8ad0d588ff..31a95d9e4d 100644
---
a/tika-serialization/src/main/java/org/apache/tika/serialization/TikaModule.java
+++
b/tika-serialization/src/main/java/org/apache/tika/serialization/TikaModule.java
@@ -90,6 +90,11 @@ public class TikaModule extends SimpleModule {
private static ObjectMapper sharedMapper;
+ // Plain JSON mapper for converting JsonNodes to JSON strings.
+ // This is needed because the main mapper may use a binary format (e.g.,
Smile)
+ // which doesn't support writeValueAsString().
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
/**
* Interfaces that use compact format serialization.
* Types implementing these interfaces will be serialized as:
@@ -324,7 +329,8 @@ public class TikaModule extends SimpleModule {
// Try JsonConfig constructor first (works for any
component)
Constructor<?> jsonConfigCtor =
findJsonConfigConstructor(clazz);
if (jsonConfigCtor != null) {
- String json = mapper.writeValueAsString(cleanedConfig);
+ // Use plain JSON mapper since the main mapper may be
binary (Smile)
+ String json =
JSON_MAPPER.writeValueAsString(cleanedConfig);
instance = jsonConfigCtor.newInstance((JsonConfig) ()
-> json);
} else {
// Fall back to no-arg constructor + Jackson bean
deserialization
diff --git
a/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextDeserializer.java
b/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextDeserializer.java
index 2dcf296104..2e2b4dec7d 100644
---
a/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextDeserializer.java
+++
b/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextDeserializer.java
@@ -58,6 +58,11 @@ public class ParseContextDeserializer extends
JsonDeserializer<ParseContext> {
private static final Logger LOG =
LoggerFactory.getLogger(ParseContextDeserializer.class);
+ // Plain JSON mapper for converting JsonNodes to JSON strings.
+ // This is needed because the main mapper may use a binary format (e.g.,
Smile)
+ // which doesn't support writeValueAsString().
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
@Override
public ParseContext deserialize(JsonParser jsonParser,
DeserializationContext ctxt)
throws IOException {
@@ -100,7 +105,8 @@ public class ParseContextDeserializer extends
JsonDeserializer<ParseContext> {
deserializeTypedObjects(value, parseContext, mapper);
} else {
// Store as JSON config for lazy resolution
- String json = mapper.writeValueAsString(value);
+ // Use plain JSON mapper since the main mapper may be binary
(Smile)
+ String json = JSON_MAPPER.writeValueAsString(value);
parseContext.setJsonConfig(name, json);
}
}
@@ -143,8 +149,8 @@ public class ParseContextDeserializer extends
JsonDeserializer<ParseContext> {
} catch (ClassNotFoundException e) {
LOG.warn("Could not find class for typed component '{}',
storing as JSON config",
componentName);
- // Fall back to storing as JSON config
- parseContext.setJsonConfig(componentName,
mapper.writeValueAsString(configNode));
+ // Fall back to storing as JSON config (use plain JSON
mapper)
+ parseContext.setJsonConfig(componentName,
JSON_MAPPER.writeValueAsString(configNode));
continue;
}
}
@@ -161,7 +167,8 @@ public class ParseContextDeserializer extends
JsonDeserializer<ParseContext> {
} catch (Exception e) {
LOG.warn("Failed to deserialize typed component '{}' as {},
storing as JSON config",
componentName, configClass.getName(), e);
- parseContext.setJsonConfig(componentName,
mapper.writeValueAsString(configNode));
+ // Use plain JSON mapper since main mapper may be binary
(Smile)
+ parseContext.setJsonConfig(componentName,
JSON_MAPPER.writeValueAsString(configNode));
}
}
}
diff --git
a/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextSerializer.java
b/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextSerializer.java
index e2545d4033..cf73ad8129 100644
---
a/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextSerializer.java
+++
b/tika-serialization/src/main/java/org/apache/tika/serialization/serdes/ParseContextSerializer.java
@@ -91,7 +91,9 @@ public class ParseContextSerializer extends
JsonSerializer<ParseContext> {
hasTypedObjects = true;
}
gen.writeFieldName(keyName);
- gen.writeRawValue(PLAIN_MAPPER.writeValueAsString(value));
+ // Use writeTree instead of writeRawValue for binary format
support (e.g., Smile)
+ // and stricter validation (fails early if value can't be
serialized)
+ gen.writeTree(PLAIN_MAPPER.valueToTree(value));
}
if (hasTypedObjects) {
@@ -102,7 +104,8 @@ public class ParseContextSerializer extends
JsonSerializer<ParseContext> {
Map<String, JsonConfig> jsonConfigs = parseContext.getJsonConfigs();
for (Map.Entry<String, JsonConfig> entry : jsonConfigs.entrySet()) {
gen.writeFieldName(entry.getKey());
- gen.writeRawValue(entry.getValue().json());
+ // Parse the JSON string into a tree for binary format support
+ gen.writeTree(PLAIN_MAPPER.readTree(entry.getValue().json()));
}
gen.writeEndObject();
diff --git
a/tika-serialization/src/test/java/org/apache/tika/serialization/SmileFormatTest.java
b/tika-serialization/src/test/java/org/apache/tika/serialization/SmileFormatTest.java
new file mode 100644
index 0000000000..74836f2526
--- /dev/null
+++
b/tika-serialization/src/test/java/org/apache/tika/serialization/SmileFormatTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.config.loader.FrameworkConfig;
+import org.apache.tika.config.loader.TikaObjectMapperFactory;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+
+/**
+ * Tests that serialization works correctly with Jackson's Smile binary format.
+ * <p>
+ * Smile is a binary JSON format that doesn't support certain text-based
operations
+ * like writeValueAsString(). These tests ensure our serializers handle this
correctly.
+ */
+public class SmileFormatTest {
+
+ private ObjectMapper smileMapper;
+
+ @BeforeEach
+ public void setUp() {
+ // Each test gets a Tika-configured ObjectMapper backed by a Smile
+ // (binary JSON) factory, so serialization goes through the binary path.
+ SmileFactory smileFactory = new SmileFactory();
+ smileMapper = TikaObjectMapperFactory.createMapper(smileFactory);
+ }
+
+ @Test
+ public void testMetadataRoundTripWithSmile() throws IOException {
+ Metadata original = new Metadata();
+ original.set("title", "Test Document");
+ original.set("author", "Test Author");
+ original.add("keywords", "test");
+ original.add("keywords", "smile");
+
+ // Serialize to Smile binary format
+ byte[] bytes = smileMapper.writeValueAsBytes(original);
+
+ // Deserialize from Smile binary format
+ Metadata deserialized = smileMapper.readValue(bytes, Metadata.class);
+
+ assertEquals("Test Document", deserialized.get("title"));
+ assertEquals("Test Author", deserialized.get("author"));
+ // Multi-valued field must survive the round trip intact
+ String[] keywords = deserialized.getValues("keywords");
+ assertEquals(2, keywords.length);
+ }
+
+ @Test
+ public void testParseContextRoundTripWithSmile() throws IOException {
+ ParseContext original = new ParseContext();
+ original.setJsonConfig("test-config", "{\"key\": \"value\"}");
+
+ // Serialize to Smile binary format
+ byte[] bytes = smileMapper.writeValueAsBytes(original);
+
+ // Deserialize from Smile binary format
+ ParseContext deserialized = smileMapper.readValue(bytes,
ParseContext.class);
+
+ assertNotNull(deserialized.getJsonConfig("test-config"));
+ // Compare parsed JSON to avoid whitespace differences
+ ObjectMapper jsonMapper = new ObjectMapper();
+ JsonNode originalNode = jsonMapper.readTree("{\"key\": \"value\"}");
+ JsonNode deserializedNode =
jsonMapper.readTree(deserialized.getJsonConfig("test-config").json());
+ assertEquals(originalNode, deserializedNode);
+ }
+
+ @Test
+ public void testFrameworkConfigExtractWithSmile() throws IOException {
+ // Build the config node with a plain JSON mapper; the Smile mapper
+ // is only handed to FrameworkConfig.extract() below
+ String json = "{\"option1\": \"value1\", \"option2\": 42}";
+ ObjectMapper jsonMapper = new ObjectMapper();
+ JsonNode configNode = jsonMapper.readTree(json);
+
+ // Extract framework config - this should work even when passed a
Smile mapper
+ // because FrameworkConfig uses a plain JSON mapper internally for
writeValueAsString
+ FrameworkConfig frameworkConfig = FrameworkConfig.extract(configNode,
smileMapper);
+
+ assertNotNull(frameworkConfig.getComponentConfigJson());
+ // The JSON string should be valid
+ String componentJson = frameworkConfig.getComponentConfigJson().json();
+ assertNotNull(componentJson);
+ // Parse it back to verify it's valid JSON
+ JsonNode parsed = jsonMapper.readTree(componentJson);
+ assertEquals("value1", parsed.get("option1").asText());
+ assertEquals(42, parsed.get("option2").asInt());
+ }
+}