This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 9cdb66fd2 TIKA-4498 -- add a PassbackFilter to enable passing back
some of the … (#2353)
9cdb66fd2 is described below
commit 9cdb66fd281c8e4e939202c3d033ea5f76b88f57
Author: Tim Allison <[email protected]>
AuthorDate: Sat Oct 4 13:19:11 2025 -0400
TIKA-4498 -- add a PassbackFilter to enable passing back some of the …
(#2353)
* TIKA-4498 -- add a PassbackFilter to enable passing back some of the
parsed output to the client parser.
---
.../org/apache/tika/pipes/core/PassbackFilter.java | 28 +++++
.../org/apache/tika/pipes/core/PipesClient.java | 18 ++-
.../org/apache/tika/pipes/core/PipesResult.java | 3 +-
.../org/apache/tika/pipes/core/PipesServer.java | 44 ++++++-
.../apache/tika/pipes/core/PassbackFilterTest.java | 127 +++++++++++++++++++++
.../apache/tika/pipes/core/tika-emit-config.xml | 47 ++++++++
.../reporters/jdbc/TestJDBCPipesReporter.java | 2 +-
7 files changed, 257 insertions(+), 12 deletions(-)
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java
new file mode 100644
index 000000000..af7150354
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
+import java.io.Serializable;
+
+import org.apache.tika.metadata.listfilter.MetadataListFilter;
+
+/**
+ * Filter/select some of the parsed output and pass it back to the client.
+ * <p>
+ * When an instance is set in the {@link org.apache.tika.parser.ParseContext}
+ * under {@code PassbackFilter.class}, the pipes server runs
+ * {@link #filter(java.util.List)} over the metadata list after the emit has
+ * succeeded and returns the filtered list to the client as a
+ * {@link PipesResult.STATUS#EMIT_SUCCESS_PASSBACK} result. The emitted output
+ * itself is not filtered.
+ * <p>
+ * NOTE(review): presumably {@link Serializable} because the ParseContext is
+ * serialized to the forked server along with the fetch/emit tuple; confirm.
+ */
+public abstract class PassbackFilter extends MetadataListFilter implements
Serializable {
+
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 11eb87205..5055d512e 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -334,7 +334,11 @@ public class PipesClient implements Closeable {
//there may have been a parse exception, but the parse didn't
crash
LOG.debug("pipesClientId={} parse success: {} in {} ms",
pipesClientId, t.getId(),
millis);
- return deserializeEmitData();
+ return deserializeEmitData(PipesResult.STATUS.PARSE_SUCCESS);
+ case EMIT_SUCCESS_PASS_BACK:
+ LOG.debug("pipesClientId={} emit success with passback: {} in
{} ms", pipesClientId, t.getId(),
+ millis);
+ return
deserializeEmitData(PipesResult.STATUS.EMIT_SUCCESS_PASSBACK);
case PARSE_EXCEPTION_NO_EMIT:
return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
case EMIT_SUCCESS:
@@ -366,7 +370,7 @@ public class PipesClient implements Closeable {
return new PipesResult(status, msg);
}
- private PipesResult deserializeEmitData() throws IOException {
+ private PipesResult deserializeEmitData(PipesResult.STATUS status) throws
IOException {
int length = input.readInt();
byte[] bytes = new byte[length];
input.readFully(bytes);
@@ -376,9 +380,14 @@ public class PipesClient implements Closeable {
String stack = emitData.getContainerStackTrace();
if (StringUtils.isBlank(stack)) {
- return new PipesResult(emitData);
+ //no container stack trace recorded: keep the caller's status (PARSE_SUCCESS or EMIT_SUCCESS_PASSBACK)
+ return new PipesResult(status, emitData, false);
} else {
- return new PipesResult(emitData, stack);
+ /* a container stack trace was recorded (the parse threw but did not crash):
+ * report PARSE_SUCCESS as PARSE_SUCCESS_WITH_EXCEPTION; EMIT_SUCCESS_PASSBACK
+ * is kept as is. In both cases the stack is carried as the result's message. */
+ if (status == PipesResult.STATUS.PARSE_SUCCESS) {
+ status = PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION;
+ }
+ return new PipesResult(status, emitData, stack, false);
}
} catch (ClassNotFoundException e) {
LOG.error("class not found exception deserializing data", e);
@@ -387,6 +396,7 @@ public class PipesClient implements Closeable {
}
}
+
private PipesResult deserializeIntermediateResult(EmitKey emitKey,
ParseContext parseContext) throws IOException {
int length = input.readInt();
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
index efc2f77b5..961c00a75 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
@@ -34,6 +34,7 @@ public class PipesResult {
OOM, TIMEOUT, UNSPECIFIED_CRASH,
NO_EMITTER_FOUND,
EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
+ EMIT_SUCCESS_PASSBACK,//emit succeeded; getEmitData() holds the metadata list returned by the PassbackFilter
INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND,
INTERMEDIATE_RESULT;
}
@@ -51,7 +52,7 @@ public class PipesResult {
private final EmitData emitData;
private final String message;
- private PipesResult(STATUS status, EmitData emitData, String message,
boolean intermediate) {
+ public PipesResult(STATUS status, EmitData emitData, String message,
boolean intermediate) { //NOTE(review): widened from private for PipesClient, which is in the same package, so package-private would suffice
this.status = status;
this.emitData = emitData;
this.message = message;
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
index d49d95c25..3d7e7288a 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
@@ -105,7 +105,7 @@ public class PipesServer implements Runnable {
public enum STATUS {
READY, CALL, PING, FAILED_TO_START, FETCHER_NOT_FOUND,
EMITTER_NOT_FOUND,
FETCHER_INITIALIZATION_EXCEPTION, FETCH_EXCEPTION, PARSE_SUCCESS,
PARSE_EXCEPTION_NO_EMIT,
- EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION, OOM,
TIMEOUT, EMPTY_OUTPUT,
+ EMIT_SUCCESS, EMIT_SUCCESS_PASS_BACK, EMIT_SUCCESS_PARSE_EXCEPTION,
EMIT_EXCEPTION, OOM, TIMEOUT, EMPTY_OUTPUT,
INTERMEDIATE_RESULT;
byte getByte() {
@@ -312,12 +312,44 @@ public class PipesServer implements Runnable {
write(STATUS.EMIT_EXCEPTION, bytes);
return;
}
- if (StringUtils.isBlank(parseExceptionStack)) {
- write(STATUS.EMIT_SUCCESS);
- } else {
- write(STATUS.EMIT_SUCCESS_PARSE_EXCEPTION,
- parseExceptionStack.getBytes(StandardCharsets.UTF_8));
+ writeEmitResponse(emitKey, parseData.metadataList,
parseExceptionStack, parseContext);
+
+ }
+
+ private void writeEmitResponse(EmitKey emitKey, List<Metadata>
metadataList, String parseExceptionStack, ParseContext parseContext) {
+ /* Sends the emit outcome to the client. Without a PassbackFilter in the
+ * ParseContext this is EMIT_SUCCESS, or EMIT_SUCCESS_PARSE_EXCEPTION with the
+ * UTF-8 parse exception stack as payload. With a filter it is always
+ * EMIT_SUCCESS_PASS_BACK with a serialized EmitData holding the filtered list
+ * and parseExceptionStack (presumably read back by the client as the container
+ * stack trace). NOTE(review): in the passback case the parse-exception status
+ * computed here is not sent; callers must look at the result's message. */
+ STATUS status = (StringUtils.isBlank(parseExceptionStack)) ?
STATUS.EMIT_SUCCESS :
+ STATUS.EMIT_SUCCESS_PARSE_EXCEPTION;
+ PassbackFilter filter = parseContext.get(PassbackFilter.class);
+ if (filter == null) {
+ if (status == STATUS.EMIT_SUCCESS) {
+ write(status);
+ } else {
+ write(status,
parseExceptionStack.getBytes(StandardCharsets.UTF_8));
+ }
+ return;
}
+ List<Metadata> filtered = null;
+ try {
+ filtered = filter.filter(metadataList);
+ } catch (TikaException e) {
+ LOG.error("problem filtering data for passback", e);
+ exit(1); //NOTE(review): assumes exit() never returns; otherwise an EmitData with a null list would be sent below
+ }
+
+ EmitData filteredEmitData = new EmitData(emitKey, filtered,
parseExceptionStack);
+
+ try {
+ UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
+ try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
+ objectOutputStream.writeObject(filteredEmitData);
+ }
+ write(STATUS.EMIT_SUCCESS_PASS_BACK, bos.toByteArray());
+ } catch (IOException e) {
+ LOG.error("problem writing response data (forking process
shutdown?)", e);
+ exit(1);
+ }
+
}
private void emitContentsAndBytes(Emitter emitter, EmitKey emitKey,
diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
new file mode 100644
index 000000000..d0b064f7e
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.core.emitter.EmitKey;
+import org.apache.tika.pipes.core.fetcher.FetchKey;
+import org.apache.tika.serialization.JsonMetadataList;
+import org.apache.tika.utils.StringUtils;
+
+/** Checks that a {@link PassbackFilter} in the ParseContext shapes the data passed back to the client, not the emitted output. */
+public class PassbackFilterTest {
+
+    private Path tmpDir;
+    String fetcherName = "fs";
+    String testPdfFile = "testOverlappingText.pdf";
+
+    private PipesClient pipesClient;
+
+    @BeforeEach
+    public void init() throws TikaConfigException, IOException, ParserConfigurationException, SAXException {
+        Path tikaConfigTemplate = Paths.get("src", "test", "resources", "org", "apache", "tika", "pipes", "core", "tika-emit-config.xml");
+        tmpDir = Files.createTempDirectory("tika-pipes");
+        Path tikaConfigPath = Files.createTempFile(tmpDir, "tika-pipes-", ".xml");
+        String template = Files.readString(tikaConfigTemplate, StandardCharsets.UTF_8);
+        template = template.replace("EMITTER_BASE_PATH", tmpDir.toAbsolutePath().toString());
+        Files.writeString(tikaConfigPath, template);
+        PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath);
+        pipesClient = new PipesClient(pipesConfig);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        // close the Closeable client first; presumably this stops its forked server before tmpDir is deleted
+        if (pipesClient != null) {
+            pipesClient.close();
+        }
+        FileUtils.deleteDirectory(tmpDir.toFile());
+    }
+
+    @Test
+    public void testPassbackFilter() throws Exception {
+        String emitFileBase = "blah";
+        ParseContext parseContext = new ParseContext();
+        parseContext.set(PassbackFilter.class, new MyPassbackFilter());
+        PipesResult pipesResult = pipesClient.process(
+                new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName, testPdfFile), new EmitKey("fs", emitFileBase), new Metadata(), parseContext,
+                        FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+        assertEquals(PipesResult.STATUS.EMIT_SUCCESS_PASSBACK, pipesResult.getStatus());
+
+        // the passed-back list holds only what MyPassbackFilter kept
+        List<Metadata> passedBack = pipesResult.getEmitData().getMetadataList();
+        assertNotNull(passedBack);
+        assertEquals(1, passedBack.size());
+        Metadata metadata = passedBack.get(0);
+        assertEquals("TESTOVERLAPPINGTEXT.PDF", metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
+        assertNull(metadata.get(Metadata.CONTENT_TYPE));
+        assertNull(metadata.get(Metadata.CONTENT_LENGTH));
+        assertEquals(1, metadata.names().length);
+
+        // the emitted JSON is unfiltered; close the reader so tearDown can delete tmpDir
+        List<Metadata> metadataList;
+        try (BufferedReader reader = Files.newBufferedReader(tmpDir.resolve(emitFileBase + ".json"), StandardCharsets.UTF_8)) {
+            metadataList = JsonMetadataList.fromJson(reader);
+        }
+        assertEquals(1, metadataList.size());
+        assertEquals("application/pdf", metadataList.get(0).get(Metadata.CONTENT_TYPE));
+        assertEquals("899", metadataList.get(0).get(Metadata.CONTENT_LENGTH));
+    }
+
+    /** Passes back, for each metadata object with a resource name, a new Metadata holding only that name upper-cased. */
+    private static class MyPassbackFilter extends PassbackFilter {
+        @Override
+        public List<Metadata> filter(List<Metadata> metadataList) throws TikaException {
+            List<Metadata> ret = new ArrayList<>();
+            for (Metadata m : metadataList) {
+                String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+                if (!StringUtils.isBlank(val)) {
+                    Metadata retM = new Metadata();
+                    retM.add(TikaCoreProperties.RESOURCE_NAME_KEY, val.toUpperCase(Locale.ROOT));
+                    ret.add(retM);
+                }
+            }
+            return ret;
+        }
+    }
+
+}
diff --git
a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
new file mode 100644
index 000000000..d1e04cd2b
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<properties>
+ <pipes>
+ <params>
+ <numClients>2</numClients>
+ <forkedJvmArgs>
+ <arg>-Xmx1g</arg>
+ <arg>-XX:ParallelGCThreads=2</arg>
+ </forkedJvmArgs>
+ <timeoutMillis>60000</timeoutMillis>
+ <maxForEmitBatchBytes>0</maxForEmitBatchBytes> <!-- always emit -->
+ </params>
+ </pipes>
+ <autoDetectParserConfig>
+ <digesterFactory
class="org.apache.tika.pipes.core.async.MockDigesterFactory">
+ <skipContainerDocument>false</skipContainerDocument>
+ </digesterFactory>
+ </autoDetectParserConfig>
+ <fetchers>
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+ <name>fs</name>
+ <basePath>src/test/resources/test-documents</basePath> <!-- PassbackFilterTest fetches testOverlappingText.pdf from here -->
+ </fetcher>
+ </fetchers>
+ <emitters>
+ <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+ <name>fs</name>
+ <basePath>EMITTER_BASE_PATH</basePath> <!-- placeholder: PassbackFilterTest replaces it with a temp directory -->
+ </emitter>
+ </emitters>
+</properties>
\ No newline at end of file
diff --git
a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
index 0920b4ad5..533fa6dbd 100644
---
a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
+++
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
@@ -134,7 +134,7 @@ public class TestJDBCPipesReporter {
Map<PipesResult.STATUS, Long> expected = runBatch(reporter,
numThreads, numIterations);
reporter.close();
Map<PipesResult.STATUS, Long> total = countReported(connectionString);
- assertEquals(16, total.size());
+ assertEquals(17, total.size()); //NOTE(review): presumably one entry per PipesResult.STATUS value (EMIT_SUCCESS_PASSBACK was added); confirm against runBatch
long sum = 0;
for (Map.Entry<PipesResult.STATUS, Long> e : expected.entrySet()) {
if (e.getKey() != PARSE_SUCCESS && e.getKey() !=
PARSE_SUCCESS_WITH_EXCEPTION) {