This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch elasticsearch-emitter in repository https://gitbox.apache.org/repos/asf/tika.git
commit 8e548d6adb3ccfdb43cb9cfac5df0c1ef482bec8 Author: tballison <[email protected]> AuthorDate: Tue Feb 17 14:32:41 2026 -0500 Add Elasticsearch emitter pipes plugin and integration tests - Add tika-pipes-elasticsearch plugin with ElasticsearchClient, ElasticsearchEmitter, and PF4J plugin wiring. - Add Elasticsearch integration tests with Testcontainers. - Refactor shared logic in OpenSearchClient. Co-authored-by: Cursor <[email protected]> --- tika-integration-tests/pom.xml | 1 + .../pom.xml | 115 +++++ .../elasticsearch/tests/ElasticsearchTest.java | 528 +++++++++++++++++++++ .../tests/ElasticsearchTestClient.java | 151 ++++++ .../elasticsearch/elasticsearch-mappings.json | 19 + .../elasticsearch-parent-child-mappings.json | 28 ++ .../resources/elasticsearch/plugins-template.json | 78 +++ .../resources/pipes-fork-server-custom-log4j2.xml | 33 ++ .../src/test/resources/test-documents/fake_oom.xml | 24 + .../src/test/resources/test-documents/npe.xml | 25 + .../src/test/resources/test-documents/oom.xml | 24 + .../test-documents/test_recursive_embedded.docx | Bin 0 -> 27082 bytes tika-pipes/tika-pipes-plugins/pom.xml | 1 + .../tika-pipes-elasticsearch/pom.xml | 134 ++++++ .../src/main/assembly/assembly.xml | 55 +++ .../emitter/elasticsearch/ElasticsearchClient.java | 428 +++++++++++++++++ .../elasticsearch/ElasticsearchEmitter.java | 127 +++++ .../elasticsearch/ElasticsearchEmitterConfig.java | 65 +++ .../elasticsearch/ElasticsearchEmitterFactory.java | 62 +++ .../emitter/elasticsearch/HttpClientConfig.java | 35 ++ .../pipes/emitter/elasticsearch/JsonResponse.java | 60 +++ .../elasticsearch/ElasticsearchPipesPlugin.java | 50 ++ .../src/main/resources/plugin.properties | 21 + .../elasticsearch/ElasticsearchClientTest.java | 177 +++++++ .../pipes/emitter/opensearch/OpenSearchClient.java | 43 +- 25 files changed, 2283 insertions(+), 1 deletion(-) diff --git a/tika-integration-tests/pom.xml b/tika-integration-tests/pom.xml index a026e088c2..861c812771 100644 --- 
a/tika-integration-tests/pom.xml +++ b/tika-integration-tests/pom.xml @@ -32,6 +32,7 @@ <packaging>pom</packaging> <modules> + <module>tika-pipes-elasticsearch-integration-tests</module> <module>tika-pipes-opensearch-integration-tests</module> <module>tika-pipes-solr-integration-tests</module> <module>tika-pipes-s3-integration-tests</module> diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml new file mode 100644 index 0000000000..ce6f15a746 --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml @@ -0,0 +1,115 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tika-integration-tests</artifactId> + <groupId>org.apache.tika</groupId> + <version>4.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tika-pipes-elasticsearch-integration-tests</artifactId> + <name>Apache Tika Elasticsearch integration tests</name> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-app</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-pipes-file-system</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>zip</type> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-pipes-elasticsearch</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- testcontainers — GenericContainer only, no ES client needed --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers-junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <inputExcludes> + <inputExclude>src/test/resources/elasticsearch/*.json</inputExclude> + </inputExcludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-plugins</id> + 
<phase>process-test-resources</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/plugins</outputDirectory> + <artifactItems> + <artifactItem> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes-file-system</artifactId> + <version>${project.version}</version> + <type>zip</type> + <overWrite>true</overWrite> + </artifactItem> + <artifactItem> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes-elasticsearch</artifactId> + <version>${project.version}</version> + <type>zip</type> + <overWrite>true</overWrite> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <scm> + <tag>3.0.0-rc1</tag> + </scm> +</project> diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java new file mode 100644 index 0000000000..5bcbacef1f --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java @@ -0,0 +1,528 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.elasticsearch.tests; + +import static org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitter.DEFAULT_EMBEDDED_FILE_FIELD_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.commons.io.IOUtils; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import org.apache.tika.cli.TikaCLI; +import org.apache.tika.client.HttpClientFactory; +import org.apache.tika.config.JsonConfigHelper; +import org.apache.tika.config.loader.TikaJsonConfig; +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.metadata.Metadata; +import 
org.apache.tika.parser.ParseContext; +import org.apache.tika.pipes.api.ParseMode; +import org.apache.tika.pipes.api.emitter.Emitter; +import org.apache.tika.pipes.core.emitter.EmitterManager; +import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitterConfig; +import org.apache.tika.pipes.emitter.elasticsearch.HttpClientConfig; +import org.apache.tika.pipes.emitter.elasticsearch.JsonResponse; +import org.apache.tika.plugins.TikaPluginManager; + +/** + * Integration tests for the Elasticsearch emitter using a Dockerized + * Elasticsearch instance via testcontainers. + * + * <p>Uses a {@link GenericContainer} with the official Elasticsearch Docker + * image. Security is disabled for test simplicity — no ES Java client + * dependency needed. + */ +@Testcontainers(disabledWithoutDocker = true) +public class ElasticsearchTest { + + private static final DockerImageName ES_IMAGE = + DockerImageName.parse( + "docker.elastic.co/elasticsearch/elasticsearch:8.17.0"); + + private static GenericContainer<?> CONTAINER; + + protected static final String TEST_INDEX = "tika-pipes-index"; + private int numTestDocs = 0; + + @BeforeAll + public static void setUp() { + CONTAINER = new GenericContainer<>(ES_IMAGE) + .withExposedPorts(9200) + .withEnv("discovery.type", "single-node") + .withEnv("xpack.security.enabled", "false") + .withEnv("cluster.routing.allocation.disk.threshold_enabled", + "false") + .waitingFor(new HttpWaitStrategy() + .forPort(9200) + .forStatusCode(200)); + + CONTAINER.start(); + } + + @AfterAll + public static void tearDown() { + CONTAINER.close(); + } + + @AfterEach + public void clearIndex() throws TikaConfigException, IOException { + ElasticsearchTestClient client = getNewClient(); + String endpoint = getEndpoint(); + client.deleteIndex(endpoint); + } + + @Test + public void testBasicFSToElasticsearch( + @TempDir Path pipesDirectory, + @TempDir Path testDocDirectory) throws Exception { + + ElasticsearchTestClient client = getNewClient(); + int 
numHtmlDocs = 42; + createTestHtmlFiles("Happiness", numHtmlDocs, testDocDirectory); + + String endpoint = getEndpoint(); + sendMappings(client, endpoint, TEST_INDEX, + "elasticsearch-mappings.json"); + + runPipes(client, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS, + ElasticsearchEmitterConfig.UpdateStrategy.UPSERT, + ParseMode.CONCATENATE, endpoint, + pipesDirectory, testDocDirectory); + + String query = "{ \"track_total_hits\": true, \"query\": " + + "{ \"match\": { \"content\": " + + "{ \"query\": \"happiness\" } } } }"; + + JsonResponse results = + client.postJson(endpoint + "/_search", query); + assertEquals(200, results.getStatus()); + assertEquals(numHtmlDocs + 1, + results.getJson().get("hits").get("total").get("value") + .asInt()); + + // match all + query = "{ \"track_total_hits\": true, \"query\": " + + "{ \"match_all\": {} }, " + + "\"from\": 0, \"size\": 1000 }"; + results = client.postJson(endpoint + "/_search", query); + assertEquals(200, results.getStatus()); + assertEquals(numHtmlDocs + numTestDocs, + results.getJson().get("hits").get("total").get("value") + .asInt()); + } + + @Test + public void testParentChildFSToElasticsearch( + @TempDir Path pipesDirectory, + @TempDir Path testDocDirectory) throws Exception { + + int numHtmlDocs = 42; + ElasticsearchTestClient client = getNewClient(); + + createTestHtmlFiles("Happiness", numHtmlDocs, testDocDirectory); + String endpoint = getEndpoint(); + sendMappings(client, endpoint, TEST_INDEX, + "elasticsearch-parent-child-mappings.json"); + + runPipes(client, + ElasticsearchEmitterConfig.AttachmentStrategy.PARENT_CHILD, + ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE, + ParseMode.RMETA, endpoint, + pipesDirectory, testDocDirectory); + + // match all + String query = "{ " + + "\"from\":0, \"size\":1000," + + "\"track_total_hits\": true, \"query\": { " + + "\"match_all\": {} } }"; + JsonResponse results = + client.postJson(endpoint + "/_search", query); + assertEquals(200, 
results.getStatus()); + assertEquals(numHtmlDocs + 3 + 12, + // 3 mock files and the .docx has 11 embedded + itself + results.getJson().get("hits").get("total").get("value") + .asInt()); + + // check an embedded file + query = "{ \"track_total_hits\": true, \"query\": " + + "{ \"query_string\": { " + + "\"default_field\": \"content\", " + + "\"query\": \"embed4 zip\", " + + "\"minimum_should_match\":2 } } } "; + results = client.postJson(endpoint + "/_search", query); + assertEquals(200, results.getStatus()); + assertEquals(1, + results.getJson().get("hits").get("total").get("value") + .asInt()); + JsonNode source = results.getJson().get("hits").get("hits") + .get(0).get("_source"); + + Matcher m = Pattern + .compile("\\Atest_recursive_embedded" + + ".docx-[0-9a-f]{8}-[0-9a-f]{4}-" + + "[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\\Z") + .matcher( + results.getJson().get("hits").get("hits").get(0) + .get("_id").asText()); + assertTrue(m.find(), "test_recursive_embedded.docx_$guid"); + assertEquals("test_recursive_embedded.docx", + results.getJson().get("hits").get("hits").get(0) + .get("_routing").asText()); + assertEquals("test_recursive_embedded.docx", + source.get("relation_type").get("parent").asText()); + assertEquals("embedded", + source.get("relation_type").get("name").asText()); + + assertEquals("application/zip", + source.get("mime").asText()); + + // verify parent query returns all children + query = "{ \"track_total_hits\": true, \"query\": " + + "{ \"parent_id\": { " + + "\"type\": \"embedded\", " + + "\"id\": \"test_recursive_embedded.docx\" } } } "; + results = client.postJson(endpoint + "/_search", query); + assertEquals(11, + results.getJson().get("hits").get("total").get("value") + .asInt()); + } + + @Test + public void testSeparateDocsFSToElasticsearch( + @TempDir Path pipesDirectory, + @TempDir Path testDocDirectory) throws Exception { + + ElasticsearchTestClient client = getNewClient(); + + int numHtmlDocs = 42; + createTestHtmlFiles("Happiness", 
numHtmlDocs, testDocDirectory); + String endpoint = getEndpoint(); + sendMappings(client, endpoint, TEST_INDEX, + "elasticsearch-mappings.json"); + + runPipes(client, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS, + ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE, + ParseMode.RMETA, endpoint, + pipesDirectory, testDocDirectory); + + String query = "{ \"track_total_hits\": true, \"query\": " + + "{ \"match\": { \"content\": " + + "{ \"query\": \"happiness\" } } } }"; + + JsonResponse results = + client.postJson(endpoint + "/_search", query); + assertEquals(200, results.getStatus()); + assertEquals(numHtmlDocs + 1, + results.getJson().get("hits").get("total").get("value") + .asInt()); + + // match all + query = "{ \"track_total_hits\": true, \"query\": { " + + "\"match_all\": {} } }"; + results = client.postJson(endpoint + "/_search", query); + assertEquals(200, results.getStatus()); + assertEquals(numHtmlDocs + 3 + 12, + results.getJson().get("hits").get("total").get("value") + .asInt()); + + // check an embedded file + query = "{ \"track_total_hits\": true, \"query\": " + + "{ \"query_string\": { " + + "\"default_field\": \"content\", " + + "\"query\": \"embed4 zip\", " + + "\"minimum_should_match\":2 } } } "; + results = client.postJson(endpoint + "/_search", query); + assertEquals(200, results.getStatus()); + assertEquals(1, + results.getJson().get("hits").get("total").get("value") + .asInt()); + JsonNode source = results.getJson().get("hits").get("hits") + .get(0).get("_source"); + + Matcher m = Pattern + .compile("\\Atest_recursive_embedded" + + ".docx-[0-9a-f]{8}-[0-9a-f]{4}-" + + "[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\\Z") + .matcher( + results.getJson().get("hits").get("hits").get(0) + .get("_id").asText()); + assertTrue(m.find(), "test_recursive_embedded.docx-$guid"); + + assertNull( + results.getJson().get("hits").get("hits").get(0) + .get("_routing"), + "test_recursive_embedded.docx"); + assertNull(source.get("relation_type"), + 
"test_recursive_embedded.docx"); + + assertEquals("application/zip", + source.get("mime").asText()); + + // parent_id query should fail — no join in schema + query = "{ \"track_total_hits\": true, \"query\": " + + "{ \"parent_id\": { " + + "\"type\": \"embedded\", " + + "\"id\": \"test_recursive_embedded.docx\" } } } "; + results = client.postJson(endpoint + "/_search", query); + assertEquals(400, results.getStatus()); + } + + @Test + public void testUpsertSeparateDocsFSToElasticsearch( + @TempDir Path pipesDirectory, + @TempDir Path testDocDirectory) throws Exception { + + ElasticsearchTestClient client = getNewClient(); + + int numHtmlDocs = 42; + createTestHtmlFiles("Happiness", numHtmlDocs, testDocDirectory); + String endpoint = getEndpoint(); + sendMappings(client, endpoint, TEST_INDEX, + "elasticsearch-mappings.json"); + + runPipes(client, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS, + ElasticsearchEmitterConfig.UpdateStrategy.UPSERT, + ParseMode.RMETA, endpoint, + pipesDirectory, testDocDirectory); + + String query = "{ \"track_total_hits\": true, \"query\": " + + "{ \"match\": { \"content\": " + + "{ \"query\": \"happiness\" } } } }"; + + JsonResponse results = + client.postJson(endpoint + "/_search", query); + assertEquals(200, results.getStatus()); + assertEquals(numHtmlDocs + 1, + results.getJson().get("hits").get("total").get("value") + .asInt()); + } + + @Test + public void testUpsert( + @TempDir Path pipesDirectory, + @TempDir Path testDocDirectory) throws Exception { + + ElasticsearchTestClient client = getNewClient(); + + String endpoint = getEndpoint(); + sendMappings(client, endpoint, TEST_INDEX, + "elasticsearch-mappings.json"); + Path pluginsConfigFile = getPluginsConfig(pipesDirectory, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS, + ElasticsearchEmitterConfig.UpdateStrategy.UPSERT, + ParseMode.RMETA, endpoint, testDocDirectory); + + TikaJsonConfig tikaJsonConfig = + 
TikaJsonConfig.load(pluginsConfigFile); + Emitter emitter = EmitterManager + .load(TikaPluginManager.load(tikaJsonConfig), + tikaJsonConfig) + .getEmitter(); + Metadata metadata = new Metadata(); + metadata.set("mime", "mimeA"); + metadata.set("title", "titleA"); + emitter.emit("1", + Collections.singletonList(metadata), new ParseContext()); + client.getJson(endpoint + "/_refresh"); + metadata.set("title", "titleB"); + emitter.emit("1", + Collections.singletonList(metadata), new ParseContext()); + client.getJson(endpoint + "/_refresh"); + + Metadata metadata2 = new Metadata(); + metadata2.set("content", "the quick brown fox"); + emitter.emit("1", + Collections.singletonList(metadata2), new ParseContext()); + client.getJson(endpoint + "/_refresh"); + + String query = "{ \"track_total_hits\": true, \"query\": { " + + "\"match_all\": {} } }"; + JsonResponse response = + client.postJson(endpoint + "/_search", query); + JsonNode doc1 = response.getJson().get("hits").get("hits") + .get(0).get("_source"); + assertEquals("mimeA", doc1.get("mime").asText()); + assertEquals("titleB", doc1.get("title").asText()); + assertEquals("the quick brown fox", + doc1.get("content").asText()); + } + + // ----------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------- + + private String getEndpoint() { + return "http://" + CONTAINER.getHost() + ":" + + CONTAINER.getMappedPort(9200) + "/" + TEST_INDEX; + } + + private ElasticsearchTestClient getNewClient() + throws TikaConfigException { + HttpClientFactory httpClientFactory = new HttpClientFactory(); + ElasticsearchEmitterConfig config = + new ElasticsearchEmitterConfig( + getEndpoint(), "_id", + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS, + ElasticsearchEmitterConfig.UpdateStrategy + .OVERWRITE, + 10, DEFAULT_EMBEDDED_FILE_FIELD_NAME, null, + new HttpClientConfig(null, null, null, + -1, -1, null, -1)); + return new 
ElasticsearchTestClient(config, + httpClientFactory.build()); + } + + protected void sendMappings(ElasticsearchTestClient client, + String endpoint, String index, + String mappingsFile) throws Exception { + String mappings = IOUtils.toString( + ElasticsearchTest.class.getResourceAsStream( + "/elasticsearch/" + mappingsFile), + StandardCharsets.UTF_8); + int status = -1; + int tries = 0; + JsonResponse response = null; + while (status != 200 && tries++ < 20) { + response = client.putJson(endpoint, mappings); + status = response.getStatus(); + if (status != 200) { + Thread.sleep(1000); // back off only after a failed attempt + } + } + if (status != 200) { + throw new IllegalArgumentException( + "couldn't create index/add mappings: " + response); + } + assertTrue(response.getJson().get("acknowledged").asBoolean()); + assertEquals(index, + response.getJson().get("index").asText()); + } + + private void runPipes( + ElasticsearchTestClient client, + ElasticsearchEmitterConfig.AttachmentStrategy attachStrat, + ElasticsearchEmitterConfig.UpdateStrategy updateStrat, + ParseMode parseMode, String endpoint, + Path pipesDirectory, Path testDocDirectory) throws Exception { + + Path pluginsConfig = getPluginsConfig(pipesDirectory, + attachStrat, updateStrat, parseMode, + endpoint, testDocDirectory); + + TikaCLI.main(new String[]{ + "-a", "-c", + pluginsConfig.toAbsolutePath().toString()}); + + // refresh to make content searchable + client.getJson(endpoint + "/_refresh"); + } + + @NotNull + private Path getPluginsConfig( + Path pipesDirectory, + ElasticsearchEmitterConfig.AttachmentStrategy attachStrat, + ElasticsearchEmitterConfig.UpdateStrategy updateStrat, + ParseMode parseMode, String endpoint, + Path testDocDirectory) throws IOException { + + Path tikaConfig = pipesDirectory.resolve("plugins-config.json"); + + Path log4jPropFile = pipesDirectory.resolve("log4j2.xml"); + try (InputStream is = ElasticsearchTest.class + .getResourceAsStream( + "/pipes-fork-server-custom-log4j2.xml")) { + Files.copy(is, 
log4jPropFile); + } + + boolean includeRouting = (attachStrat == + ElasticsearchEmitterConfig.AttachmentStrategy + .PARENT_CHILD); + + Map<String, Object> replacements = new HashMap<>(); + replacements.put("ATTACHMENT_STRATEGY", + attachStrat.toString()); + replacements.put("UPDATE_STRATEGY", + updateStrat.toString()); + replacements.put("FETCHER_BASE_PATH", testDocDirectory); + replacements.put("PARSE_MODE", parseMode.name()); + replacements.put("INCLUDE_ROUTING", includeRouting); + replacements.put("ELASTICSEARCH_URL", endpoint); + replacements.put("LOG4J_JVM_ARG", + "-Dlog4j.configurationFile=" + + log4jPropFile.toAbsolutePath()); + + JsonConfigHelper.writeConfigFromResource( + "/elasticsearch/plugins-template.json", + ElasticsearchTest.class, replacements, tikaConfig); + + return tikaConfig; + } + + private void createTestHtmlFiles(String bodyContent, + int numHtmlDocs, + Path testDocDirectory) + throws Exception { + Files.createDirectories(testDocDirectory); + for (int i = 0; i < numHtmlDocs; ++i) { + String html = "<html><body>" + bodyContent + + "</body></html>"; + Path p = testDocDirectory.resolve("test-" + i + ".html"); + Files.write(p, html.getBytes(StandardCharsets.UTF_8)); + } + File testDocuments = Paths + .get(ElasticsearchTest.class + .getResource("/test-documents").toURI()) + .toFile(); + for (File f : testDocuments.listFiles()) { + Path targ = testDocDirectory.resolve(f.getName()); + Files.copy(f.toPath(), targ); + numTestDocs++; + } + } +} diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java new file mode 100644 index 0000000000..3808609924 --- /dev/null +++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.elasticsearch.tests; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.util.EntityUtils; + +import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchClient; +import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitterConfig; +import org.apache.tika.pipes.emitter.elasticsearch.JsonResponse; + +/** + * Extends ElasticsearchClient with GET, PUT, and DELETE for integration + * testing 
purposes. + */ +public class ElasticsearchTestClient extends ElasticsearchClient { + + public ElasticsearchTestClient(ElasticsearchEmitterConfig config, + HttpClient httpClient) { + super(config, httpClient); + } + + public JsonResponse putJson(String url, String json) throws IOException { + HttpPut httpRequest = new HttpPut(url); + ByteArrayEntity entity = + new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8)); + httpRequest.setEntity(entity); + httpRequest.setHeader("Accept", "application/json"); + httpRequest.setHeader("Content-type", + "application/json; charset=utf-8"); + + HttpResponse response = null; + try { + response = httpClient.execute(httpRequest); + int status = response.getStatusLine().getStatusCode(); + if (status == 200) { + try (Reader reader = new BufferedReader( + new InputStreamReader( + response.getEntity().getContent(), + StandardCharsets.UTF_8))) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(reader); + return new JsonResponse(200, node); + } + } else { + return new JsonResponse(status, + new String( + EntityUtils.toByteArray( + response.getEntity()), + StandardCharsets.UTF_8)); + } + } finally { + if (response instanceof CloseableHttpResponse) { + ((CloseableHttpResponse) response).close(); + } + httpRequest.releaseConnection(); + } + } + + public JsonResponse getJson(String url) throws IOException { + HttpGet httpRequest = new HttpGet(url); + httpRequest.setHeader("Accept", "application/json"); + httpRequest.setHeader("Content-type", + "application/json; charset=utf-8"); + + HttpResponse response = null; + try { + response = httpClient.execute(httpRequest); + int status = response.getStatusLine().getStatusCode(); + if (status == 200) { + try (Reader reader = new BufferedReader( + new InputStreamReader( + response.getEntity().getContent(), + StandardCharsets.UTF_8))) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(reader); + return new JsonResponse(200, node); + } + } 
else { + return new JsonResponse(status, + new String( + EntityUtils.toByteArray( + response.getEntity()), + StandardCharsets.UTF_8)); + } + } finally { + if (response instanceof CloseableHttpResponse) { + ((CloseableHttpResponse) response).close(); + } + httpRequest.releaseConnection(); + } + } + + public JsonResponse deleteIndex(String url) throws IOException { + HttpDelete httpRequest = new HttpDelete(url); + HttpResponse response = null; + try { + response = httpClient.execute(httpRequest); + int status = response.getStatusLine().getStatusCode(); + if (status == 200) { + try (Reader reader = new BufferedReader( + new InputStreamReader( + response.getEntity().getContent(), + StandardCharsets.UTF_8))) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(reader); + return new JsonResponse(200, node); + } + } else { + return new JsonResponse(status, + new String( + EntityUtils.toByteArray( + response.getEntity()), + StandardCharsets.UTF_8)); + } + } finally { + if (response instanceof CloseableHttpResponse) { + ((CloseableHttpResponse) response).close(); + } + httpRequest.releaseConnection(); + } + } +} diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-mappings.json b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-mappings.json new file mode 100644 index 0000000000..900457d7ce --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-mappings.json @@ -0,0 +1,19 @@ +{ + "settings": { + "number_of_shards": 1 + }, + "mappings" : { + "dynamic": "strict", + "properties" : { + "content" : { "type" : "text"}, + "length" : { "type" : "long"}, + "creators" : { "type" : "text"}, + "title" : { "type" : "text"}, + "mime" : { "type" : "keyword"}, + "tika_exception" : { "type" : "text"}, + "parent" : { "type" : "text"}, + 
"my_test_parse_time_ms" : {"type": "long"}, + "my_test_parse_status": {"type": "keyword"} + } + } +} \ No newline at end of file diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-parent-child-mappings.json b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-parent-child-mappings.json new file mode 100644 index 0000000000..4b845fde8c --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-parent-child-mappings.json @@ -0,0 +1,28 @@ +{ + "settings": { + "number_of_shards": 1 + }, + "mappings" : { + "dynamic": "strict", + "_routing": { + "required":true + }, + "properties" : { + "content" : { "type" : "text"}, + "length" : { "type" : "long"}, + "creators" : { "type" : "text"}, + "title" : { "type" : "text"}, + "mime" : { "type" : "keyword"}, + "tika_exception" : { "type" : "text"}, + "relation_type":{ + "type":"join", + "eager_global_ordinals":true, + "relations":{ + "container":"embedded" + } + }, + "my_test_parse_time_ms" : {"type": "long"}, + "my_test_parse_status": {"type": "keyword"} + } + } +} \ No newline at end of file diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json new file mode 100644 index 0000000000..2975b9d85f --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json @@ -0,0 +1,78 @@ +{ + "content-handler-factory": { + "basic-content-handler-factory": { + "type": "TEXT", + "writeLimit": -1, + "throwOnWriteLimitReached": true + } + }, + "fetchers": { + "fsf": { + "file-system-fetcher": { + "basePath": "FETCHER_BASE_PATH" + } + } + }, + "emitters": { + "ese": 
{ + "elasticsearch-emitter": { + "elasticsearchUrl": "ELASTICSEARCH_URL", + "updateStrategy": "UPDATE_STRATEGY", + "attachmentStrategy": "ATTACHMENT_STRATEGY", + "commitWithin": 10, + "idField": "_id", + "embeddedFileFieldName": "embedded", + "httpClientConfig": { + "authScheme": "http", + "connectionTimeout": 60, + "socketTimeout": 60 + } + } + } + }, + "pipes-iterator": { + "file-system-pipes-iterator": { + "basePath": "FETCHER_BASE_PATH", + "countTotal": true, + "fetcherId": "fsf", + "emitterId": "ese" + } + }, + "pipes": { + "parseMode": "PARSE_MODE", + "onParseException": "EMIT", + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 10000 + }, + "emitMaxEstimatedBytes": 100000, + "emitWithinMillis": 60000, + "numEmitters": 1, + "numClients": 3, + "forkedJvmArgs": [ + "-Xmx512m", + "-XX:ParallelGCThreads=2", + "LOG4J_JVM_ARG" + ], + "timeoutMillis": 60000 + }, + "metadata-filters": [ + { + "date-normalizing-metadata-filter": {} + }, + { + "field-name-mapping-filter": { + "excludeUnmapped": true, + "mappings": { + "X-TIKA:content": "content", + "Content-Length": "length", + "dc:creator": "creators", + "dc:title": "title", + "Content-Type": "mime", + "X-TIKA:EXCEPTION:container_exception": "tika_exception" + } + } + } + ], + "plugin-roots": "target/plugins" +} diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml new file mode 100644 index 0000000000..63131796bb --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<Configuration> + <Appenders> + <Console name="console" target="SYSTEM_ERR"> + <PatternLayout + pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n" /> + </Console> + </Appenders> + <Loggers> + <!-- turn off logging for the forked processes --> + <Root level="fatal" additivity="false"> + <AppenderRef ref="console" /> + </Root> + </Loggers> +</Configuration> diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/fake_oom.xml b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/fake_oom.xml new file mode 100644 index 0000000000..42aa9a7785 --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/fake_oom.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<mock> + <metadata action="add" name="author">Nikolai Lobachevsky</metadata> + <throw class="java.lang.OutOfMemoryError">oom message</throw> +</mock> \ No newline at end of file diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/npe.xml b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/npe.xml new file mode 100644 index 0000000000..93599dbc48 --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/npe.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8" ?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<mock> + <metadata action="add" name="dc:creator">embeddedAuthor</metadata> + <write element="p">some_embedded_content</write>; + <throw class="java.lang.NullPointerException">another null pointer exception</throw>; +</mock> \ No newline at end of file diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/oom.xml b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/oom.xml new file mode 100644 index 0000000000..3ee835e681 --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/oom.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> + +<mock> + <metadata action="add" name="author">Nikolai Lobachevsky</metadata> + <oom/> +</mock> \ No newline at end of file diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/test_recursive_embedded.docx b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/test_recursive_embedded.docx new file mode 100644 index 0000000000..cd562cbb82 Binary files /dev/null and b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/test_recursive_embedded.docx differ diff --git a/tika-pipes/tika-pipes-plugins/pom.xml b/tika-pipes/tika-pipes-plugins/pom.xml index dcfe22c58b..84167181e2 100644 --- a/tika-pipes/tika-pipes-plugins/pom.xml +++ b/tika-pipes/tika-pipes-plugins/pom.xml @@ -43,6 +43,7 @@ <module>tika-pipes-json</module> <module>tika-pipes-kafka</module> <module>tika-pipes-microsoft-graph</module> + <module>tika-pipes-elasticsearch</module> <module>tika-pipes-opensearch</module> <module>tika-pipes-s3</module> <module>tika-pipes-solr</module> diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml new file mode 100644 index 0000000000..4995637318 --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml @@ -0,0 +1,134 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tika-pipes-plugins</artifactId> + <groupId>org.apache.tika</groupId> + <version>4.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tika-pipes-elasticsearch</artifactId> + <name>Apache Tika Pipes Elasticsearch</name> + <description> + Elasticsearch emitter for Apache Tika Pipes. + Uses plain HTTP (Apache HttpClient) — no Elasticsearch Java client dependency. + All dependencies are ASL v2 licensed. + </description> + <properties> + <!-- Never include the core artifacts in your plugin lib directory. If you do, it will cause the classloading + to get messed up when finding your plugins. 
--> + <plugin.excluded.artifactIds>tika-core,tika-pipes-api,tika-serialization,tika-plugins-core</plugin.excluded.artifactIds> + <plugin.excluded.groupIds>org.apache.logging.log4j,org.slf4j</plugin.excluded.groupIds> + </properties> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-pipes-api</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-httpclient-commons</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <includeScope>runtime</includeScope> + <excludeArtifactIds>${plugin.excluded.artifactIds}</excludeArtifactIds> + <excludeGroupIds>${plugin.excluded.groupIds}</excludeGroupIds> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestEntries> + <Plugin-Id>elasticsearch-emitter</Plugin-Id> + <Plugin-Version>${project.version}</Plugin-Version> + <Plugin-Class>org.apache.tika.pipes.plugin.elasticsearch.ElasticsearchPipesPlugin</Plugin-Class> + <Plugin-Provider>Elasticsearch Emitter</Plugin-Provider> + <Plugin-Description>Elasticsearch emitter (plain HTTP, no ES client)</Plugin-Description> + <Plugin-Dependencies></Plugin-Dependencies> + </manifestEntries> + 
</archive> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptors> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </descriptors> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <annotationProcessors> + <annotationProcessor>org.pf4j.processor.ExtensionAnnotationProcessor</annotationProcessor> + </annotationProcessors> + </configuration> + </plugin> + </plugins> + </build> + + <scm> + <tag>3.0.0-rc1</tag> + </scm> +</project> diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/assembly/assembly.xml b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/assembly/assembly.xml new file mode 100644 index 0000000000..ea0f8b4a1c --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/assembly/assembly.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 + http://maven.apache.org/xsd/assembly-2.0.0.xsd"> + <id>dependencies-zip</id> + <formats> + <format>zip</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.build.directory}/lib</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${project.build.directory}</directory> + <outputDirectory>/lib</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${project.build.directory}</directory> + <outputDirectory>/</outputDirectory> + <includes> + <include>classes/META-INF/extensions.idx</include> + <include>classes/META-INF/MANIFEST.MF</include> + </includes> + </fileSet> + <fileSet> + <directory>${project.basedir}/src/main/resources</directory> + <outputDirectory>/</outputDirectory> + <includes> + <include>plugin.properties</include> + </includes> + </fileSet> + </fileSets> +</assembly> diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java new file mode 100644 index 0000000000..919c12ab9b --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.emitter.elasticsearch; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.tika.client.TikaClientException; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.pipes.api.emitter.EmitData; +import org.apache.tika.utils.StringUtils; + +/** + * Plain HTTP client for Elasticsearch's REST API. + * + * <p>This does <b>not</b> use the Elasticsearch Java client library + * (which is SSPL / Elastic License). Instead it talks directly to + * Elasticsearch's {@code _bulk} REST endpoint using Apache HttpClient + * (ASL v2). 
+ * + * <p>Supports API key authentication ({@code Authorization: ApiKey ...}) + * as well as basic auth via the underlying {@link HttpClient}. + */ +public class ElasticsearchClient { + + private static final Logger LOG = + LoggerFactory.getLogger(ElasticsearchClient.class); + + protected final HttpClient httpClient; + + private final MetadataToJsonWriter metadataToJsonWriter; + private final ElasticsearchEmitterConfig config; + + protected ElasticsearchClient(ElasticsearchEmitterConfig config, + HttpClient httpClient) { + this.config = config; + this.httpClient = httpClient; + this.metadataToJsonWriter = + (config.updateStrategy() == + ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE) + ? new InsertMetadataToJsonWriter() + : new UpsertMetadataToJsonWriter(); + } + + public void emitDocuments(List<? extends EmitData> emitData) + throws IOException, TikaClientException { + StringBuilder json = new StringBuilder(); + for (EmitData d : emitData) { + appendDoc(d.getEmitKey(), d.getMetadataList(), json); + } + emitJson(json); + } + + public void emitDocument(String emitKey, List<Metadata> metadataList) + throws IOException, TikaClientException { + StringBuilder json = new StringBuilder(); + appendDoc(emitKey, metadataList, json); + emitJson(json); + } + + private void emitJson(StringBuilder json) + throws IOException, TikaClientException { + String requestUrl = config.elasticsearchUrl() + "/_bulk"; + JsonResponse response = postJson(requestUrl, json.toString()); + if (response.getStatus() != 200) { + throw new TikaClientException(response.getMsg()); + } else { + // If there's a single error in the bulk response, throw + JsonNode errorNode = response.getJson().get("errors"); + if (errorNode != null && errorNode.asText().equals("true")) { + throw new TikaClientException( + response.getJson().toString()); + } + } + } + + private void appendDoc(String emitKey, List<Metadata> metadataList, + StringBuilder json) throws IOException { + int i = 0; + String routing = + 
(config.attachmentStrategy() == + ElasticsearchEmitterConfig.AttachmentStrategy + .PARENT_CHILD) ? emitKey : null; + + for (Metadata metadata : metadataList) { + StringBuilder id = new StringBuilder(emitKey); + if (i > 0) { + id.append("-").append(UUID.randomUUID()); + } + String indexJson = + metadataToJsonWriter.getBulkJson(id.toString(), routing); + json.append(indexJson).append("\n"); + if (i == 0) { + json.append(metadataToJsonWriter.writeContainer( + metadata, config.attachmentStrategy())); + } else { + json.append(metadataToJsonWriter.writeEmbedded( + metadata, config.attachmentStrategy(), emitKey, + config.embeddedFileFieldName())); + } + json.append("\n"); + i++; + } + } + + // Package-private for testing + static String metadataToJsonContainerInsert( + Metadata metadata, + ElasticsearchEmitterConfig.AttachmentStrategy attachmentStrategy) + throws IOException { + return new InsertMetadataToJsonWriter().writeContainer( + metadata, attachmentStrategy); + } + + // Package-private for testing + static String metadataToJsonEmbeddedInsert( + Metadata metadata, + ElasticsearchEmitterConfig.AttachmentStrategy attachmentStrategy, + String emitKey, String embeddedFileFieldName) + throws IOException { + return new InsertMetadataToJsonWriter().writeEmbedded( + metadata, attachmentStrategy, emitKey, + embeddedFileFieldName); + } + + public JsonResponse postJson(String url, String json) throws IOException { + HttpPost httpRequest = new HttpPost(url); + StringEntity entity = + new StringEntity(json, StandardCharsets.UTF_8); + httpRequest.setEntity(entity); + httpRequest.setHeader("Accept", "application/json"); + httpRequest.setHeader("Content-type", + "application/json; charset=utf-8"); + + // ES 8.x API key auth + if (!StringUtils.isEmpty(config.apiKey())) { + httpRequest.setHeader("Authorization", + "ApiKey " + config.apiKey()); + } + + HttpResponse response = null; + try { + response = httpClient.execute(httpRequest); + int status = 
response.getStatusLine().getStatusCode(); + if (status == 200) { + try (Reader reader = new BufferedReader( + new InputStreamReader( + response.getEntity().getContent(), + StandardCharsets.UTF_8))) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(reader); + if (LOG.isTraceEnabled()) { + LOG.trace("node: {}", node); + } + return new JsonResponse(200, node); + } + } else { + return new JsonResponse(status, + new String( + EntityUtils.toByteArray( + response.getEntity()), + StandardCharsets.UTF_8)); + } + } finally { + if (response instanceof CloseableHttpResponse) { + ((CloseableHttpResponse) response).close(); + } + httpRequest.releaseConnection(); + } + } + + // ----------------------------------------------------------------------- + // JSON writers for _bulk API + // ----------------------------------------------------------------------- + + private interface MetadataToJsonWriter { + String writeContainer( + Metadata metadata, + ElasticsearchEmitterConfig.AttachmentStrategy strategy) + throws IOException; + + String writeEmbedded( + Metadata metadata, + ElasticsearchEmitterConfig.AttachmentStrategy strategy, + String emitKey, String embeddedFileFieldName) + throws IOException; + + String getBulkJson(String id, String routing) throws IOException; + } + + private static class InsertMetadataToJsonWriter + implements MetadataToJsonWriter { + + @Override + public String writeContainer( + Metadata metadata, + ElasticsearchEmitterConfig.AttachmentStrategy strategy) + throws IOException { + StringWriter writer = new StringWriter(); + try (JsonGenerator jg = + new JsonFactory().createGenerator(writer)) { + jg.writeStartObject(); + writeMetadata(metadata, jg); + if (strategy == + ElasticsearchEmitterConfig.AttachmentStrategy + .PARENT_CHILD) { + jg.writeStringField("relation_type", "container"); + } + jg.writeEndObject(); + } + return writer.toString(); + } + + @Override + public String writeEmbedded( + Metadata metadata, + 
ElasticsearchEmitterConfig.AttachmentStrategy strategy, + String emitKey, String embeddedFileFieldName) + throws IOException { + StringWriter writer = new StringWriter(); + try (JsonGenerator jg = + new JsonFactory().createGenerator(writer)) { + jg.writeStartObject(); + writeMetadata(metadata, jg); + if (strategy == + ElasticsearchEmitterConfig.AttachmentStrategy + .PARENT_CHILD) { + jg.writeObjectFieldStart("relation_type"); + jg.writeStringField("name", embeddedFileFieldName); + jg.writeStringField("parent", emitKey); + jg.writeEndObject(); + } else if (strategy == + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS) { + jg.writeStringField("parent", emitKey); + } + jg.writeEndObject(); + } + return writer.toString(); + } + + @Override + public String getBulkJson(String id, String routing) + throws IOException { + StringWriter writer = new StringWriter(); + try (JsonGenerator jg = + new JsonFactory().createGenerator(writer)) { + jg.writeStartObject(); + jg.writeObjectFieldStart("index"); + jg.writeStringField("_id", id); + if (!StringUtils.isEmpty(routing)) { + jg.writeStringField("routing", routing); + } + jg.writeEndObject(); + jg.writeEndObject(); + } + return writer.toString(); + } + } + + private static class UpsertMetadataToJsonWriter + implements MetadataToJsonWriter { + + @Override + public String writeContainer( + Metadata metadata, + ElasticsearchEmitterConfig.AttachmentStrategy strategy) + throws IOException { + StringWriter writer = new StringWriter(); + try (JsonGenerator jg = + new JsonFactory().createGenerator(writer)) { + jg.writeStartObject(); + jg.writeObjectFieldStart("doc"); + writeMetadata(metadata, jg); + if (strategy == + ElasticsearchEmitterConfig.AttachmentStrategy + .PARENT_CHILD) { + jg.writeStringField("relation_type", "container"); + } + jg.writeEndObject(); + jg.writeBooleanField("doc_as_upsert", true); + jg.writeEndObject(); + } + return writer.toString(); + } + + @Override + public String writeEmbedded( + Metadata 
metadata, + ElasticsearchEmitterConfig.AttachmentStrategy strategy, + String emitKey, String embeddedFileFieldName) + throws IOException { + StringWriter writer = new StringWriter(); + try (JsonGenerator jg = + new JsonFactory().createGenerator(writer)) { + jg.writeStartObject(); + jg.writeObjectFieldStart("doc"); + writeMetadata(metadata, jg); + if (strategy == + ElasticsearchEmitterConfig.AttachmentStrategy + .PARENT_CHILD) { + jg.writeObjectFieldStart("relation_type"); + jg.writeStringField("name", embeddedFileFieldName); + jg.writeStringField("parent", emitKey); + jg.writeEndObject(); + } else if (strategy == + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS) { + jg.writeStringField("parent", emitKey); + } + jg.writeEndObject(); + jg.writeBooleanField("doc_as_upsert", true); + jg.writeEndObject(); + } + return writer.toString(); + } + + @Override + public String getBulkJson(String id, String routing) + throws IOException { + StringWriter writer = new StringWriter(); + try (JsonGenerator jg = + new JsonFactory().createGenerator(writer)) { + jg.writeStartObject(); + jg.writeObjectFieldStart("update"); + jg.writeStringField("_id", id); + if (!StringUtils.isEmpty(routing)) { + jg.writeStringField("routing", routing); + } + jg.writeNumberField("retry_on_conflict", 3); + jg.writeEndObject(); + jg.writeEndObject(); + } + return writer.toString(); + } + } + + /** + * Metadata fields whose values are serialized JSON from the + * tika-inference pipeline. These must be written as raw JSON + * (arrays/objects) rather than escaped strings so that + * Elasticsearch can index vectors, locators, etc. natively. 
+ */ + static final Set<String> INFERENCE_JSON_FIELDS = Set.of( + "tika:chunks"); + + private static void writeMetadata(Metadata metadata, + JsonGenerator jsonGenerator) + throws IOException { + for (String n : metadata.names()) { + String[] vals = metadata.getValues(n); + if (vals.length == 1) { + if (INFERENCE_JSON_FIELDS.contains(n) + && isValidJson(vals[0])) { + jsonGenerator.writeFieldName(n); + jsonGenerator.writeRawValue(vals[0]); + } else { + jsonGenerator.writeStringField(n, vals[0]); + } + } else { + jsonGenerator.writeArrayFieldStart(n); + for (String v : vals) { + jsonGenerator.writeString(v); + } + jsonGenerator.writeEndArray(); + } + } + } + + private static final ObjectMapper VALIDATION_MAPPER = new ObjectMapper(); + + /** + * Validates that the value is well-formed JSON (array or object) + * before writing it as raw JSON. This prevents injection of + * arbitrary content into the bulk request payload. + */ + private static boolean isValidJson(String value) { + if (value == null || value.isEmpty()) { + return false; + } + char first = value.charAt(0); + if (first != '[' && first != '{') { + return false; + } + try { + VALIDATION_MAPPER.readTree(value); + return true; + } catch (IOException e) { + LOG.warn("Field value starts with '{}' but is not valid JSON; " + + "writing as escaped string", first); + return false; + } + } +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java new file mode 100644 index 0000000000..2c8c66783a --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.emitter.elasticsearch; + +import java.io.IOException; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.tika.client.HttpClientFactory; +import org.apache.tika.client.TikaClientException; +import org.apache.tika.config.ConfigValidator; +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.ParseContext; +import org.apache.tika.pipes.api.emitter.AbstractEmitter; +import org.apache.tika.pipes.api.emitter.EmitData; +import org.apache.tika.plugins.ExtensionConfig; + +/** + * Emitter that sends documents to Elasticsearch via its REST API. + * + * <p>This emitter does <b>not</b> depend on the Elasticsearch Java client + * (which changed to a non-ASL license). It uses plain HTTP via + * Apache HttpClient to call the {@code _bulk} endpoint directly. 
+ * + * <p>Supports: + * <ul> + * <li>API key authentication ({@code Authorization: ApiKey <base64>}) + * — common with Elasticsearch 8.x</li> + * <li>Basic authentication (username/password via httpClientConfig)</li> + * <li>OVERWRITE and UPSERT update strategies</li> + * <li>SEPARATE_DOCUMENTS and PARENT_CHILD attachment strategies</li> + * </ul> + */ +public class ElasticsearchEmitter extends AbstractEmitter { + + public static final String DEFAULT_EMBEDDED_FILE_FIELD_NAME = "embedded"; + private static final Logger LOG = + LoggerFactory.getLogger(ElasticsearchEmitter.class); + + private ElasticsearchClient elasticsearchClient; + private final HttpClientFactory httpClientFactory; + private final ElasticsearchEmitterConfig config; + + public static ElasticsearchEmitter build(ExtensionConfig pluginConfig) + throws TikaConfigException, IOException { + ElasticsearchEmitterConfig config = + ElasticsearchEmitterConfig.load(pluginConfig.json()); + return new ElasticsearchEmitter(pluginConfig, config); + } + + public ElasticsearchEmitter(ExtensionConfig pluginConfig, + ElasticsearchEmitterConfig config) + throws IOException, TikaConfigException { + super(pluginConfig); + this.config = config; + httpClientFactory = new HttpClientFactory(); + configure(); + } + + @Override + public void emit(List<? 
extends EmitData> emitData) throws IOException { + if (emitData == null || emitData.isEmpty()) { + LOG.debug("metadataList is null or empty"); + return; + } + + try { + LOG.debug("about to emit {} docs", emitData.size()); + elasticsearchClient.emitDocuments(emitData); + LOG.info("successfully emitted {} docs", emitData.size()); + } catch (TikaClientException e) { + LOG.warn("problem emitting docs", e); + throw new IOException(e.getMessage(), e); + } + } + + @Override + public void emit(String emitKey, List<Metadata> metadataList, + ParseContext parseContext) throws IOException { + if (metadataList == null || metadataList.isEmpty()) { + LOG.debug("metadataList is null or empty"); + return; + } + try { + LOG.debug("about to emit one doc with {} metadata entries", + metadataList.size()); + elasticsearchClient.emitDocument(emitKey, metadataList); + LOG.info("successfully emitted one doc"); + } catch (TikaClientException e) { + LOG.warn("problem emitting doc", e); + throw new IOException("failed to add document", e); + } + } + + private void configure() throws TikaConfigException { + ConfigValidator.mustNotBeEmpty("elasticsearchUrl", + config.elasticsearchUrl()); + ConfigValidator.mustNotBeEmpty("idField", config.idField()); + + HttpClientConfig http = config.httpClientConfig(); + if (http != null) { + httpClientFactory.setUserName(http.userName()); + httpClientFactory.setPassword(http.password()); + } + + elasticsearchClient = + new ElasticsearchClient(config, httpClientFactory.build()); + } + +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java new file mode 100644 index 0000000000..874431daac --- /dev/null +++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.emitter.elasticsearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.tika.exception.TikaConfigException; + +/** + * Configuration for the Elasticsearch emitter. + * + * @param elasticsearchUrl Full URL including index, e.g. {@code https://localhost:9200/my-index} + * @param idField Metadata field to use as the document {@code _id} + * @param attachmentStrategy How to handle embedded documents + * @param updateStrategy Whether to overwrite or upsert + * @param commitWithin Not used by ES, kept for API parity with OpenSearch emitter + * @param embeddedFileFieldName Field name for embedded-file relation + * @param apiKey Elasticsearch API key for authentication (Base64-encoded + * {@code id:api_key}). Sent as {@code Authorization: ApiKey <value>}. + * If null/empty, falls back to httpClientConfig's userName/password + * for basic auth. 
+ * @param httpClientConfig HTTP connection settings (basic auth, timeouts, proxy) + */ +public record ElasticsearchEmitterConfig(String elasticsearchUrl, String idField, + AttachmentStrategy attachmentStrategy, + UpdateStrategy updateStrategy, int commitWithin, + String embeddedFileFieldName, String apiKey, + HttpClientConfig httpClientConfig) { + public enum AttachmentStrategy { + SEPARATE_DOCUMENTS, PARENT_CHILD, + } + + public enum UpdateStrategy { + OVERWRITE, UPSERT + } + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static ElasticsearchEmitterConfig load(final String json) + throws TikaConfigException { + try { + return OBJECT_MAPPER.readValue(json, + ElasticsearchEmitterConfig.class); + } catch (JsonProcessingException e) { + throw new TikaConfigException( + "Failed to parse ElasticsearchEmitterConfig from JSON", e); + } + } + +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java new file mode 100644 index 0000000000..94bda68b2a --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.emitter.elasticsearch; + +import java.io.IOException; + +import org.pf4j.Extension; + +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.pipes.api.emitter.Emitter; +import org.apache.tika.pipes.api.emitter.EmitterFactory; +import org.apache.tika.plugins.ExtensionConfig; + +/** + * Factory for creating Elasticsearch emitters. + * + * <p>Example JSON configuration: + * <pre> + * "emitters": { + * "elasticsearch-emitter": { + * "my-es-emitter": { + * "elasticsearchUrl": "https://localhost:9200/my-index", + * "idField": "id", + * "apiKey": "base64-encoded-id:api_key", + * "attachmentStrategy": "PARENT_CHILD", + * "updateStrategy": "UPSERT", + * "embeddedFileFieldName": "embedded" + * } + * } + * } + * </pre> + */ +@Extension +public class ElasticsearchEmitterFactory implements EmitterFactory { + + public static final String NAME = "elasticsearch-emitter"; + + @Override + public String getName() { + return NAME; + } + + @Override + public Emitter buildExtension(ExtensionConfig extensionConfig) + throws IOException, TikaConfigException { + return ElasticsearchEmitter.build(extensionConfig); + } +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java new file mode 100644 index 0000000000..6b2b18b00b --- /dev/null +++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.emitter.elasticsearch; + +import java.io.IOException; + +import com.fasterxml.jackson.databind.ObjectMapper; + + +public record HttpClientConfig(String userName, String password, + String authScheme, int connectionTimeout, + int socketTimeout, String proxyHost, + int proxyPort) { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static HttpClientConfig load(final String json) throws IOException { + return OBJECT_MAPPER.readValue(json, HttpClientConfig.class); + } + +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java new file mode 100644 index 0000000000..5e6f3373e3 --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java @@ -0,0 +1,60 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.emitter.elasticsearch; + + +import com.fasterxml.jackson.databind.JsonNode; + +public class JsonResponse { + + private final int status; + private final String msg; + private final JsonNode root; + + public JsonResponse(int status, JsonNode root) { + this.status = status; + this.root = root; + this.msg = null; + } + + public JsonResponse(int status, String msg) { + this.status = status; + this.msg = msg; + this.root = null; + } + + public int getStatus() { + return status; + } + + public String getMsg() { + return msg; + } + + public JsonNode getJson() { + return root; + } + + @Override + public String toString() { + return "JsonResponse{" + + "status=" + status + + ", msg='" + msg + '\'' + + ", root=" + root + + '}'; + } +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java new file mode 100644 index 0000000000..1b6a195171 --- /dev/null +++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.plugin.elasticsearch; + +import org.pf4j.Plugin; +import org.pf4j.PluginWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticsearchPipesPlugin extends Plugin { + private static final Logger LOG = + LoggerFactory.getLogger(ElasticsearchPipesPlugin.class); + + public ElasticsearchPipesPlugin(PluginWrapper wrapper) { + super(wrapper); + } + + @Override + public void start() { + LOG.info("Starting Elasticsearch pipes plugin"); + super.start(); + } + + @Override + public void stop() { + LOG.info("Stopping Elasticsearch pipes plugin"); + super.stop(); + } + + @Override + public void delete() { + LOG.info("Deleting Elasticsearch pipes plugin"); + super.delete(); + } + +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties new file mode 100644 index 0000000000..9fc35c59c4 --- /dev/null +++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +plugin.id=tika-pipes-elasticsearch-plugin +plugin.class=org.apache.tika.pipes.plugin.elasticsearch.ElasticsearchPipesPlugin +plugin.version=4.0.0-SNAPSHOT +plugin.provider=Apache Tika +plugin.description=Pipes for Elasticsearch (plain HTTP, ASL v2 only) diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java new file mode 100644 index 0000000000..3193b700b2 --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.emitter.elasticsearch; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +import org.apache.tika.TikaTest; +import org.apache.tika.metadata.Metadata; + +public class ElasticsearchClientTest extends TikaTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + public void testSerialization() throws Exception { + Metadata metadata = new Metadata(); + metadata.add("authors", "author1"); + metadata.add("authors", "author2"); + metadata.add("title", "title1"); + for (ElasticsearchEmitterConfig.AttachmentStrategy strategy : + ElasticsearchEmitterConfig.AttachmentStrategy.values()) { + String json = + ElasticsearchClient.metadataToJsonContainerInsert( + metadata, strategy); + assertContains("author1", json); + assertContains("author2", json); + assertContains("authors", json); + assertContains("title1", json); + } + for (ElasticsearchEmitterConfig.AttachmentStrategy strategy : + ElasticsearchEmitterConfig.AttachmentStrategy.values()) { + String json = + ElasticsearchClient.metadataToJsonEmbeddedInsert( + metadata, strategy, "myEmitKey", + ElasticsearchEmitter + .DEFAULT_EMBEDDED_FILE_FIELD_NAME); + 
assertContains("author1", json); + assertContains("author2", json); + assertContains("authors", json); + assertContains("title1", json); + } + } + + @Test + public void testParentChildContainer() throws Exception { + Metadata metadata = new Metadata(); + metadata.add("title", "parent doc"); + String json = + ElasticsearchClient.metadataToJsonContainerInsert( + metadata, + ElasticsearchEmitterConfig.AttachmentStrategy + .PARENT_CHILD); + assertContains("relation_type", json); + assertContains("container", json); + } + + @Test + public void testParentChildEmbedded() throws Exception { + Metadata metadata = new Metadata(); + metadata.add("title", "child doc"); + String json = + ElasticsearchClient.metadataToJsonEmbeddedInsert( + metadata, + ElasticsearchEmitterConfig.AttachmentStrategy + .PARENT_CHILD, + "parentKey", "embedded"); + assertContains("relation_type", json); + assertContains("parentKey", json); + assertContains("embedded", json); + } + + @Test + public void testSeparateDocumentsEmbedded() throws Exception { + Metadata metadata = new Metadata(); + metadata.add("title", "child doc"); + String json = + ElasticsearchClient.metadataToJsonEmbeddedInsert( + metadata, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS, + "parentKey", "embedded"); + assertContains("parent", json); + assertContains("parentKey", json); + assertNotContained("relation_type", json); + } + + @Test + public void testChunksFieldWrittenAsRawJson() throws Exception { + Metadata metadata = new Metadata(); + metadata.set("title", "test doc"); + metadata.set("tika:chunks", + "[{\"text\":\"hello\",\"vector\":\"AAAA\"," + + "\"locators\":{\"text\":[{\"start_offset\":0," + + "\"end_offset\":5}]}}]"); + String json = + ElasticsearchClient.metadataToJsonContainerInsert( + metadata, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS); + + // Parse the output JSON — tika:chunks should be a real JSON + // array, not a double-escaped string + JsonNode doc = 
MAPPER.readTree(json); + JsonNode chunks = doc.get("tika:chunks"); + assertTrue(chunks.isArray(), + "tika:chunks should be a JSON array, not a string"); + assertEquals(1, chunks.size()); + assertEquals("hello", chunks.get(0).get("text").asText()); + assertEquals("AAAA", chunks.get(0).get("vector").asText()); + } + + @Test + public void testNonJsonFieldStaysString() throws Exception { + Metadata metadata = new Metadata(); + metadata.set("tika:chunks", "not json at all"); + String json = + ElasticsearchClient.metadataToJsonContainerInsert( + metadata, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS); + JsonNode doc = MAPPER.readTree(json); + // Should be a plain string since it doesn't look like JSON + assertTrue(doc.get("tika:chunks").isTextual()); + assertEquals("not json at all", + doc.get("tika:chunks").asText()); + } + + @Test + public void testRegularFieldNotRawJson() throws Exception { + Metadata metadata = new Metadata(); + // A regular field whose value happens to look like JSON + // should NOT be written as raw JSON + metadata.set("description", "[some bracketed text]"); + String json = + ElasticsearchClient.metadataToJsonContainerInsert( + metadata, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS); + JsonNode doc = MAPPER.readTree(json); + assertTrue(doc.get("description").isTextual()); + } + + @Test + public void testMultiValuedMetadata() throws Exception { + Metadata metadata = new Metadata(); + metadata.add("tags", "tag1"); + metadata.add("tags", "tag2"); + metadata.add("tags", "tag3"); + String json = + ElasticsearchClient.metadataToJsonContainerInsert( + metadata, + ElasticsearchEmitterConfig.AttachmentStrategy + .SEPARATE_DOCUMENTS); + assertContains("tag1", json); + assertContains("tag2", json); + assertContains("tag3", json); + } +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java 
b/tika-pipes/tika-pipes-plugins/tika-pipes-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java index 7cd8df5ca0..cc543da2c9 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java @@ -23,6 +23,7 @@ import java.io.Reader; import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Set; import java.util.UUID; import com.fasterxml.jackson.core.JsonFactory; @@ -310,13 +311,28 @@ public class OpenSearchClient { } } + /** + * Metadata fields whose values are serialized JSON from the + * tika-inference pipeline. These must be written as raw JSON + * (arrays/objects) rather than escaped strings so that + * OpenSearch can index vectors, locators, etc. natively. + */ + static final Set<String> INFERENCE_JSON_FIELDS = Set.of( + "tika:chunks"); + private static void writeMetadata(Metadata metadata, JsonGenerator jsonGenerator) throws IOException { //writes the metadata without the start { or the end } //to allow for other fields to be added for (String n : metadata.names()) { String[] vals = metadata.getValues(n); if (vals.length == 1) { - jsonGenerator.writeStringField(n, vals[0]); + if (INFERENCE_JSON_FIELDS.contains(n) + && isValidJson(vals[0])) { + jsonGenerator.writeFieldName(n); + jsonGenerator.writeRawValue(vals[0]); + } else { + jsonGenerator.writeStringField(n, vals[0]); + } } else { jsonGenerator.writeArrayFieldStart(n); for (String v : vals) { @@ -326,4 +342,29 @@ public class OpenSearchClient { } } } + + private static final ObjectMapper VALIDATION_MAPPER = new ObjectMapper(); + + /** + * Validates that the value is well-formed JSON (array or object) + * before writing it as raw JSON. 
This prevents injection of + * arbitrary content into the bulk request payload. + */ + private static boolean isValidJson(String value) { + if (value == null || value.isEmpty()) { + return false; + } + char first = value.charAt(0); + if (first != '[' && first != '{') { + return false; + } + try { + VALIDATION_MAPPER.readTree(value); + return true; + } catch (IOException e) { + LOG.warn("Field value starts with '{}' but is not valid JSON; " + + "writing as escaped string", first); + return false; + } + } }
