This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch elasticsearch-emitter
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 8e548d6adb3ccfdb43cb9cfac5df0c1ef482bec8
Author: tballison <[email protected]>
AuthorDate: Tue Feb 17 14:32:41 2026 -0500

    Add Elasticsearch emitter pipes plugin and integration tests
    
    - Add tika-pipes-elasticsearch plugin with ElasticsearchClient,
      ElasticsearchEmitter, and PF4J plugin wiring.
    - Add Elasticsearch integration tests with Testcontainers.
    - Refactor shared logic in OpenSearchClient.
    
    Co-authored-by: Cursor <[email protected]>
---
 tika-integration-tests/pom.xml                     |   1 +
 .../pom.xml                                        | 115 +++++
 .../elasticsearch/tests/ElasticsearchTest.java     | 528 +++++++++++++++++++++
 .../tests/ElasticsearchTestClient.java             | 151 ++++++
 .../elasticsearch/elasticsearch-mappings.json      |  19 +
 .../elasticsearch-parent-child-mappings.json       |  28 ++
 .../resources/elasticsearch/plugins-template.json  |  78 +++
 .../resources/pipes-fork-server-custom-log4j2.xml  |  33 ++
 .../src/test/resources/test-documents/fake_oom.xml |  24 +
 .../src/test/resources/test-documents/npe.xml      |  25 +
 .../src/test/resources/test-documents/oom.xml      |  24 +
 .../test-documents/test_recursive_embedded.docx    | Bin 0 -> 27082 bytes
 tika-pipes/tika-pipes-plugins/pom.xml              |   1 +
 .../tika-pipes-elasticsearch/pom.xml               | 134 ++++++
 .../src/main/assembly/assembly.xml                 |  55 +++
 .../emitter/elasticsearch/ElasticsearchClient.java | 428 +++++++++++++++++
 .../elasticsearch/ElasticsearchEmitter.java        | 127 +++++
 .../elasticsearch/ElasticsearchEmitterConfig.java  |  65 +++
 .../elasticsearch/ElasticsearchEmitterFactory.java |  62 +++
 .../emitter/elasticsearch/HttpClientConfig.java    |  35 ++
 .../pipes/emitter/elasticsearch/JsonResponse.java  |  60 +++
 .../elasticsearch/ElasticsearchPipesPlugin.java    |  50 ++
 .../src/main/resources/plugin.properties           |  21 +
 .../elasticsearch/ElasticsearchClientTest.java     | 177 +++++++
 .../pipes/emitter/opensearch/OpenSearchClient.java |  43 +-
 25 files changed, 2283 insertions(+), 1 deletion(-)

diff --git a/tika-integration-tests/pom.xml b/tika-integration-tests/pom.xml
index a026e088c2..861c812771 100644
--- a/tika-integration-tests/pom.xml
+++ b/tika-integration-tests/pom.xml
@@ -32,6 +32,7 @@
   <packaging>pom</packaging>
 
   <modules>
+    <module>tika-pipes-elasticsearch-integration-tests</module>
     <module>tika-pipes-opensearch-integration-tests</module>
     <module>tika-pipes-solr-integration-tests</module>
     <module>tika-pipes-s3-integration-tests</module>
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml
new file mode 100644
index 0000000000..ce6f15a746
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tika-integration-tests</artifactId>
+    <groupId>org.apache.tika</groupId>
+    <version>4.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-pipes-elasticsearch-integration-tests</artifactId>
+  <name>Apache Tika Elasticsearch integration tests</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-app</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-file-system</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>zip</type>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-elasticsearch</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- testcontainers — GenericContainer only, no ES client needed -->
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers-junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+    <plugin>
+      <groupId>org.apache.rat</groupId>
+      <artifactId>apache-rat-plugin</artifactId>
+      <configuration>
+        <inputExcludes>
+          <inputExclude>src/test/resources/elasticsearch/*.json</inputExclude>
+        </inputExcludes>
+    </configuration>
+  </plugin>
+      <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-dependency-plugin</artifactId>
+      <executions>
+      <execution>
+        <id>copy-plugins</id>
+        <phase>process-test-resources</phase>
+        <goals>
+          <goal>copy</goal>
+        </goals>
+        <configuration>
+          <outputDirectory>${project.build.directory}/plugins</outputDirectory>
+          <artifactItems>
+            <artifactItem>
+              <groupId>org.apache.tika</groupId>
+              <artifactId>tika-pipes-file-system</artifactId>
+              <version>${project.version}</version>
+              <type>zip</type>
+              <overWrite>true</overWrite>
+            </artifactItem>
+            <artifactItem>
+              <groupId>org.apache.tika</groupId>
+              <artifactId>tika-pipes-elasticsearch</artifactId>
+              <version>${project.version}</version>
+              <type>zip</type>
+              <overWrite>true</overWrite>
+            </artifactItem>
+          </artifactItems>
+        </configuration>
+      </execution>
+      </executions>
+      </plugin>
+    </plugins>
+    </build>
+
+  <scm>
+    <tag>3.0.0-rc1</tag>
+  </scm>
+</project>
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java
new file mode 100644
index 0000000000..5bcbacef1f
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.elasticsearch.tests;
+
+import static 
org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitter.DEFAULT_EMBEDDED_FILE_FIELD_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.commons.io.IOUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import org.apache.tika.cli.TikaCLI;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.config.JsonConfigHelper;
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.ParseMode;
+import org.apache.tika.pipes.api.emitter.Emitter;
+import org.apache.tika.pipes.core.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitterConfig;
+import org.apache.tika.pipes.emitter.elasticsearch.HttpClientConfig;
+import org.apache.tika.pipes.emitter.elasticsearch.JsonResponse;
+import org.apache.tika.plugins.TikaPluginManager;
+
+/**
+ * Integration tests for the Elasticsearch emitter using a Dockerized
+ * Elasticsearch instance via testcontainers.
+ *
+ * <p>Uses a {@link GenericContainer} with the official Elasticsearch Docker
+ * image. Security is disabled for test simplicity — no ES Java client
+ * dependency needed.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+public class ElasticsearchTest {
+
+    private static final DockerImageName ES_IMAGE =
+            DockerImageName.parse(
+                    "docker.elastic.co/elasticsearch/elasticsearch:8.17.0");
+
+    private static GenericContainer<?> CONTAINER;
+
+    protected static final String TEST_INDEX = "tika-pipes-index";
+    private int numTestDocs = 0;
+
+    @BeforeAll
+    public static void setUp() {
+        CONTAINER = new GenericContainer<>(ES_IMAGE)
+                .withExposedPorts(9200)
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("cluster.routing.allocation.disk.threshold_enabled",
+                        "false")
+                .waitingFor(new HttpWaitStrategy()
+                        .forPort(9200)
+                        .forStatusCode(200));
+
+        CONTAINER.start();
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        CONTAINER.close();
+    }
+
+    @AfterEach
+    public void clearIndex() throws TikaConfigException, IOException {
+        ElasticsearchTestClient client = getNewClient();
+        String endpoint = getEndpoint();
+        client.deleteIndex(endpoint);
+    }
+
+    @Test
+    public void testBasicFSToElasticsearch(
+            @TempDir Path pipesDirectory,
+            @TempDir Path testDocDirectory) throws Exception {
+
+        ElasticsearchTestClient client = getNewClient();
+        int numHtmlDocs = 42;
+        createTestHtmlFiles("Happiness", numHtmlDocs, testDocDirectory);
+
+        String endpoint = getEndpoint();
+        sendMappings(client, endpoint, TEST_INDEX,
+                "elasticsearch-mappings.json");
+
+        runPipes(client,
+                ElasticsearchEmitterConfig.AttachmentStrategy
+                        .SEPARATE_DOCUMENTS,
+                ElasticsearchEmitterConfig.UpdateStrategy.UPSERT,
+                ParseMode.CONCATENATE, endpoint,
+                pipesDirectory, testDocDirectory);
+
+        String query = "{ \"track_total_hits\": true, \"query\": " +
+                "{ \"match\": { \"content\": " +
+                "{ \"query\": \"happiness\" } } } }";
+
+        JsonResponse results =
+                client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + 1,
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+
+        // match all
+        query = "{ \"track_total_hits\": true, \"query\": " +
+                "{ \"match_all\": {} }, " +
+                "\"from\": 0, \"size\": 1000 }";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + numTestDocs,
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+    }
+
+    @Test
+    public void testParentChildFSToElasticsearch(
+            @TempDir Path pipesDirectory,
+            @TempDir Path testDocDirectory) throws Exception {
+
+        int numHtmlDocs = 42;
+        ElasticsearchTestClient client = getNewClient();
+
+        createTestHtmlFiles("Happiness", numHtmlDocs, testDocDirectory);
+        String endpoint = getEndpoint();
+        sendMappings(client, endpoint, TEST_INDEX,
+                "elasticsearch-parent-child-mappings.json");
+
+        runPipes(client,
+                ElasticsearchEmitterConfig.AttachmentStrategy.PARENT_CHILD,
+                ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE,
+                ParseMode.RMETA, endpoint,
+                pipesDirectory, testDocDirectory);
+
+        // match all
+        String query = "{ " +
+                "\"from\":0, \"size\":1000," +
+                "\"track_total_hits\": true, \"query\": { " +
+                "\"match_all\": {} } }";
+        JsonResponse results =
+                client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + 3 + 12,
+                // 3 mock files and the .docx has 11 embedded + itself
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+
+        // check an embedded file
+        query = "{ \"track_total_hits\": true, \"query\": " +
+                "{ \"query_string\": { " +
+                "\"default_field\": \"content\", " +
+                "\"query\": \"embed4 zip\", " +
+                "\"minimum_should_match\":2 } } } ";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(1,
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+        JsonNode source = results.getJson().get("hits").get("hits")
+                .get(0).get("_source");
+
+        Matcher m = Pattern
+                .compile("\\Atest_recursive_embedded" +
+                        ".docx-[0-9a-f]{8}-[0-9a-f]{4}-" +
+                        "[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\\Z")
+                .matcher(
+                        results.getJson().get("hits").get("hits").get(0)
+                                .get("_id").asText());
+        assertTrue(m.find(), "test_recursive_embedded.docx_$guid");
+        assertEquals("test_recursive_embedded.docx",
+                results.getJson().get("hits").get("hits").get(0)
+                        .get("_routing").asText());
+        assertEquals("test_recursive_embedded.docx",
+                source.get("relation_type").get("parent").asText());
+        assertEquals("embedded",
+                source.get("relation_type").get("name").asText());
+
+        assertEquals("application/zip",
+                source.get("mime").asText());
+
+        // verify parent query returns all children
+        query = "{ \"track_total_hits\": true, \"query\": " +
+                "{ \"parent_id\": { " +
+                "\"type\": \"embedded\", " +
+                "\"id\": \"test_recursive_embedded.docx\" } } } ";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(11,
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+    }
+
+    @Test
+    public void testSeparateDocsFSToElasticsearch(
+            @TempDir Path pipesDirectory,
+            @TempDir Path testDocDirectory) throws Exception {
+
+        ElasticsearchTestClient client = getNewClient();
+
+        int numHtmlDocs = 42;
+        createTestHtmlFiles("Happiness", numHtmlDocs, testDocDirectory);
+        String endpoint = getEndpoint();
+        sendMappings(client, endpoint, TEST_INDEX,
+                "elasticsearch-mappings.json");
+
+        runPipes(client,
+                ElasticsearchEmitterConfig.AttachmentStrategy
+                        .SEPARATE_DOCUMENTS,
+                ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE,
+                ParseMode.RMETA, endpoint,
+                pipesDirectory, testDocDirectory);
+
+        String query = "{ \"track_total_hits\": true, \"query\": " +
+                "{ \"match\": { \"content\": " +
+                "{ \"query\": \"happiness\" } } } }";
+
+        JsonResponse results =
+                client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + 1,
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+
+        // match all
+        query = "{ \"track_total_hits\": true, \"query\": { " +
+                "\"match_all\": {} } }";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + 3 + 12,
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+
+        // check an embedded file
+        query = "{ \"track_total_hits\": true, \"query\": " +
+                "{ \"query_string\": { " +
+                "\"default_field\": \"content\", " +
+                "\"query\": \"embed4 zip\", " +
+                "\"minimum_should_match\":2 } } } ";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(1,
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+        JsonNode source = results.getJson().get("hits").get("hits")
+                .get(0).get("_source");
+
+        Matcher m = Pattern
+                .compile("\\Atest_recursive_embedded" +
+                        ".docx-[0-9a-f]{8}-[0-9a-f]{4}-" +
+                        "[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\\Z")
+                .matcher(
+                        results.getJson().get("hits").get("hits").get(0)
+                                .get("_id").asText());
+        assertTrue(m.find(), "test_recursive_embedded.docx-$guid");
+
+        assertNull(
+                results.getJson().get("hits").get("hits").get(0)
+                        .get("_routing"),
+                "test_recursive_embedded.docx");
+        assertNull(source.get("relation_type"),
+                "test_recursive_embedded.docx");
+
+        assertEquals("application/zip",
+                source.get("mime").asText());
+
+        // parent_id query should fail — no join in schema
+        query = "{ \"track_total_hits\": true, \"query\": " +
+                "{ \"parent_id\": { " +
+                "\"type\": \"embedded\", " +
+                "\"id\": \"test_recursive_embedded.docx\" } } } ";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(400, results.getStatus());
+    }
+
+    @Test
+    public void testUpsertSeparateDocsFSToElasticsearch(
+            @TempDir Path pipesDirectory,
+            @TempDir Path testDocDirectory) throws Exception {
+
+        ElasticsearchTestClient client = getNewClient();
+
+        int numHtmlDocs = 42;
+        createTestHtmlFiles("Happiness", numHtmlDocs, testDocDirectory);
+        String endpoint = getEndpoint();
+        sendMappings(client, endpoint, TEST_INDEX,
+                "elasticsearch-mappings.json");
+
+        runPipes(client,
+                ElasticsearchEmitterConfig.AttachmentStrategy
+                        .SEPARATE_DOCUMENTS,
+                ElasticsearchEmitterConfig.UpdateStrategy.UPSERT,
+                ParseMode.RMETA, endpoint,
+                pipesDirectory, testDocDirectory);
+
+        String query = "{ \"track_total_hits\": true, \"query\": " +
+                "{ \"match\": { \"content\": " +
+                "{ \"query\": \"happiness\" } } } }";
+
+        JsonResponse results =
+                client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + 1,
+                results.getJson().get("hits").get("total").get("value")
+                        .asInt());
+    }
+
+    @Test
+    public void testUpsert(
+            @TempDir Path pipesDirectory,
+            @TempDir Path testDocDirectory) throws Exception {
+
+        ElasticsearchTestClient client = getNewClient();
+
+        String endpoint = getEndpoint();
+        sendMappings(client, endpoint, TEST_INDEX,
+                "elasticsearch-mappings.json");
+        Path pluginsConfigFile = getPluginsConfig(pipesDirectory,
+                ElasticsearchEmitterConfig.AttachmentStrategy
+                        .SEPARATE_DOCUMENTS,
+                ElasticsearchEmitterConfig.UpdateStrategy.UPSERT,
+                ParseMode.RMETA, endpoint, testDocDirectory);
+
+        TikaJsonConfig tikaJsonConfig =
+                TikaJsonConfig.load(pluginsConfigFile);
+        Emitter emitter = EmitterManager
+                .load(TikaPluginManager.load(tikaJsonConfig),
+                        tikaJsonConfig)
+                .getEmitter();
+        Metadata metadata = new Metadata();
+        metadata.set("mime", "mimeA");
+        metadata.set("title", "titleA");
+        emitter.emit("1",
+                Collections.singletonList(metadata), new ParseContext());
+        client.getJson(endpoint + "/_refresh");
+        metadata.set("title", "titleB");
+        emitter.emit("1",
+                Collections.singletonList(metadata), new ParseContext());
+        client.getJson(endpoint + "/_refresh");
+
+        Metadata metadata2 = new Metadata();
+        metadata2.set("content", "the quick brown fox");
+        emitter.emit("1",
+                Collections.singletonList(metadata2), new ParseContext());
+        client.getJson(endpoint + "/_refresh");
+
+        String query = "{ \"track_total_hits\": true, \"query\": { " +
+                "\"match_all\": {} } }";
+        JsonResponse response =
+                client.postJson(endpoint + "/_search", query);
+        JsonNode doc1 = response.getJson().get("hits").get("hits")
+                .get(0).get("_source");
+        assertEquals("mimeA", doc1.get("mime").asText());
+        assertEquals("titleB", doc1.get("title").asText());
+        assertEquals("the quick brown fox",
+                doc1.get("content").asText());
+    }
+
+    // -----------------------------------------------------------------
+    // Helpers
+    // -----------------------------------------------------------------
+
+    private String getEndpoint() {
+        return "http://" + CONTAINER.getHost() + ":" +
+                CONTAINER.getMappedPort(9200) + "/" + TEST_INDEX;
+    }
+
+    private ElasticsearchTestClient getNewClient()
+            throws TikaConfigException {
+        HttpClientFactory httpClientFactory = new HttpClientFactory();
+        ElasticsearchEmitterConfig config =
+                new ElasticsearchEmitterConfig(
+                        getEndpoint(), "_id",
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .SEPARATE_DOCUMENTS,
+                        ElasticsearchEmitterConfig.UpdateStrategy
+                                .OVERWRITE,
+                        10, DEFAULT_EMBEDDED_FILE_FIELD_NAME, null,
+                        new HttpClientConfig(null, null, null,
+                                -1, -1, null, -1));
+        return new ElasticsearchTestClient(config,
+                httpClientFactory.build());
+    }
+
+    protected void sendMappings(ElasticsearchTestClient client,
+                                String endpoint, String index,
+                                String mappingsFile) throws Exception {
+        String mappings = IOUtils.toString(
+                ElasticsearchTest.class.getResourceAsStream(
+                        "/elasticsearch/" + mappingsFile),
+                StandardCharsets.UTF_8);
+        int status = -1;
+        int tries = 0;
+        JsonResponse response = null;
+        while (status != 200 && tries++ < 20) {
+            response = client.putJson(endpoint, mappings);
+            if (status != 200) {
+                Thread.sleep(1000);
+            }
+            status = response.getStatus();
+        }
+        if (status != 200) {
+            throw new IllegalArgumentException(
+                    "couldn't create index/add mappings: " + response);
+        }
+        assertTrue(response.getJson().get("acknowledged").asBoolean());
+        assertEquals(index,
+                response.getJson().get("index").asText());
+    }
+
+    private void runPipes(
+            ElasticsearchTestClient client,
+            ElasticsearchEmitterConfig.AttachmentStrategy attachStrat,
+            ElasticsearchEmitterConfig.UpdateStrategy updateStrat,
+            ParseMode parseMode, String endpoint,
+            Path pipesDirectory, Path testDocDirectory) throws Exception {
+
+        Path pluginsConfig = getPluginsConfig(pipesDirectory,
+                attachStrat, updateStrat, parseMode,
+                endpoint, testDocDirectory);
+
+        TikaCLI.main(new String[]{
+                "-a", "-c",
+                pluginsConfig.toAbsolutePath().toString()});
+
+        // refresh to make content searchable
+        client.getJson(endpoint + "/_refresh");
+    }
+
+    @NotNull
+    private Path getPluginsConfig(
+            Path pipesDirectory,
+            ElasticsearchEmitterConfig.AttachmentStrategy attachStrat,
+            ElasticsearchEmitterConfig.UpdateStrategy updateStrat,
+            ParseMode parseMode, String endpoint,
+            Path testDocDirectory) throws IOException {
+
+        Path tikaConfig = pipesDirectory.resolve("plugins-config.json");
+
+        Path log4jPropFile = pipesDirectory.resolve("log4j2.xml");
+        try (InputStream is = ElasticsearchTest.class
+                .getResourceAsStream(
+                        "/pipes-fork-server-custom-log4j2.xml")) {
+            Files.copy(is, log4jPropFile);
+        }
+
+        boolean includeRouting = (attachStrat ==
+                ElasticsearchEmitterConfig.AttachmentStrategy
+                        .PARENT_CHILD);
+
+        Map<String, Object> replacements = new HashMap<>();
+        replacements.put("ATTACHMENT_STRATEGY",
+                attachStrat.toString());
+        replacements.put("UPDATE_STRATEGY",
+                updateStrat.toString());
+        replacements.put("FETCHER_BASE_PATH", testDocDirectory);
+        replacements.put("PARSE_MODE", parseMode.name());
+        replacements.put("INCLUDE_ROUTING", includeRouting);
+        replacements.put("ELASTICSEARCH_URL", endpoint);
+        replacements.put("LOG4J_JVM_ARG",
+                "-Dlog4j.configurationFile=" +
+                        log4jPropFile.toAbsolutePath());
+
+        JsonConfigHelper.writeConfigFromResource(
+                "/elasticsearch/plugins-template.json",
+                ElasticsearchTest.class, replacements, tikaConfig);
+
+        return tikaConfig;
+    }
+
+    private void createTestHtmlFiles(String bodyContent,
+                                     int numHtmlDocs,
+                                     Path testDocDirectory)
+            throws Exception {
+        Files.createDirectories(testDocDirectory);
+        for (int i = 0; i < numHtmlDocs; ++i) {
+            String html = "<html><body>" + bodyContent +
+                    "</body></html>";
+            Path p = testDocDirectory.resolve("test-" + i + ".html");
+            Files.write(p, html.getBytes(StandardCharsets.UTF_8));
+        }
+        File testDocuments = Paths
+                .get(ElasticsearchTest.class
+                        .getResource("/test-documents").toURI())
+                .toFile();
+        for (File f : testDocuments.listFiles()) {
+            Path targ = testDocDirectory.resolve(f.getName());
+            Files.copy(f.toPath(), targ);
+            numTestDocs++;
+        }
+    }
+}
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java
new file mode 100644
index 0000000000..3808609924
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.elasticsearch.tests;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * Extends ElasticsearchClient with GET, PUT, and DELETE for integration
+ * testing purposes.
+ */
+public class ElasticsearchTestClient extends ElasticsearchClient {
+
+    // ObjectMapper is thread-safe after configuration and relatively
+    // expensive to construct; share one instance across all requests.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public ElasticsearchTestClient(ElasticsearchEmitterConfig config,
+                                   HttpClient httpClient) {
+        super(config, httpClient);
+    }
+
+    /**
+     * PUTs {@code json} to {@code url} with json accept/content-type headers.
+     *
+     * @param url target url
+     * @param json request body, sent as utf-8
+     * @return parsed json body on http 200; raw body string otherwise
+     * @throws IOException on a network or parse failure
+     */
+    public JsonResponse putJson(String url, String json) throws IOException {
+        HttpPut httpRequest = new HttpPut(url);
+        httpRequest.setEntity(new ByteArrayEntity(
+                json.getBytes(StandardCharsets.UTF_8)));
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type",
+                "application/json; charset=utf-8");
+        return execute(httpRequest);
+    }
+
+    /**
+     * GETs {@code url} with json accept/content-type headers.
+     *
+     * @param url target url
+     * @return parsed json body on http 200; raw body string otherwise
+     * @throws IOException on a network or parse failure
+     */
+    public JsonResponse getJson(String url) throws IOException {
+        HttpGet httpRequest = new HttpGet(url);
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type",
+                "application/json; charset=utf-8");
+        return execute(httpRequest);
+    }
+
+    /**
+     * DELETEs the index at {@code url}.
+     *
+     * @param url index url
+     * @return parsed json body on http 200; raw body string otherwise
+     * @throws IOException on a network or parse failure
+     */
+    public JsonResponse deleteIndex(String url) throws IOException {
+        return execute(new HttpDelete(url));
+    }
+
+    /**
+     * Executes {@code httpRequest} and converts the response: an http 200
+     * body is parsed as json; any other status yields the raw body as a
+     * string. Always closes closeable responses and releases the connection.
+     */
+    private JsonResponse execute(HttpRequestBase httpRequest)
+            throws IOException {
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            if (status == 200) {
+                try (Reader reader = new BufferedReader(
+                        new InputStreamReader(
+                                response.getEntity().getContent(),
+                                StandardCharsets.UTF_8))) {
+                    return new JsonResponse(200,
+                            OBJECT_MAPPER.readTree(reader));
+                }
+            }
+            return new JsonResponse(status,
+                    new String(
+                            EntityUtils.toByteArray(response.getEntity()),
+                            StandardCharsets.UTF_8));
+        } finally {
+            if (response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse) response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+}
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-mappings.json
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-mappings.json
new file mode 100644
index 0000000000..900457d7ce
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-mappings.json
@@ -0,0 +1,19 @@
+{
+  "settings": {
+    "number_of_shards": 1
+  },
+  "mappings" : {
+    "dynamic": "strict",
+    "properties" : {
+      "content" : { "type" : "text"},
+      "length" : { "type" : "long"},
+      "creators" : { "type" : "text"},
+      "title" : { "type" : "text"},
+      "mime" : { "type" : "keyword"},
+      "tika_exception" : { "type" : "text"},
+      "parent" : { "type" : "text"},
+      "my_test_parse_time_ms" : {"type": "long"},
+      "my_test_parse_status": {"type": "keyword"}
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-parent-child-mappings.json
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-parent-child-mappings.json
new file mode 100644
index 0000000000..4b845fde8c
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-parent-child-mappings.json
@@ -0,0 +1,28 @@
+{
+  "settings": {
+    "number_of_shards": 1
+  },
+  "mappings" : {
+    "dynamic": "strict",
+    "_routing": {
+      "required":true
+    },
+    "properties" : {
+      "content" : { "type" : "text"},
+      "length" : { "type" : "long"},
+      "creators" : { "type" : "text"},
+      "title" : { "type" : "text"},
+      "mime" : { "type" : "keyword"},
+      "tika_exception" : { "type" : "text"},
+      "relation_type":{
+        "type":"join",
+        "eager_global_ordinals":true,
+        "relations":{
+          "container":"embedded"
+        }
+      },
+      "my_test_parse_time_ms" : {"type": "long"},
+      "my_test_parse_status": {"type": "keyword"}
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json
new file mode 100644
index 0000000000..2975b9d85f
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json
@@ -0,0 +1,78 @@
+{
+  "content-handler-factory": {
+    "basic-content-handler-factory": {
+      "type": "TEXT",
+      "writeLimit": -1,
+      "throwOnWriteLimitReached": true
+    }
+  },
+  "fetchers": {
+    "fsf": {
+      "file-system-fetcher": {
+        "basePath": "FETCHER_BASE_PATH"
+      }
+    }
+  },
+  "emitters": {
+    "ese": {
+      "elasticsearch-emitter": {
+        "elasticsearchUrl": "ELASTICSEARCH_URL",
+        "updateStrategy": "UPDATE_STRATEGY",
+        "attachmentStrategy": "ATTACHMENT_STRATEGY",
+        "commitWithin": 10,
+        "idField": "_id",
+        "embeddedFileFieldName": "embedded",
+        "httpClientConfig": {
+          "authScheme": "http",
+          "connectionTimeout": 60,
+          "socketTimeout": 60
+        }
+      }
+    }
+  },
+  "pipes-iterator": {
+    "file-system-pipes-iterator": {
+      "basePath": "FETCHER_BASE_PATH",
+      "countTotal": true,
+      "fetcherId": "fsf",
+      "emitterId": "ese"
+    }
+  },
+  "pipes": {
+    "parseMode": "PARSE_MODE",
+    "onParseException": "EMIT",
+    "emitStrategy": {
+      "type": "DYNAMIC",
+      "thresholdBytes": 10000
+    },
+    "emitMaxEstimatedBytes": 100000,
+    "emitWithinMillis": 60000,
+    "numEmitters": 1,
+    "numClients": 3,
+    "forkedJvmArgs": [
+      "-Xmx512m",
+      "-XX:ParallelGCThreads=2",
+      "LOG4J_JVM_ARG"
+    ],
+    "timeoutMillis": 60000
+  },
+  "metadata-filters": [
+    {
+      "date-normalizing-metadata-filter": {}
+    },
+    {
+      "field-name-mapping-filter": {
+        "excludeUnmapped": true,
+        "mappings": {
+          "X-TIKA:content": "content",
+          "Content-Length": "length",
+          "dc:creator": "creators",
+          "dc:title": "title",
+          "Content-Type": "mime",
+          "X-TIKA:EXCEPTION:container_exception": "tika_exception"
+        }
+      }
+    }
+  ],
+  "plugin-roots": "target/plugins"
+}
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml
new file mode 100644
index 0000000000..63131796bb
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<Configuration>
+  <Appenders>
+    <Console name="console" target="SYSTEM_ERR">
+      <PatternLayout
+          pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n" />
+    </Console>
+  </Appenders>
+  <Loggers>
+    <!-- turn off logging for the forked processes -->
+    <Root level="fatal" additivity="false">
+      <AppenderRef ref="console" />
+    </Root>
+  </Loggers>
+</Configuration>
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/fake_oom.xml
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/fake_oom.xml
new file mode 100644
index 0000000000..42aa9a7785
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/fake_oom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<mock>
+    <metadata action="add" name="author">Nikolai Lobachevsky</metadata>
+    <throw class="java.lang.OutOfMemoryError">oom message</throw>
+</mock>
\ No newline at end of file
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/npe.xml
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/npe.xml
new file mode 100644
index 0000000000..93599dbc48
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/npe.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<mock>
+  <metadata action="add" name="dc:creator">embeddedAuthor</metadata>
+  <write element="p">some_embedded_content</write>
+  <throw class="java.lang.NullPointerException">another null pointer exception</throw>
+</mock>
\ No newline at end of file
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/oom.xml
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/oom.xml
new file mode 100644
index 0000000000..3ee835e681
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/oom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<mock>
+  <metadata action="add" name="author">Nikolai Lobachevsky</metadata>
+  <oom/>
+</mock>
\ No newline at end of file
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/test_recursive_embedded.docx
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/test_recursive_embedded.docx
new file mode 100644
index 0000000000..cd562cbb82
Binary files /dev/null and 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/test-documents/test_recursive_embedded.docx
 differ
diff --git a/tika-pipes/tika-pipes-plugins/pom.xml 
b/tika-pipes/tika-pipes-plugins/pom.xml
index dcfe22c58b..84167181e2 100644
--- a/tika-pipes/tika-pipes-plugins/pom.xml
+++ b/tika-pipes/tika-pipes-plugins/pom.xml
@@ -43,6 +43,7 @@
     <module>tika-pipes-json</module>
     <module>tika-pipes-kafka</module>
     <module>tika-pipes-microsoft-graph</module>
+    <module>tika-pipes-elasticsearch</module>
     <module>tika-pipes-opensearch</module>
     <module>tika-pipes-s3</module>
     <module>tika-pipes-solr</module>
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml
new file mode 100644
index 0000000000..4995637318
--- /dev/null
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tika-pipes-plugins</artifactId>
+    <groupId>org.apache.tika</groupId>
+    <version>4.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-pipes-elasticsearch</artifactId>
+  <name>Apache Tika Pipes Elasticsearch</name>
+  <description>
+    Elasticsearch emitter for Apache Tika Pipes.
+    Uses plain HTTP (Apache HttpClient) — no Elasticsearch Java client 
dependency.
+    All dependencies are ASL v2 licensed.
+  </description>
+  <properties>
+    <!-- Never include the core artifacts in your plugin lib directory. If you 
do, it will cause the classloading
+     to get messed up when finding your plugins. -->
+    
<plugin.excluded.artifactIds>tika-core,tika-pipes-api,tika-serialization,tika-plugins-core</plugin.excluded.artifactIds>
+    
<plugin.excluded.groupIds>org.apache.logging.log4j,org.slf4j</plugin.excluded.groupIds>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-api</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-httpclient-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <includeScope>runtime</includeScope>
+              
<excludeArtifactIds>${plugin.excluded.artifactIds}</excludeArtifactIds>
+              <excludeGroupIds>${plugin.excluded.groupIds}</excludeGroupIds>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <Plugin-Id>elasticsearch-emitter</Plugin-Id>
+              <Plugin-Version>${project.version}</Plugin-Version>
+              
<Plugin-Class>org.apache.tika.pipes.plugin.elasticsearch.ElasticsearchPipesPlugin</Plugin-Class>
+              <Plugin-Provider>Elasticsearch Emitter</Plugin-Provider>
+              <Plugin-Description>Elasticsearch emitter (plain HTTP, no ES 
client)</Plugin-Description>
+              <Plugin-Dependencies></Plugin-Dependencies>
+            </manifestEntries>
+          </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/assembly.xml</descriptor>
+          </descriptors>
+          <appendAssemblyId>false</appendAssemblyId>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <annotationProcessors>
+            
<annotationProcessor>org.pf4j.processor.ExtensionAnnotationProcessor</annotationProcessor>
+          </annotationProcessors>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <scm>
+    <tag>3.0.0-rc1</tag>
+  </scm>
+</project>
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/assembly/assembly.xml
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/assembly/assembly.xml
new file mode 100644
index 0000000000..ea0f8b4a1c
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/assembly/assembly.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0
+                              http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+  <id>dependencies-zip</id>
+  <formats>
+    <format>zip</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/lib</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${project.build.directory}</directory>
+      <outputDirectory>/lib</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${project.build.directory}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>classes/META-INF/extensions.idx</include>
+        <include>classes/META-INF/MANIFEST.MF</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/src/main/resources</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>plugin.properties</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java
new file mode 100644
index 0000000000..919c12ab9b
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.elasticsearch;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.TikaClientException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.api.emitter.EmitData;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Plain HTTP client for Elasticsearch's REST API.
+ *
+ * <p>This does <b>not</b> use the Elasticsearch Java client library
+ * (which is SSPL / Elastic License). Instead it talks directly to
+ * Elasticsearch's {@code _bulk} REST endpoint using Apache HttpClient
+ * (ASL v2).
+ *
+ * <p>Supports API key authentication ({@code Authorization: ApiKey ...})
+ * as well as basic auth via the underlying {@link HttpClient}.
+ */
+public class ElasticsearchClient {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ElasticsearchClient.class);
+
+    protected final HttpClient httpClient;
+
+    private final MetadataToJsonWriter metadataToJsonWriter;
+    private final ElasticsearchEmitterConfig config;
+
+    protected ElasticsearchClient(ElasticsearchEmitterConfig config,
+                                  HttpClient httpClient) {
+        this.config = config;
+        this.httpClient = httpClient;
+        this.metadataToJsonWriter =
+                (config.updateStrategy() ==
+                        ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE)
+                        ? new InsertMetadataToJsonWriter()
+                        : new UpsertMetadataToJsonWriter();
+    }
+
+    public void emitDocuments(List<? extends EmitData> emitData)
+            throws IOException, TikaClientException {
+        StringBuilder json = new StringBuilder();
+        for (EmitData d : emitData) {
+            appendDoc(d.getEmitKey(), d.getMetadataList(), json);
+        }
+        emitJson(json);
+    }
+
+    public void emitDocument(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaClientException {
+        StringBuilder json = new StringBuilder();
+        appendDoc(emitKey, metadataList, json);
+        emitJson(json);
+    }
+
+    private void emitJson(StringBuilder json)
+            throws IOException, TikaClientException {
+        String requestUrl = config.elasticsearchUrl() + "/_bulk";
+        JsonResponse response = postJson(requestUrl, json.toString());
+        if (response.getStatus() != 200) {
+            throw new TikaClientException(response.getMsg());
+        } else {
+            // If there's a single error in the bulk response, throw
+            JsonNode errorNode = response.getJson().get("errors");
+            if (errorNode != null && errorNode.asText().equals("true")) {
+                throw new TikaClientException(
+                        response.getJson().toString());
+            }
+        }
+    }
+
+    private void appendDoc(String emitKey, List<Metadata> metadataList,
+                           StringBuilder json) throws IOException {
+        int i = 0;
+        String routing =
+                (config.attachmentStrategy() ==
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .PARENT_CHILD) ? emitKey : null;
+
+        for (Metadata metadata : metadataList) {
+            StringBuilder id = new StringBuilder(emitKey);
+            if (i > 0) {
+                id.append("-").append(UUID.randomUUID());
+            }
+            String indexJson =
+                    metadataToJsonWriter.getBulkJson(id.toString(), routing);
+            json.append(indexJson).append("\n");
+            if (i == 0) {
+                json.append(metadataToJsonWriter.writeContainer(
+                        metadata, config.attachmentStrategy()));
+            } else {
+                json.append(metadataToJsonWriter.writeEmbedded(
+                        metadata, config.attachmentStrategy(), emitKey,
+                        config.embeddedFileFieldName()));
+            }
+            json.append("\n");
+            i++;
+        }
+    }
+
+    // Package-private for testing
+    static String metadataToJsonContainerInsert(
+            Metadata metadata,
+            ElasticsearchEmitterConfig.AttachmentStrategy attachmentStrategy)
+            throws IOException {
+        return new InsertMetadataToJsonWriter().writeContainer(
+                metadata, attachmentStrategy);
+    }
+
+    // Package-private for testing
+    static String metadataToJsonEmbeddedInsert(
+            Metadata metadata,
+            ElasticsearchEmitterConfig.AttachmentStrategy attachmentStrategy,
+            String emitKey, String embeddedFileFieldName)
+            throws IOException {
+        return new InsertMetadataToJsonWriter().writeEmbedded(
+                metadata, attachmentStrategy, emitKey,
+                embeddedFileFieldName);
+    }
+
+    public JsonResponse postJson(String url, String json) throws IOException {
+        HttpPost httpRequest = new HttpPost(url);
+        StringEntity entity =
+                new StringEntity(json, StandardCharsets.UTF_8);
+        httpRequest.setEntity(entity);
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type",
+                "application/json; charset=utf-8");
+
+        // ES 8.x API key auth
+        if (!StringUtils.isEmpty(config.apiKey())) {
+            httpRequest.setHeader("Authorization",
+                    "ApiKey " + config.apiKey());
+        }
+
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            if (status == 200) {
+                try (Reader reader = new BufferedReader(
+                        new InputStreamReader(
+                                response.getEntity().getContent(),
+                                StandardCharsets.UTF_8))) {
+                    ObjectMapper mapper = new ObjectMapper();
+                    JsonNode node = mapper.readTree(reader);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("node: {}", node);
+                    }
+                    return new JsonResponse(200, node);
+                }
+            } else {
+                return new JsonResponse(status,
+                        new String(
+                                EntityUtils.toByteArray(
+                                        response.getEntity()),
+                                StandardCharsets.UTF_8));
+            }
+        } finally {
+            if (response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse) response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+
+    // -----------------------------------------------------------------------
+    // JSON writers for _bulk API
+    // -----------------------------------------------------------------------
+
+    private interface MetadataToJsonWriter {
+        String writeContainer(
+                Metadata metadata,
+                ElasticsearchEmitterConfig.AttachmentStrategy strategy)
+                throws IOException;
+
+        String writeEmbedded(
+                Metadata metadata,
+                ElasticsearchEmitterConfig.AttachmentStrategy strategy,
+                String emitKey, String embeddedFileFieldName)
+                throws IOException;
+
+        String getBulkJson(String id, String routing) throws IOException;
+    }
+
+    private static class InsertMetadataToJsonWriter
+            implements MetadataToJsonWriter {
+
+        @Override
+        public String writeContainer(
+                Metadata metadata,
+                ElasticsearchEmitterConfig.AttachmentStrategy strategy)
+                throws IOException {
+            StringWriter writer = new StringWriter();
+            try (JsonGenerator jg =
+                         new JsonFactory().createGenerator(writer)) {
+                jg.writeStartObject();
+                writeMetadata(metadata, jg);
+                if (strategy ==
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .PARENT_CHILD) {
+                    jg.writeStringField("relation_type", "container");
+                }
+                jg.writeEndObject();
+            }
+            return writer.toString();
+        }
+
+        @Override
+        public String writeEmbedded(
+                Metadata metadata,
+                ElasticsearchEmitterConfig.AttachmentStrategy strategy,
+                String emitKey, String embeddedFileFieldName)
+                throws IOException {
+            StringWriter writer = new StringWriter();
+            try (JsonGenerator jg =
+                         new JsonFactory().createGenerator(writer)) {
+                jg.writeStartObject();
+                writeMetadata(metadata, jg);
+                if (strategy ==
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .PARENT_CHILD) {
+                    jg.writeObjectFieldStart("relation_type");
+                    jg.writeStringField("name", embeddedFileFieldName);
+                    jg.writeStringField("parent", emitKey);
+                    jg.writeEndObject();
+                } else if (strategy ==
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .SEPARATE_DOCUMENTS) {
+                    jg.writeStringField("parent", emitKey);
+                }
+                jg.writeEndObject();
+            }
+            return writer.toString();
+        }
+
+        @Override
+        public String getBulkJson(String id, String routing)
+                throws IOException {
+            StringWriter writer = new StringWriter();
+            try (JsonGenerator jg =
+                         new JsonFactory().createGenerator(writer)) {
+                jg.writeStartObject();
+                jg.writeObjectFieldStart("index");
+                jg.writeStringField("_id", id);
+                if (!StringUtils.isEmpty(routing)) {
+                    jg.writeStringField("routing", routing);
+                }
+                jg.writeEndObject();
+                jg.writeEndObject();
+            }
+            return writer.toString();
+        }
+    }
+
+    private static class UpsertMetadataToJsonWriter
+            implements MetadataToJsonWriter {
+
+        @Override
+        public String writeContainer(
+                Metadata metadata,
+                ElasticsearchEmitterConfig.AttachmentStrategy strategy)
+                throws IOException {
+            StringWriter writer = new StringWriter();
+            try (JsonGenerator jg =
+                         new JsonFactory().createGenerator(writer)) {
+                jg.writeStartObject();
+                jg.writeObjectFieldStart("doc");
+                writeMetadata(metadata, jg);
+                if (strategy ==
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .PARENT_CHILD) {
+                    jg.writeStringField("relation_type", "container");
+                }
+                jg.writeEndObject();
+                jg.writeBooleanField("doc_as_upsert", true);
+                jg.writeEndObject();
+            }
+            return writer.toString();
+        }
+
+        @Override
+        public String writeEmbedded(
+                Metadata metadata,
+                ElasticsearchEmitterConfig.AttachmentStrategy strategy,
+                String emitKey, String embeddedFileFieldName)
+                throws IOException {
+            StringWriter writer = new StringWriter();
+            try (JsonGenerator jg =
+                         new JsonFactory().createGenerator(writer)) {
+                jg.writeStartObject();
+                jg.writeObjectFieldStart("doc");
+                writeMetadata(metadata, jg);
+                if (strategy ==
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .PARENT_CHILD) {
+                    jg.writeObjectFieldStart("relation_type");
+                    jg.writeStringField("name", embeddedFileFieldName);
+                    jg.writeStringField("parent", emitKey);
+                    jg.writeEndObject();
+                } else if (strategy ==
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .SEPARATE_DOCUMENTS) {
+                    jg.writeStringField("parent", emitKey);
+                }
+                jg.writeEndObject();
+                jg.writeBooleanField("doc_as_upsert", true);
+                jg.writeEndObject();
+            }
+            return writer.toString();
+        }
+
+        @Override
+        public String getBulkJson(String id, String routing)
+                throws IOException {
+            StringWriter writer = new StringWriter();
+            try (JsonGenerator jg =
+                         new JsonFactory().createGenerator(writer)) {
+                jg.writeStartObject();
+                jg.writeObjectFieldStart("update");
+                jg.writeStringField("_id", id);
+                if (!StringUtils.isEmpty(routing)) {
+                    jg.writeStringField("routing", routing);
+                }
+                jg.writeNumberField("retry_on_conflict", 3);
+                jg.writeEndObject();
+                jg.writeEndObject();
+            }
+            return writer.toString();
+        }
+    }
+
+    /**
+     * Metadata fields whose values are serialized JSON from the
+     * tika-inference pipeline. These must be written as raw JSON
+     * (arrays/objects) rather than escaped strings so that
+     * Elasticsearch can index vectors, locators, etc. natively.
+     */
+    static final Set<String> INFERENCE_JSON_FIELDS = Set.of(
+            "tika:chunks");
+
+    private static void writeMetadata(Metadata metadata,
+                                      JsonGenerator jsonGenerator)
+            throws IOException {
+        for (String n : metadata.names()) {
+            String[] vals = metadata.getValues(n);
+            if (vals.length == 1) {
+                if (INFERENCE_JSON_FIELDS.contains(n)
+                        && isValidJson(vals[0])) {
+                    jsonGenerator.writeFieldName(n);
+                    jsonGenerator.writeRawValue(vals[0]);
+                } else {
+                    jsonGenerator.writeStringField(n, vals[0]);
+                }
+            } else {
+                jsonGenerator.writeArrayFieldStart(n);
+                for (String v : vals) {
+                    jsonGenerator.writeString(v);
+                }
+                jsonGenerator.writeEndArray();
+            }
+        }
+    }
+
+    private static final ObjectMapper VALIDATION_MAPPER = new ObjectMapper();
+
+    /**
+     * Validates that the value is well-formed JSON (array or object)
+     * before writing it as raw JSON. This prevents injection of
+     * arbitrary content into the bulk request payload.
+     */
+    private static boolean isValidJson(String value) {
+        if (value == null || value.isEmpty()) {
+            return false;
+        }
+        char first = value.charAt(0);
+        if (first != '[' && first != '{') {
+            return false;
+        }
+        try {
+            VALIDATION_MAPPER.readTree(value);
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Field value starts with '{}' but is not valid JSON; "
+                    + "writing as escaped string", first);
+            return false;
+        }
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java
new file mode 100644
index 0000000000..2c8c66783a
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.elasticsearch;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.client.TikaClientException;
+import org.apache.tika.config.ConfigValidator;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.emitter.AbstractEmitter;
+import org.apache.tika.pipes.api.emitter.EmitData;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * Emitter that sends documents to Elasticsearch via its REST API.
+ *
+ * <p>This emitter does <b>not</b> depend on the Elasticsearch Java client
+ * (which changed to a non-ASL license). It uses plain HTTP via
+ * Apache HttpClient to call the {@code _bulk} endpoint directly.
+ *
+ * <p>Supports:
+ * <ul>
+ *   <li>API key authentication ({@code Authorization: ApiKey &lt;base64&gt;})
+ *       — common with Elasticsearch 8.x</li>
+ *   <li>Basic authentication (username/password via httpClientConfig)</li>
+ *   <li>OVERWRITE and UPSERT update strategies</li>
+ *   <li>SEPARATE_DOCUMENTS and PARENT_CHILD attachment strategies</li>
+ * </ul>
+ */
+public class ElasticsearchEmitter extends AbstractEmitter {
+
+    public static final String DEFAULT_EMBEDDED_FILE_FIELD_NAME = "embedded";
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ElasticsearchEmitter.class);
+
+    private ElasticsearchClient elasticsearchClient;
+    private final HttpClientFactory httpClientFactory;
+    private final ElasticsearchEmitterConfig config;
+
+    public static ElasticsearchEmitter build(ExtensionConfig pluginConfig)
+            throws TikaConfigException, IOException {
+        ElasticsearchEmitterConfig config =
+                ElasticsearchEmitterConfig.load(pluginConfig.json());
+        return new ElasticsearchEmitter(pluginConfig, config);
+    }
+
+    public ElasticsearchEmitter(ExtensionConfig pluginConfig,
+                                ElasticsearchEmitterConfig config)
+            throws IOException, TikaConfigException {
+        super(pluginConfig);
+        this.config = config;
+        httpClientFactory = new HttpClientFactory();
+        configure();
+    }
+
+    @Override
+    public void emit(List<? extends EmitData> emitData) throws IOException {
+        if (emitData == null || emitData.isEmpty()) {
+            LOG.debug("metadataList is null or empty");
+            return;
+        }
+
+        try {
+            LOG.debug("about to emit {} docs", emitData.size());
+            elasticsearchClient.emitDocuments(emitData);
+            LOG.info("successfully emitted {} docs", emitData.size());
+        } catch (TikaClientException e) {
+            LOG.warn("problem emitting docs", e);
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void emit(String emitKey, List<Metadata> metadataList,
+                     ParseContext parseContext) throws IOException {
+        if (metadataList == null || metadataList.isEmpty()) {
+            LOG.debug("metadataList is null or empty");
+            return;
+        }
+        try {
+            LOG.debug("about to emit one doc with {} metadata entries",
+                    metadataList.size());
+            elasticsearchClient.emitDocument(emitKey, metadataList);
+            LOG.info("successfully emitted one doc");
+        } catch (TikaClientException e) {
+            LOG.warn("problem emitting doc", e);
+            throw new IOException("failed to add document", e);
+        }
+    }
+
+    private void configure() throws TikaConfigException {
+        ConfigValidator.mustNotBeEmpty("elasticsearchUrl",
+                config.elasticsearchUrl());
+        ConfigValidator.mustNotBeEmpty("idField", config.idField());
+
+        HttpClientConfig http = config.httpClientConfig();
+        if (http != null) {
+            httpClientFactory.setUserName(http.userName());
+            httpClientFactory.setPassword(http.password());
+        }
+
+        elasticsearchClient =
+                new ElasticsearchClient(config, httpClientFactory.build());
+    }
+
+}
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java
new file mode 100644
index 0000000000..874431daac
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.tika.exception.TikaConfigException;
+
+/**
+ * Configuration for the Elasticsearch emitter.
+ *
+ * @param elasticsearchUrl Full URL including index, e.g. {@code 
https://localhost:9200/my-index}
+ * @param idField          Metadata field to use as the document {@code _id}
+ * @param attachmentStrategy How to handle embedded documents
+ * @param updateStrategy     Whether to overwrite or upsert
+ * @param commitWithin       Not used by ES, kept for API parity with 
OpenSearch emitter
+ * @param embeddedFileFieldName Field name for embedded-file relation
+ * @param apiKey             Elasticsearch API key for authentication 
(Base64-encoded
+ *                           {@code id:api_key}). Sent as {@code 
Authorization: ApiKey <value>}.
+ *                           If null/empty, falls back to httpClientConfig's 
userName/password
+ *                           for basic auth.
+ * @param httpClientConfig   HTTP connection settings (basic auth, timeouts, 
proxy)
+ */
+public record ElasticsearchEmitterConfig(String elasticsearchUrl, String 
idField,
+                                         AttachmentStrategy attachmentStrategy,
+                                         UpdateStrategy updateStrategy, int 
commitWithin,
+                                         String embeddedFileFieldName, String 
apiKey,
+                                         HttpClientConfig httpClientConfig) {
+    public enum AttachmentStrategy {
+        SEPARATE_DOCUMENTS, PARENT_CHILD,
+    }
+
+    public enum UpdateStrategy {
+        OVERWRITE, UPSERT
+    }
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static ElasticsearchEmitterConfig load(final String json)
+            throws TikaConfigException {
+        try {
+            return OBJECT_MAPPER.readValue(json,
+                    ElasticsearchEmitterConfig.class);
+        } catch (JsonProcessingException e) {
+            throw new TikaConfigException(
+                    "Failed to parse ElasticsearchEmitterConfig from JSON", e);
+        }
+    }
+
+}
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java
new file mode 100644
index 0000000000..94bda68b2a
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.elasticsearch;
+
+import java.io.IOException;
+
+import org.pf4j.Extension;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.emitter.Emitter;
+import org.apache.tika.pipes.api.emitter.EmitterFactory;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * Factory for creating Elasticsearch emitters.
+ *
+ * <p>Example JSON configuration:
+ * <pre>
+ * "emitters": {
+ *   "elasticsearch-emitter": {
+ *     "my-es-emitter": {
+ *       "elasticsearchUrl": "https://localhost:9200/my-index";,
+ *       "idField": "id",
+ *       "apiKey": "base64-encoded-id:api_key",
+ *       "attachmentStrategy": "PARENT_CHILD",
+ *       "updateStrategy": "UPSERT",
+ *       "embeddedFileFieldName": "embedded"
+ *     }
+ *   }
+ * }
+ * </pre>
+ */
+@Extension
+public class ElasticsearchEmitterFactory implements EmitterFactory {
+
+    public static final String NAME = "elasticsearch-emitter";
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public Emitter buildExtension(ExtensionConfig extensionConfig)
+            throws IOException, TikaConfigException {
+        return ElasticsearchEmitter.build(extensionConfig);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java
new file mode 100644
index 0000000000..6b2b18b00b
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.elasticsearch;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+
+public record HttpClientConfig(String userName, String password,
+                               String authScheme, int connectionTimeout,
+                               int socketTimeout, String proxyHost,
+                               int proxyPort) {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static HttpClientConfig load(final String json) throws IOException {
+        return OBJECT_MAPPER.readValue(json, HttpClientConfig.class);
+    }
+
+}
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java
new file mode 100644
index 0000000000..5e6f3373e3
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.elasticsearch;
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Value object pairing an HTTP status code with either a parsed JSON
+ * body or a plain message (exactly one of the two is non-null,
+ * depending on which public constructor was used).
+ */
+public class JsonResponse {
+
+    private final int status;
+    private final String msg;
+    private final JsonNode root;
+
+    // Canonical constructor; the public constructors delegate here so the
+    // field assignments live in one place.
+    private JsonResponse(int status, String msg, JsonNode root) {
+        this.status = status;
+        this.msg = msg;
+        this.root = root;
+    }
+
+    /** Creates a response carrying a parsed JSON body and no message. */
+    public JsonResponse(int status, JsonNode root) {
+        this(status, null, root);
+    }
+
+    /** Creates a response carrying only a message (e.g. an error) and no body. */
+    public JsonResponse(int status, String msg) {
+        this(status, msg, null);
+    }
+
+    /** @return the HTTP status code */
+    public int getStatus() {
+        return status;
+    }
+
+    /** @return the message, or null if this response carries a JSON body */
+    public String getMsg() {
+        return msg;
+    }
+
+    /** @return the parsed JSON body, or null if this response carries a message */
+    public JsonNode getJson() {
+        return root;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("JsonResponse{status=%d, msg='%s', root=%s}",
+                status, msg, root);
+    }
+}
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java
new file mode 100644
index 0000000000..1b6a195171
--- /dev/null
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.plugin.elasticsearch;
+
+import org.pf4j.Plugin;
+import org.pf4j.PluginWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PF4J plugin wrapper for the Elasticsearch pipes components.
+ * PF4J instantiates this class via the {@code plugin.class} entry in
+ * {@code plugin.properties}; the lifecycle overrides below only add
+ * logging around the default {@link Plugin} behavior.
+ */
+public class ElasticsearchPipesPlugin extends Plugin {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ElasticsearchPipesPlugin.class);
+
+    public ElasticsearchPipesPlugin(PluginWrapper wrapper) {
+        super(wrapper);
+    }
+
+    /** Logs, then delegates to {@link Plugin#start()}. */
+    @Override
+    public void start() {
+        LOG.info("Starting Elasticsearch pipes plugin");
+        super.start();
+    }
+
+    /** Logs, then delegates to {@link Plugin#stop()}. */
+    @Override
+    public void stop() {
+        LOG.info("Stopping Elasticsearch pipes plugin");
+        super.stop();
+    }
+
+    /** Logs, then delegates to {@link Plugin#delete()}. */
+    @Override
+    public void delete() {
+        LOG.info("Deleting Elasticsearch pipes plugin");
+        super.delete();
+    }
+
+}
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties
new file mode 100644
index 0000000000..9fc35c59c4
--- /dev/null
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+plugin.id=tika-pipes-elasticsearch-plugin
+plugin.class=org.apache.tika.pipes.plugin.elasticsearch.ElasticsearchPipesPlugin
+plugin.version=4.0.0-SNAPSHOT
+plugin.provider=Apache Tika
+plugin.description=Pipes for Elasticsearch (plain HTTP, ASL v2 only)
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java
new file mode 100644
index 0000000000..3193b700b2
--- /dev/null
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.elasticsearch;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.TikaTest;
+import org.apache.tika.metadata.Metadata;
+
+/**
+ * Unit tests for the static JSON-serialization helpers in
+ * {@link ElasticsearchClient}. These run without an Elasticsearch
+ * instance; end-to-end coverage lives in the integration-test module.
+ */
+public class ElasticsearchClientTest extends TikaTest {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    // Every attachment strategy must serialize both multi-valued
+    // ("authors") and single-valued ("title") metadata into the JSON.
+    @Test
+    public void testSerialization() throws Exception {
+        Metadata metadata = new Metadata();
+        metadata.add("authors", "author1");
+        metadata.add("authors", "author2");
+        metadata.add("title", "title1");
+        for (ElasticsearchEmitterConfig.AttachmentStrategy strategy :
+                ElasticsearchEmitterConfig.AttachmentStrategy.values()) {
+            String json =
+                    ElasticsearchClient.metadataToJsonContainerInsert(
+                            metadata, strategy);
+            assertContains("author1", json);
+            assertContains("author2", json);
+            assertContains("authors", json);
+            assertContains("title1", json);
+        }
+        for (ElasticsearchEmitterConfig.AttachmentStrategy strategy :
+                ElasticsearchEmitterConfig.AttachmentStrategy.values()) {
+            String json =
+                    ElasticsearchClient.metadataToJsonEmbeddedInsert(
+                            metadata, strategy, "myEmitKey",
+                            ElasticsearchEmitter
+                                    .DEFAULT_EMBEDDED_FILE_FIELD_NAME);
+            assertContains("author1", json);
+            assertContains("author2", json);
+            assertContains("authors", json);
+            assertContains("title1", json);
+        }
+    }
+
+    // PARENT_CHILD container docs must carry the join field
+    // ("relation_type") marking them as the "container" side.
+    @Test
+    public void testParentChildContainer() throws Exception {
+        Metadata metadata = new Metadata();
+        metadata.add("title", "parent doc");
+        String json =
+                ElasticsearchClient.metadataToJsonContainerInsert(
+                        metadata,
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .PARENT_CHILD);
+        assertContains("relation_type", json);
+        assertContains("container", json);
+    }
+
+    // PARENT_CHILD embedded docs must carry the join field plus the
+    // parent's emit key and the embedded-file field name.
+    @Test
+    public void testParentChildEmbedded() throws Exception {
+        Metadata metadata = new Metadata();
+        metadata.add("title", "child doc");
+        String json =
+                ElasticsearchClient.metadataToJsonEmbeddedInsert(
+                        metadata,
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .PARENT_CHILD,
+                        "parentKey", "embedded");
+        assertContains("relation_type", json);
+        assertContains("parentKey", json);
+        assertContains("embedded", json);
+    }
+
+    // SEPARATE_DOCUMENTS embedded docs reference the parent by key but
+    // must NOT include the parent/child join field.
+    @Test
+    public void testSeparateDocumentsEmbedded() throws Exception {
+        Metadata metadata = new Metadata();
+        metadata.add("title", "child doc");
+        String json =
+                ElasticsearchClient.metadataToJsonEmbeddedInsert(
+                        metadata,
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .SEPARATE_DOCUMENTS,
+                        "parentKey", "embedded");
+        assertContains("parent", json);
+        assertContains("parentKey", json);
+        assertNotContained("relation_type", json);
+    }
+
+    // "tika:chunks" values that are valid JSON must be emitted as a raw
+    // JSON array so ES can index vectors/locators natively.
+    @Test
+    public void testChunksFieldWrittenAsRawJson() throws Exception {
+        Metadata metadata = new Metadata();
+        metadata.set("title", "test doc");
+        metadata.set("tika:chunks",
+                "[{\"text\":\"hello\",\"vector\":\"AAAA\","
+                        + "\"locators\":{\"text\":[{\"start_offset\":0,"
+                        + "\"end_offset\":5}]}}]");
+        String json =
+                ElasticsearchClient.metadataToJsonContainerInsert(
+                        metadata,
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .SEPARATE_DOCUMENTS);
+
+        // Parse the output JSON — tika:chunks should be a real JSON
+        // array, not a double-escaped string
+        JsonNode doc = MAPPER.readTree(json);
+        JsonNode chunks = doc.get("tika:chunks");
+        assertTrue(chunks.isArray(),
+                "tika:chunks should be a JSON array, not a string");
+        assertEquals(1, chunks.size());
+        assertEquals("hello", chunks.get(0).get("text").asText());
+        assertEquals("AAAA", chunks.get(0).get("vector").asText());
+    }
+
+    // A "tika:chunks" value that fails JSON validation falls back to a
+    // plain escaped string rather than being written raw.
+    @Test
+    public void testNonJsonFieldStaysString() throws Exception {
+        Metadata metadata = new Metadata();
+        metadata.set("tika:chunks", "not json at all");
+        String json =
+                ElasticsearchClient.metadataToJsonContainerInsert(
+                        metadata,
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .SEPARATE_DOCUMENTS);
+        JsonNode doc = MAPPER.readTree(json);
+        // Should be a plain string since it doesn't look like JSON
+        assertTrue(doc.get("tika:chunks").isTextual());
+        assertEquals("not json at all",
+                doc.get("tika:chunks").asText());
+    }
+
+    // Raw-JSON emission is restricted to the designated inference
+    // fields; ordinary fields stay strings even if they look like JSON.
+    @Test
+    public void testRegularFieldNotRawJson() throws Exception {
+        Metadata metadata = new Metadata();
+        // A regular field whose value happens to look like JSON
+        // should NOT be written as raw JSON
+        metadata.set("description", "[some bracketed text]");
+        String json =
+                ElasticsearchClient.metadataToJsonContainerInsert(
+                        metadata,
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .SEPARATE_DOCUMENTS);
+        JsonNode doc = MAPPER.readTree(json);
+        assertTrue(doc.get("description").isTextual());
+    }
+
+    // Multi-valued metadata must serialize all values (as an array).
+    @Test
+    public void testMultiValuedMetadata() throws Exception {
+        Metadata metadata = new Metadata();
+        metadata.add("tags", "tag1");
+        metadata.add("tags", "tag2");
+        metadata.add("tags", "tag3");
+        String json =
+                ElasticsearchClient.metadataToJsonContainerInsert(
+                        metadata,
+                        ElasticsearchEmitterConfig.AttachmentStrategy
+                                .SEPARATE_DOCUMENTS);
+        assertContains("tag1", json);
+        assertContains("tag2", json);
+        assertContains("tag3", json);
+    }
+}
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java b/tika-pipes/tika-pipes-plugins/tika-pipes-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
index 7cd8df5ca0..cc543da2c9 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
@@ -23,6 +23,7 @@ import java.io.Reader;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 
 import com.fasterxml.jackson.core.JsonFactory;
@@ -310,13 +311,28 @@ public class OpenSearchClient {
         }
     }
 
+    /**
+     * Metadata fields whose values are serialized JSON from the
+     * tika-inference pipeline. These must be written as raw JSON
+     * (arrays/objects) rather than escaped strings so that
+     * OpenSearch can index vectors, locators, etc. natively.
+     * Values are still validated via {@code isValidJson} before being
+     * written raw; anything invalid falls back to an escaped string.
+     */
+    static final Set<String> INFERENCE_JSON_FIELDS = Set.of(
+            "tika:chunks");
+
     private static void writeMetadata(Metadata metadata, JsonGenerator 
jsonGenerator) throws IOException {
         //writes the metadata without the start { or the end }
         //to allow for other fields to be added
         for (String n : metadata.names()) {
             String[] vals = metadata.getValues(n);
             if (vals.length == 1) {
-                jsonGenerator.writeStringField(n, vals[0]);
+                if (INFERENCE_JSON_FIELDS.contains(n)
+                        && isValidJson(vals[0])) {
+                    jsonGenerator.writeFieldName(n);
+                    jsonGenerator.writeRawValue(vals[0]);
+                } else {
+                    jsonGenerator.writeStringField(n, vals[0]);
+                }
             } else {
                 jsonGenerator.writeArrayFieldStart(n);
                 for (String v : vals) {
@@ -326,4 +342,29 @@ public class OpenSearchClient {
             }
         }
     }
+
+    private static final ObjectMapper VALIDATION_MAPPER = new ObjectMapper();
+
+    /**
+     * Validates that the value is well-formed JSON (array or object)
+     * before writing it as raw JSON. This prevents injection of
+     * arbitrary content into the bulk request payload.
+     */
+    private static boolean isValidJson(String value) {
+        if (value == null || value.isEmpty()) {
+            return false;
+        }
+        char first = value.charAt(0);
+        if (first != '[' && first != '{') {
+            return false;
+        }
+        try {
+            VALIDATION_MAPPER.readTree(value);
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Field value starts with '{}' but is not valid JSON; "
+                    + "writing as escaped string", first);
+            return false;
+        }
+    }
 }

Reply via email to