This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4498 in repository https://gitbox.apache.org/repos/asf/tika.git
commit ccce053cae6dc187ae4807ca088217e981ffae52 Author: tallison <[email protected]> AuthorDate: Fri Oct 3 16:33:11 2025 -0400 TIKA-4498 -- add a PassbackFilter to enable passing back some of the parsed output to the client parser. --- .../org/apache/tika/pipes/core/PassbackFilter.java | 28 +++++ .../org/apache/tika/pipes/core/PipesClient.java | 13 ++- .../org/apache/tika/pipes/core/PipesResult.java | 3 +- .../org/apache/tika/pipes/core/PipesServer.java | 44 ++++++- .../apache/tika/pipes/core/PassbackFilterTest.java | 127 +++++++++++++++++++++ .../apache/tika/pipes/core/tika-emit-config.xml | 47 ++++++++ 6 files changed, 251 insertions(+), 11 deletions(-) diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java new file mode 100644 index 000000000..af7150354 --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.pipes.core; + +import java.io.Serializable; + +import org.apache.tika.metadata.listfilter.MetadataListFilter; + +/** + * Filter/select some of the emitted output and pass it back to the client parser. When an instance is set in the ParseContext, PipesServer calls filter(...) on the parsed metadata list after a successful emit; if that throws a TikaException the server logs it and calls exit(1), otherwise it sends the filtered list (plus any parse exception stack) back to the client as a serialized EmitData with server status EMIT_SUCCESS_PASS_BACK, which PipesClient reports as PipesResult.STATUS.EMIT_SUCCESS_PASSBACK. The class is Serializable presumably because the ParseContext holding it is shipped to the forked server process, so subclass state should stay serializable (TODO confirm). + */ +public abstract class PassbackFilter extends MetadataListFilter implements Serializable { + +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index 11eb87205..8c8e2db49 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -334,7 +334,11 @@ public class PipesClient implements Closeable { //there may have been a parse exception, but the parse didn't crash LOG.debug("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(), millis); - return deserializeEmitData(); + return deserializeEmitData(PipesResult.STATUS.PARSE_SUCCESS); + case EMIT_SUCCESS_PASS_BACK: /* the server emitted the document and also sent back PassbackFilter output inside a serialized EmitData */ + LOG.debug("pipesClientId={} emit success with passback: {} in {} ms", pipesClientId, t.getId(), + millis); + return deserializeEmitData(PipesResult.STATUS.EMIT_SUCCESS_PASSBACK); case PARSE_EXCEPTION_NO_EMIT: return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT); case EMIT_SUCCESS: @@ -366,7 +370,7 @@ public class PipesClient implements Closeable { return new PipesResult(status, msg); } - private PipesResult deserializeEmitData() throws IOException { + /** Reads a length-prefixed, serialized EmitData from the server and wraps it in a PipesResult with the given status; a non-blank container stack trace becomes the result's message. */ private PipesResult deserializeEmitData(PipesResult.STATUS status) throws IOException { int length = input.readInt(); byte[] bytes = new byte[length]; input.readFully(bytes); @@ -376,9 +380,9 @@ public class PipesClient implements Closeable { String stack = emitData.getContainerStackTrace(); if (StringUtils.isBlank(stack)) { - return new PipesResult(emitData); + return new PipesResult(status, emitData, false); } else { - return new
PipesResult(emitData, stack); + return new PipesResult(status, emitData, stack, false); } } catch (ClassNotFoundException e) { LOG.error("class not found exception deserializing data", e); @@ -387,6 +391,7 @@ public class PipesClient implements Closeable { } } + private PipesResult deserializeIntermediateResult(EmitKey emitKey, ParseContext parseContext) throws IOException { int length = input.readInt(); diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java index efc2f77b5..961c00a75 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java @@ -34,6 +34,7 @@ public class PipesResult { OOM, TIMEOUT, UNSPECIFIED_CRASH, NO_EMITTER_FOUND, EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION, + EMIT_SUCCESS_PASSBACK,//emit succeeded and the PassbackFilter output is returned in EmitData (PipesServer.STATUS spells this EMIT_SUCCESS_PASS_BACK) INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND, INTERMEDIATE_RESULT; } @@ -51,7 +52,7 @@ public class PipesResult { private final EmitData emitData; private final String message; - private PipesResult(STATUS status, EmitData emitData, String message, boolean intermediate) { + /** Full constructor. NOTE(review): this is now public, but PipesClient, the caller added in this change, lives in the same package, so package-private visibility would be enough. */ public PipesResult(STATUS status, EmitData emitData, String message, boolean intermediate) { this.status = status; this.emitData = emitData; this.message = message; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java index d49d95c25..3d7e7288a 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java @@ -105,7 +105,7 @@ public class PipesServer implements Runnable { public enum STATUS { READY, CALL, PING, FAILED_TO_START,
FETCHER_NOT_FOUND, EMITTER_NOT_FOUND, FETCHER_INITIALIZATION_EXCEPTION, FETCH_EXCEPTION, PARSE_SUCCESS, PARSE_EXCEPTION_NO_EMIT, - EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION, OOM, TIMEOUT, EMPTY_OUTPUT, + EMIT_SUCCESS, EMIT_SUCCESS_PASS_BACK, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION, OOM, TIMEOUT, EMPTY_OUTPUT, INTERMEDIATE_RESULT; byte getByte() { @@ -312,12 +312,44 @@ public class PipesServer implements Runnable { write(STATUS.EMIT_EXCEPTION, bytes); return; } - if (StringUtils.isBlank(parseExceptionStack)) { - write(STATUS.EMIT_SUCCESS); - } else { - write(STATUS.EMIT_SUCCESS_PARSE_EXCEPTION, - parseExceptionStack.getBytes(StandardCharsets.UTF_8)); + writeEmitResponse(emitKey, parseData.metadataList, parseExceptionStack, parseContext); + + } + + /** Sends the final status of a successful emit to the client. Without a PassbackFilter in the ParseContext this is EMIT_SUCCESS, or EMIT_SUCCESS_PARSE_EXCEPTION followed by the UTF-8 bytes of the parse exception stack. With a filter, the filtered metadata list and the parse exception stack are written with ObjectOutputStream as an EmitData and sent as EMIT_SUCCESS_PASS_BACK. NOTE(review): in that branch the computed status is not used, so a parse exception is signalled only through the stack inside EmitData; confirm that callers do not rely on EMIT_SUCCESS_PARSE_EXCEPTION when passback is enabled. */ private void writeEmitResponse(EmitKey emitKey, List<Metadata> metadataList, String parseExceptionStack, ParseContext parseContext) { + + STATUS status = (StringUtils.isBlank(parseExceptionStack)) ? STATUS.EMIT_SUCCESS : + STATUS.EMIT_SUCCESS_PARSE_EXCEPTION; + PassbackFilter filter = parseContext.get(PassbackFilter.class); + if (filter == null) { + if (status == STATUS.EMIT_SUCCESS) { + write(status); + } else { + write(status, parseExceptionStack.getBytes(StandardCharsets.UTF_8)); + } + return; } + List<Metadata> filtered = null; + try { + filtered = filter.filter(metadataList); + } catch (TikaException e) { + LOG.error("problem filtering data for passback", e); + exit(1); /* NOTE(review): assumes exit() never returns; if it can, `filtered` is still null when the EmitData below is built */ + } + + EmitData filteredEmitData = new EmitData(emitKey, filtered, parseExceptionStack); + + try { + UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); + try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) { + objectOutputStream.writeObject(filteredEmitData); + } + write(STATUS.EMIT_SUCCESS_PASS_BACK, bos.toByteArray()); + } catch (IOException e) { + LOG.error("problem writing response data (forking process shutdown?)", e); + exit(1); + } + } private void
emitContentsAndBytes(Emitter emitter, EmitKey emitKey, diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java new file mode 100644 index 000000000..d0b064f7e --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.tika.pipes.core; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.xml.sax.SAXException; + +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.exception.TikaException; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.parser.ParseContext; +import org.apache.tika.pipes.core.emitter.EmitKey; +import org.apache.tika.pipes.core.fetcher.FetchKey; +import org.apache.tika.serialization.JsonMetadataList; +import org.apache.tika.utils.StringUtils; + +/** End-to-end test of PassbackFilter through PipesClient, using the "fs" fetcher and "fs" emitter defined in tika-emit-config.xml. */ public class PassbackFilterTest { + + private Path tmpDir; + String fetcherName = "fs"; + String testPdfFile = "testOverlappingText.pdf"; + + private PipesClient pipesClient; + + /** Copies tika-emit-config.xml into a fresh temp dir, replacing its EMITTER_BASE_PATH placeholder with that dir, and builds the PipesClient from the copy. */ @BeforeEach + public void init() throws TikaConfigException, IOException, ParserConfigurationException, SAXException { + Path tikaConfigTemplate = Paths.get("src", "test", "resources", "org", "apache", "tika", "pipes", "core", "tika-emit-config.xml"); + tmpDir = Files.createTempDirectory("tika-pipes"); + Path tikaConfigPath = Files.createTempFile(tmpDir, "tika-pipes-", ".xml"); + String template = Files.readString(tikaConfigTemplate, StandardCharsets.UTF_8); + template = template.replace("EMITTER_BASE_PATH", tmpDir + .toAbsolutePath() + .toString()); + Files.writeString(tikaConfigPath, template); + PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath); +
pipesClient = new PipesClient(pipesConfig); + } + + @AfterEach + public void tearDown() throws IOException { + FileUtils.deleteDirectory(tmpDir.toFile()); + } + + /** The client should get back only the filtered metadata (the upper-cased resource name), while the emitter still writes the unfiltered metadata list to blah.json in tmpDir. */ @Test + public void testPassbackFilter() throws Exception { + String emitFileBase = "blah"; + ParseContext parseContext = new ParseContext(); + parseContext.set(PassbackFilter.class, new MyPassbackFilter()); + PipesResult pipesResult = pipesClient.process( + new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName, testPdfFile), new EmitKey("fs", emitFileBase), new Metadata(), parseContext, + FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP)); + assertEquals(PipesResult.STATUS.EMIT_SUCCESS_PASSBACK, pipesResult.getStatus()); + Assertions.assertNotNull(pipesResult + .getEmitData() + .getMetadataList()); + assertEquals(1, pipesResult + .getEmitData() + .getMetadataList() + .size()); + Metadata metadata = pipesResult + .getEmitData() + .getMetadataList() + .get(0); + assertEquals("TESTOVERLAPPINGTEXT.PDF", metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY)); + assertNull(metadata.get(Metadata.CONTENT_TYPE)); + assertNull(metadata.get(Metadata.CONTENT_LENGTH)); + assertEquals(1, metadata.names().length); + + List<Metadata> metadataList = JsonMetadataList.fromJson(Files.newBufferedReader(tmpDir.resolve(emitFileBase + ".json"), StandardCharsets.UTF_8)); /* NOTE(review): this reader is never closed; wrap it in try-with-resources so the handle is released before tearDown deletes tmpDir (an open handle blocks the delete on Windows) */ + assertEquals(1, metadataList.size()); + assertEquals("application/pdf", metadataList + .get(0) + .get(Metadata.CONTENT_TYPE)); + assertEquals("899", metadataList + .get(0) + .get(Metadata.CONTENT_LENGTH)); + } + + /** Passes back, for each input item with a non-blank resource name, a new Metadata holding only that name upper-cased with Locale.ROOT. */ private static class MyPassbackFilter extends PassbackFilter { + @Override + public List<Metadata> filter(List<Metadata> metadataList) throws TikaException { + List<Metadata> ret = new ArrayList<>(); + for (Metadata m : metadataList) { + String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY); + if (!StringUtils.isBlank(val)) { + Metadata retM = new Metadata(); + retM.add(TikaCoreProperties.RESOURCE_NAME_KEY, val.toUpperCase(Locale.ROOT)); + ret.add(retM); +
} + } + return ret; + } + } + +} diff --git a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml new file mode 100644 index 000000000..d1e04cd2b --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+--> +<properties> <!-- Test config for PassbackFilterTest: the fs fetcher reads test documents and the fs emitter writes each result as JSON into a temp dir. --> + <pipes> + <params> + <numClients>2</numClients> + <forkedJvmArgs> + <arg>-Xmx1g</arg> + <arg>-XX:ParallelGCThreads=2</arg> + </forkedJvmArgs> + <timeoutMillis>60000</timeoutMillis> + <maxForEmitBatchBytes>0</maxForEmitBatchBytes> <!-- always emit from the forked server (presumably no result is then small enough to be handed back to the client for batch emitting), so every result takes the server-side emit and PassbackFilter path --> + </params> + </pipes> + <autoDetectParserConfig> + <digesterFactory class="org.apache.tika.pipes.core.async.MockDigesterFactory"> + <skipContainerDocument>false</skipContainerDocument> + </digesterFactory> + </autoDetectParserConfig> + <fetchers> + <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher"> + <name>fs</name> + <basePath>src/test/resources/test-documents</basePath> <!-- relative path, presumably resolved against the test run's working directory (the module root); TODO confirm --> + </fetcher> + </fetchers> + <emitters> + <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter"> + <name>fs</name> + <basePath>EMITTER_BASE_PATH</basePath> <!-- placeholder; PassbackFilterTest.init() replaces it with a temp directory --> + </emitter> + </emitters> +</properties> \ No newline at end of file
