This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4498
in repository https://gitbox.apache.org/repos/asf/tika.git

commit ccce053cae6dc187ae4807ca088217e981ffae52
Author: tallison <[email protected]>
AuthorDate: Fri Oct 3 16:33:11 2025 -0400

    TIKA-4498 -- add a PassbackFilter to enable passing back some of the parsed output to the client parser.
---
 .../org/apache/tika/pipes/core/PassbackFilter.java |  28 +++++
 .../org/apache/tika/pipes/core/PipesClient.java    |  13 ++-
 .../org/apache/tika/pipes/core/PipesResult.java    |   3 +-
 .../org/apache/tika/pipes/core/PipesServer.java    |  44 ++++++-
 .../apache/tika/pipes/core/PassbackFilterTest.java | 127 +++++++++++++++++++++
 .../apache/tika/pipes/core/tika-emit-config.xml    |  47 ++++++++
 6 files changed, 251 insertions(+), 11 deletions(-)

diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java
new file mode 100644
index 000000000..af7150354
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PassbackFilter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
+import java.io.Serializable;
+
+import org.apache.tika.metadata.listfilter.MetadataListFilter;
+
+/**
+ * Filter/Select some of the emitted output and pass it back to the client parser.
+ */
+public abstract class PassbackFilter extends MetadataListFilter implements 
Serializable {
+
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 11eb87205..8c8e2db49 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -334,7 +334,11 @@ public class PipesClient implements Closeable {
                 //there may have been a parse exception, but the parse didn't crash
                 LOG.debug("pipesClientId={} parse success: {} in {} ms", 
pipesClientId, t.getId(),
                         millis);
-                return deserializeEmitData();
+                return deserializeEmitData(PipesResult.STATUS.PARSE_SUCCESS);
+            case EMIT_SUCCESS_PASS_BACK:
+                LOG.debug("pipesClientId={} emit success with passback: {} in 
{} ms", pipesClientId, t.getId(),
+                        millis);
+                return 
deserializeEmitData(PipesResult.STATUS.EMIT_SUCCESS_PASSBACK);
             case PARSE_EXCEPTION_NO_EMIT:
                 return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
             case EMIT_SUCCESS:
@@ -366,7 +370,7 @@ public class PipesClient implements Closeable {
         return new PipesResult(status, msg);
     }
 
-    private PipesResult deserializeEmitData() throws IOException {
+    private PipesResult deserializeEmitData(PipesResult.STATUS status) throws 
IOException {
         int length = input.readInt();
         byte[] bytes = new byte[length];
         input.readFully(bytes);
@@ -376,9 +380,9 @@ public class PipesClient implements Closeable {
 
             String stack = emitData.getContainerStackTrace();
             if (StringUtils.isBlank(stack)) {
-                return new PipesResult(emitData);
+                return new PipesResult(status, emitData, false);
             } else {
-                return new PipesResult(emitData, stack);
+                return new PipesResult(status, emitData, stack, false);
             }
         } catch (ClassNotFoundException e) {
             LOG.error("class not found exception deserializing data", e);
@@ -387,6 +391,7 @@ public class PipesClient implements Closeable {
         }
     }
 
+
     private PipesResult deserializeIntermediateResult(EmitKey emitKey, 
ParseContext parseContext) throws IOException {
 
         int length = input.readInt();
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
index efc2f77b5..961c00a75 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
@@ -34,6 +34,7 @@ public class PipesResult {
         OOM, TIMEOUT, UNSPECIFIED_CRASH,
         NO_EMITTER_FOUND,
         EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
+        EMIT_SUCCESS_PASSBACK,//emit happened and some data is returned
         INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND,
         INTERMEDIATE_RESULT;
     }
@@ -51,7 +52,7 @@ public class PipesResult {
     private final EmitData emitData;
     private final String message;
 
-    private PipesResult(STATUS status, EmitData emitData, String message, 
boolean intermediate) {
+    public PipesResult(STATUS status, EmitData emitData, String message, 
boolean intermediate) {
         this.status = status;
         this.emitData = emitData;
         this.message = message;
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
index d49d95c25..3d7e7288a 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
@@ -105,7 +105,7 @@ public class PipesServer implements Runnable {
     public enum STATUS {
         READY, CALL, PING, FAILED_TO_START, FETCHER_NOT_FOUND, 
EMITTER_NOT_FOUND,
         FETCHER_INITIALIZATION_EXCEPTION, FETCH_EXCEPTION, PARSE_SUCCESS, 
PARSE_EXCEPTION_NO_EMIT,
-        EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION, OOM, 
TIMEOUT, EMPTY_OUTPUT,
+        EMIT_SUCCESS, EMIT_SUCCESS_PASS_BACK, EMIT_SUCCESS_PARSE_EXCEPTION, 
EMIT_EXCEPTION, OOM, TIMEOUT, EMPTY_OUTPUT,
         INTERMEDIATE_RESULT;
 
         byte getByte() {
@@ -312,12 +312,44 @@ public class PipesServer implements Runnable {
             write(STATUS.EMIT_EXCEPTION, bytes);
             return;
         }
-        if (StringUtils.isBlank(parseExceptionStack)) {
-            write(STATUS.EMIT_SUCCESS);
-        } else {
-            write(STATUS.EMIT_SUCCESS_PARSE_EXCEPTION,
-                    parseExceptionStack.getBytes(StandardCharsets.UTF_8));
+        writeEmitResponse(emitKey, parseData.metadataList, 
parseExceptionStack, parseContext);
+
+    }
+
+    private void writeEmitResponse(EmitKey emitKey, List<Metadata> 
metadataList,  String parseExceptionStack, ParseContext parseContext) {
+
+        STATUS status = (StringUtils.isBlank(parseExceptionStack)) ? 
STATUS.EMIT_SUCCESS :
+                STATUS.EMIT_SUCCESS_PARSE_EXCEPTION;
+        PassbackFilter filter = parseContext.get(PassbackFilter.class);
+        if (filter == null) {
+            if (status == STATUS.EMIT_SUCCESS) {
+                write(status);
+            } else {
+                write(status, 
parseExceptionStack.getBytes(StandardCharsets.UTF_8));
+            }
+            return;
         }
+        List<Metadata> filtered = null;
+        try {
+            filtered = filter.filter(metadataList);
+        } catch (TikaException e) {
+            LOG.error("problem filtering data for passback", e);
+            exit(1);
+        }
+
+        EmitData filteredEmitData = new EmitData(emitKey, filtered, 
parseExceptionStack);
+
+        try {
+            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
+            try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
+                objectOutputStream.writeObject(filteredEmitData);
+            }
+            write(STATUS.EMIT_SUCCESS_PASS_BACK, bos.toByteArray());
+        } catch (IOException e) {
+            LOG.error("problem writing response data (forking process 
shutdown?)", e);
+            exit(1);
+        }
+
     }
 
     private void emitContentsAndBytes(Emitter emitter, EmitKey emitKey,
diff --git 
a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
 
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
new file mode 100644
index 000000000..d0b064f7e
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.core.emitter.EmitKey;
+import org.apache.tika.pipes.core.fetcher.FetchKey;
+import org.apache.tika.serialization.JsonMetadataList;
+import org.apache.tika.utils.StringUtils;
+
+public class PassbackFilterTest {
+
+    private Path tmpDir;
+    String fetcherName = "fs";
+    String testPdfFile = "testOverlappingText.pdf";
+
+    private PipesClient pipesClient;
+
+    @BeforeEach
+    public void init() throws TikaConfigException, IOException, 
ParserConfigurationException, SAXException {
+        Path tikaConfigTemplate = Paths.get("src", "test", "resources", "org", 
"apache", "tika", "pipes", "core", "tika-emit-config.xml");
+        tmpDir = Files.createTempDirectory("tika-pipes");
+        Path tikaConfigPath = Files.createTempFile(tmpDir, "tika-pipes-", 
".xml");
+        String template = Files.readString(tikaConfigTemplate, 
StandardCharsets.UTF_8);
+        template = template.replace("EMITTER_BASE_PATH", tmpDir
+                .toAbsolutePath()
+                .toString());
+        Files.writeString(tikaConfigPath, template);
+        PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath);
+        pipesClient = new PipesClient(pipesConfig);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        FileUtils.deleteDirectory(tmpDir.toFile());
+    }
+
+    @Test
+    public void testPassbackFilter() throws Exception {
+        String emitFileBase = "blah";
+        ParseContext parseContext = new ParseContext();
+        parseContext.set(PassbackFilter.class, new MyPassbackFilter());
+        PipesResult pipesResult = pipesClient.process(
+                new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName, 
testPdfFile), new EmitKey("fs", emitFileBase), new Metadata(), parseContext,
+                        FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+        assertEquals(PipesResult.STATUS.EMIT_SUCCESS_PASSBACK, 
pipesResult.getStatus());
+        Assertions.assertNotNull(pipesResult
+                .getEmitData()
+                .getMetadataList());
+        assertEquals(1, pipesResult
+                .getEmitData()
+                .getMetadataList()
+                .size());
+        Metadata metadata = pipesResult
+                .getEmitData()
+                .getMetadataList()
+                .get(0);
+        assertEquals("TESTOVERLAPPINGTEXT.PDF", 
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
+        assertNull(metadata.get(Metadata.CONTENT_TYPE));
+        assertNull(metadata.get(Metadata.CONTENT_LENGTH));
+        assertEquals(1, metadata.names().length);
+
+        List<Metadata> metadataList = 
JsonMetadataList.fromJson(Files.newBufferedReader(tmpDir.resolve(emitFileBase + 
".json"), StandardCharsets.UTF_8));
+        assertEquals(1, metadataList.size());
+        assertEquals("application/pdf", metadataList
+                .get(0)
+                .get(Metadata.CONTENT_TYPE));
+        assertEquals("899", metadataList
+                .get(0)
+                .get(Metadata.CONTENT_LENGTH));
+    }
+
+    private static class MyPassbackFilter extends PassbackFilter {
+        @Override
+        public List<Metadata> filter(List<Metadata> metadataList) throws 
TikaException {
+            List<Metadata> ret = new ArrayList<>();
+            for (Metadata m : metadataList) {
+                String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+                if (!StringUtils.isBlank(val)) {
+                    Metadata retM = new Metadata();
+                    retM.add(TikaCoreProperties.RESOURCE_NAME_KEY, 
val.toUpperCase(Locale.ROOT));
+                    ret.add(retM);
+                }
+            }
+            return ret;
+        }
+    }
+
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
 
b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
new file mode 100644
index 000000000..d1e04cd2b
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<properties>
+  <pipes>
+    <params>
+      <numClients>2</numClients>
+      <forkedJvmArgs>
+        <arg>-Xmx1g</arg>
+        <arg>-XX:ParallelGCThreads=2</arg>
+      </forkedJvmArgs>
+      <timeoutMillis>60000</timeoutMillis>
+      <maxForEmitBatchBytes>0</maxForEmitBatchBytes> <!-- always emit -->
+    </params>
+  </pipes>
+  <autoDetectParserConfig>
+    <digesterFactory 
class="org.apache.tika.pipes.core.async.MockDigesterFactory">
+      <skipContainerDocument>false</skipContainerDocument>
+    </digesterFactory>
+  </autoDetectParserConfig>
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <name>fs</name>
+      <basePath>src/test/resources/test-documents</basePath>
+    </fetcher>
+  </fetchers>
+  <emitters>
+    <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+      <name>fs</name>
+      <basePath>EMITTER_BASE_PATH</basePath>
+    </emitter>
+  </emitters>
+</properties>
\ No newline at end of file

Reply via email to