This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch elasticsearch-emitter
in repository https://gitbox.apache.org/repos/asf/tika.git

commit df03103bb8d6b80ba51c8f11e802980d58ea9dc4
Author: tballison <[email protected]>
AuthorDate: Fri Feb 20 11:30:18 2026 -0500

    TIKA-4XXX: Add ES emitter and reporter pipes plugin
    
    Adds an Elasticsearch-compatible emitter and pipes reporter as a new
    tika-pipes plugin, avoiding use of the trademarked "Elasticsearch" name
    in class and package identifiers (plugin id: es-emitter).
    
    Key changes:
    - New plugin: tika-pipes-elasticsearch with ESEmitter, ESPipesReporter,
      ESEmitterConfig, ESEmitterFactory, ESPipesPlugin; all classes under
      org.apache.tika.pipes.{emitter,reporter,plugin}.es packages
    - ESClient talks directly to the _bulk REST endpoint using Apache
      HttpClient (ASL v2), avoiding the SSPL-licensed ES Java client
    - Reporter writes per-document parse status (PARSE_SUCCESS,
      PARSE_SUCCESS_WITH_EXCEPTION, EMIT_SUCCESS, OOM/TIMEOUT) to a
      dedicated ES index, mirroring the OpenSearch reporter
    - HttpClientFactory: add verifySsl flag (default false preserves
      existing trust-all behaviour); when true uses JVM default trust store
      and hostname verification; copy() propagates the flag
    - VectorSerializer: switch to BIG_ENDIAN byte order to match ES 9.3+
      dense_vector base64 format (big-endian IEEE 754 float32)
    - Security: fix JSON field injection in ESClient.isValidJson by using
      JsonParser with explicit trailing-token check; override
      ESEmitterConfig.toString() to redact apiKey from logs
    - Integration tests: 6 Docker-based tests (Testcontainers, ES 9.3.0)
      covering basic FS→ES pipeline, reporter validation, RMETA/PARENT_CHILD
      attachment strategies, upsert, and kNN vector search; vector test uses
      base64 on ES >= 9.3, falls back to float-array bulk indexing on older
      versions
    - Documentation: ES emitter and reporter sections in pipes/index.adoc
      with configuration tables and SSL warning
    
    Co-authored-by: Cursor <[email protected]>
---
 docs/modules/ROOT/pages/pipes/index.adoc           | 100 +++++++-
 .../pom.xml                                        |   2 +-
 .../elasticsearch/tests/ElasticsearchTest.java     | 237 ++++++++++++++++---
 .../tests/ElasticsearchTestClient.java             |  32 +--
 .../elasticsearch-vector-mappings.json             |  17 ++
 .../resources/elasticsearch/plugins-template.json  |  16 +-
 .../apache/tika/inference/VectorSerializer.java    |  18 +-
 .../tika/inference/VectorSerializerTest.java       |  13 ++
 .../org/apache/tika/client/HttpClientFactory.java  |  40 +++-
 .../tika-pipes-elasticsearch/pom.xml               |  13 +-
 .../ElasticsearchClient.java => es/ESClient.java}  | 181 ++++++---------
 .../ESEmitter.java}                                |  54 +++--
 .../ESEmitterConfig.java}                          |  39 ++--
 .../ESEmitterFactory.java}                         |  14 +-
 .../tika/pipes/emitter/es/HttpClientConfig.java    |  39 ++++
 .../{elasticsearch => es}/JsonResponse.java        |   3 +-
 .../ESPipesPlugin.java}                            |  16 +-
 .../tika/pipes/reporter/es/ESPipesReporter.java    | 251 +++++++++++++++++++++
 .../es/ESReporterConfig.java}                      |  24 +-
 .../es/ESReporterFactory.java}                     |  30 +--
 .../src/main/resources/plugin.properties           |   6 +-
 .../ESClientTest.java}                             | 108 ++++-----
 22 files changed, 916 insertions(+), 337 deletions(-)
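
The isValidJson hardening called out in the security bullet above comes down to parsing exactly one root JSON value and then requiring end-of-input; as the patch notes, ObjectMapper.readTree() alone accepts trailing content. A minimal sketch of that check with Jackson's streaming JsonParser (illustrative only; the helper name is made up, and ESClient.isValidJson in the patch below is the authoritative version):

    import java.io.IOException;

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParser;

    final class StrictJsonCheckSketch {
        // Illustrative only; not part of this patch.
        private static final JsonFactory FACTORY = new JsonFactory();

        /** True only for a single well-formed JSON array or object with nothing after it. */
        static boolean isSingleJsonValue(String value) {
            if (value == null || value.isEmpty()) {
                return false;
            }
            char first = value.charAt(0);
            if (first != '[' && first != '{') {
                return false;
            }
            try (JsonParser parser = FACTORY.createParser(value)) {
                parser.nextToken();    // START_ARRAY or START_OBJECT
                parser.skipChildren(); // consume the entire root value
                // e.g. "[1,2,3], \"injected\": true" still has tokens (or garbage) left here
                return parser.nextToken() == null;
            } catch (IOException e) {  // includes JsonParseException for malformed input
                return false;
            }
        }
    }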

diff --git a/docs/modules/ROOT/pages/pipes/index.adoc 
b/docs/modules/ROOT/pages/pipes/index.adoc
index ff67ab6e0c..0f94b9c080 100644
--- a/docs/modules/ROOT/pages/pipes/index.adoc
+++ b/docs/modules/ROOT/pages/pipes/index.adoc
@@ -24,7 +24,7 @@ This section covers Tika Pipes for scalable, fault-tolerant document processing.
 Tika Pipes provides a framework for processing large volumes of documents with:
 
 * **Fetchers** - Retrieve documents from various sources (filesystem, S3, HTTP, etc.)
-* **Emitters** - Send parsed results to various destinations (filesystem, OpenSearch, Solr, etc.)
+* **Emitters** - Send parsed results to various destinations (filesystem, OpenSearch, ES-compatible, Solr, etc.)
 * **Pipelines** - Configure processing workflows
 
 == Topics
@@ -39,6 +39,104 @@ Tika Pipes provides a framework for processing large volumes of documents with:
 // * link:configuration.html[Configuration]
 // * link:async.html[Async Processing]
 
+== Emitters
+
+=== ES Emitter (`es-emitter`)
+
+The ES emitter sends parsed documents to any ES-compatible REST API (ES 7 and later)
+via the `_bulk` endpoint. It talks to the REST API directly with Apache HttpClient;
+there is no dependency on the ES Java client, which carries a non-ASL license.
+
+[source,json]
+----
+"emitters": {
+  "my-es": {
+    "es-emitter": {
+      "esUrl": "https://localhost:9200/my-index",
+      "idField": "_id",
+      "attachmentStrategy": "SEPARATE_DOCUMENTS",
+      "updateStrategy": "UPSERT",
+      "embeddedFileFieldName": "embedded",
+      "apiKey": "<base64-encoded id:api_key>"
+    }
+  }
+}
+----
+
+[cols="1,1,3"]
+|===
+|Field |Default |Description
+
+|`esUrl`
+|_required_
+|Full URL including the index name, e.g. `https://localhost:9200/my-index`
+
+|`idField`
+|`_id`
+|Metadata field used as the document `_id`
+
+|`attachmentStrategy`
+|`SEPARATE_DOCUMENTS`
+|How embedded documents are stored. `SEPARATE_DOCUMENTS` gives each embedded
+file its own flat document. `PARENT_CHILD` uses an ES join field so embedded
+files are linked to their container via `relation_type`.
+
+|`updateStrategy`
+|`OVERWRITE`
+|`OVERWRITE` uses a bulk `index` action (full replace).
+`UPSERT` uses a bulk `update` / `doc_as_upsert` action (field-level merge).
+
+|`embeddedFileFieldName`
+|`embedded`
+|Name of the join-field relation used in `PARENT_CHILD` mode.
+
+|`apiKey`
+|_none_
+|Base64-encoded `id:api_key` sent as `Authorization: ApiKey <value>`.
+Takes precedence over `httpClientConfig` basic auth.
+
+|`httpClientConfig`
+|_none_
+|Optional block for `userName`, `password`, `authScheme`, `connectionTimeout`,
+`socketTimeout`, `proxyHost`, `proxyPort`, and `verifySsl` (boolean, default `false`).
+|===
+
+[WARNING]
+====
+By default (`verifySsl: false`) TLS certificate verification is disabled — all
+certificates are trusted and hostname verification is skipped.  Set
+`httpClientConfig.verifySsl: true` to enable proper certificate and hostname
+validation using the JVM's default trust store.  While `verifySsl` is `false`,
+HTTPS connections accept any certificate, so do not send credentials over untrusted
+networks; rely on network-level controls (VPN, private endpoint) until it is enabled.
+====
+
+=== ES Pipes Reporter (`es-pipes-reporter`)
+
+The ES reporter writes per-document parse status back into the same index,
+so you can query the processing outcome alongside the extracted content.
+
+[source,json]
+----
+"pipes-reporters": {
+  "es-pipes-reporter": {
+    "esUrl": "https://localhost:9200/my-index",
+    "keyPrefix": "tika_",
+    "includeRouting": false
+  }
+}
+----
+
+The reporter adds `<keyPrefix>parse_status`, `<keyPrefix>parse_time_ms`,
+and (when the forked JVM exits abnormally) `<keyPrefix>exit_value` fields
+to each document via an upsert.
+
+=== OpenSearch Emitter
+
+The OpenSearch emitter is configured identically but uses `opensearch-emitter` as the
+plugin key and `openSearchUrl` as the URL field. It also ships with an
+`opensearch-pipes-reporter`.
+
 == Advanced Topics
 
 * xref:pipes/shared-server-mode.adoc[Shared Server Mode] - Experimental mode for reduced memory usage
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml
index ce6f15a746..4f879fee04 100644
--- a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml
+++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml
@@ -70,7 +70,7 @@
       <artifactId>apache-rat-plugin</artifactId>
       <configuration>
         <inputExcludes>
-          <inputExclude>src/test/resources/elasticsearch/*.json</inputExclude>
+          <inputExclude>src/test/resources/elasticsearch/**</inputExclude>
         </inputExcludes>
     </configuration>
   </plugin>
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java
index 5bcbacef1f..44f13658bd 100644
--- 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java
@@ -16,8 +16,9 @@
  */
 package org.apache.tika.pipes.elasticsearch.tests;
 
-import static 
org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitter.DEFAULT_EMBEDDED_FILE_FIELD_NAME;
+import static 
org.apache.tika.pipes.emitter.es.ESEmitter.DEFAULT_EMBEDDED_FILE_FIELD_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -28,8 +29,11 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -57,9 +61,9 @@ import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.ParseMode;
 import org.apache.tika.pipes.api.emitter.Emitter;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitterConfig;
-import org.apache.tika.pipes.emitter.elasticsearch.HttpClientConfig;
-import org.apache.tika.pipes.emitter.elasticsearch.JsonResponse;
+import org.apache.tika.pipes.emitter.es.ESEmitterConfig;
+import org.apache.tika.pipes.emitter.es.HttpClientConfig;
+import org.apache.tika.pipes.emitter.es.JsonResponse;
 import org.apache.tika.plugins.TikaPluginManager;
 
 /**
@@ -75,7 +79,7 @@ public class ElasticsearchTest {
 
     private static final DockerImageName ES_IMAGE =
             DockerImageName.parse(
-                    "docker.elastic.co/elasticsearch/elasticsearch:8.17.0");
+                    "docker.elastic.co/elasticsearch/elasticsearch:9.3.0");
 
     private static GenericContainer<?> CONTAINER;
 
@@ -123,9 +127,8 @@ public class ElasticsearchTest {
                 "elasticsearch-mappings.json");
 
         runPipes(client,
-                ElasticsearchEmitterConfig.AttachmentStrategy
-                        .SEPARATE_DOCUMENTS,
-                ElasticsearchEmitterConfig.UpdateStrategy.UPSERT,
+                ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                ESEmitterConfig.UpdateStrategy.UPSERT,
                 ParseMode.CONCATENATE, endpoint,
                 pipesDirectory, testDocDirectory);
 
@@ -149,6 +152,36 @@ public class ElasticsearchTest {
         assertEquals(numHtmlDocs + numTestDocs,
                 results.getJson().get("hits").get("total").get("value")
                         .asInt());
+
+        // validate reporter fields
+        Map<String, Integer> statusCounts = new HashMap<>();
+        for (JsonNode n : results.getJson().get("hits").get("hits")) {
+            String status = 
n.get("_source").get("my_test_parse_status").asText();
+            long parseTimeMs = 
n.get("_source").get("my_test_parse_time_ms").asLong();
+            Integer cnt = statusCounts.get(status);
+            statusCounts.put(status, cnt == null ? 1 : cnt + 1);
+        }
+        assertEquals(numHtmlDocs, (int) statusCounts.get("PARSE_SUCCESS"),
+                "should have had " + numHtmlDocs + " parse successes: " + 
statusCounts);
+        assertEquals(1, (int) statusCounts.get("PARSE_SUCCESS_WITH_EXCEPTION"),
+                "should have had 1 parse exception: " + statusCounts);
+        assertEquals(1, (int) statusCounts.get("EMIT_SUCCESS"),
+                "should have had 1 emit success: " + statusCounts);
+        assertEquals(2, numberOfCrashes(statusCounts),
+                "should have had 2 OOM or 1 OOM and 1 timeout: " + 
statusCounts);
+    }
+
+    private int numberOfCrashes(Map<String, Integer> statusCounts) {
+        Integer oom = statusCounts.get("OOM");
+        Integer timeout = statusCounts.get("TIMEOUT");
+        int sum = 0;
+        if (oom != null) {
+            sum += oom;
+        }
+        if (timeout != null) {
+            sum += timeout;
+        }
+        return sum;
     }
 
     @Test
@@ -165,8 +198,8 @@ public class ElasticsearchTest {
                 "elasticsearch-parent-child-mappings.json");
 
         runPipes(client,
-                ElasticsearchEmitterConfig.AttachmentStrategy.PARENT_CHILD,
-                ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE,
+                ESEmitterConfig.AttachmentStrategy.PARENT_CHILD,
+                ESEmitterConfig.UpdateStrategy.OVERWRITE,
                 ParseMode.RMETA, endpoint,
                 pipesDirectory, testDocDirectory);
 
@@ -241,9 +274,8 @@ public class ElasticsearchTest {
                 "elasticsearch-mappings.json");
 
         runPipes(client,
-                ElasticsearchEmitterConfig.AttachmentStrategy
-                        .SEPARATE_DOCUMENTS,
-                ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE,
+                ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                ESEmitterConfig.UpdateStrategy.OVERWRITE,
                 ParseMode.RMETA, endpoint,
                 pipesDirectory, testDocDirectory);
 
@@ -323,9 +355,8 @@ public class ElasticsearchTest {
                 "elasticsearch-mappings.json");
 
         runPipes(client,
-                ElasticsearchEmitterConfig.AttachmentStrategy
-                        .SEPARATE_DOCUMENTS,
-                ElasticsearchEmitterConfig.UpdateStrategy.UPSERT,
+                ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                ESEmitterConfig.UpdateStrategy.UPSERT,
                 ParseMode.RMETA, endpoint,
                 pipesDirectory, testDocDirectory);
 
@@ -352,9 +383,8 @@ public class ElasticsearchTest {
         sendMappings(client, endpoint, TEST_INDEX,
                 "elasticsearch-mappings.json");
         Path pluginsConfigFile = getPluginsConfig(pipesDirectory,
-                ElasticsearchEmitterConfig.AttachmentStrategy
-                        .SEPARATE_DOCUMENTS,
-                ElasticsearchEmitterConfig.UpdateStrategy.UPSERT,
+                ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                ESEmitterConfig.UpdateStrategy.UPSERT,
                 ParseMode.RMETA, endpoint, testDocDirectory);
 
         TikaJsonConfig tikaJsonConfig =
@@ -392,28 +422,167 @@ public class ElasticsearchTest {
                 doc1.get("content").asText());
     }
 
+    /**
+     * Verifies that documents with {@code dense_vector} fields are correctly
+     * ranked by kNN — no inference engine required, vectors are mocked directly.
+     *
+     * <p>ES 9.3 introduced native support for big-endian base64 float32 strings
+     * as an alternative to JSON float arrays for {@code dense_vector} ingestion,
+     * reducing per-element numeric parsing overhead by an order of magnitude.
+     * When the running ES instance is ≥ 9.3, this test uses the emitter with
+     * base64-encoded vectors (validating our encoding end-to-end). For older
+     * instances it falls back to indexing float arrays directly via the bulk
+     * REST API.
+     *
+     * <p>Three documents are inserted in a randomized order and the test asserts
+     * that kNN always returns them in cosine-distance order, confirming both
+     * correct encoding and correct ranking.
+     *
+     * <p>Cosine similarity to the query [1, 0, 0]:
+     * <ul>
+     *   <li>"near" [0.99, 0.14, 0] — cosine ≈ 0.990 (~8°)</li>
+     *   <li>"mid"  [1.0,  1.0,  0] — cosine ≈ 0.707 (45°)</li>
+     *   <li>"far"  [0.0,  1.0,  0] — cosine = 0.000 (90°)</li>
+     * </ul>
+     */
+    @Test
+    public void testVectorSearch(
+            @TempDir Path pipesDirectory,
+            @TempDir Path testDocDirectory) throws Exception {
+
+        ElasticsearchTestClient client = getNewClient();
+        String endpoint = getEndpoint();
+        sendMappings(client, endpoint, TEST_INDEX,
+                "elasticsearch-vector-mappings.json");
+
+        // Each entry is [docId, vector]. Cosine similarity to the query [1, 0, 0]
+        // is strictly ordered: near > mid > far, so the ranking is unambiguous.
+        List<Object[]> docs = new ArrayList<>(Arrays.asList(
+                new Object[]{"near", new float[]{0.99f, 0.14f, 0.0f}},
+                new Object[]{"mid",  new float[]{1.0f,  1.0f,  0.0f}},
+                new Object[]{"far",  new float[]{0.0f,  1.0f,  0.0f}}
+        ));
+
+        // Randomize insertion order — ranking must be independent of it.
+        Collections.shuffle(docs);
+
+        if (esVersionAtLeast(client, 9, 3)) {
+            // ES 9.3+: use the emitter; metadata string values flow through as
+            // JSON strings and ES decodes the base64 float32 encoding natively.
+            Path pluginsConfigFile = getPluginsConfig(pipesDirectory,
+                    ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                    ESEmitterConfig.UpdateStrategy.UPSERT,
+                    ParseMode.RMETA, endpoint, testDocDirectory);
+            TikaJsonConfig tikaJsonConfig = 
TikaJsonConfig.load(pluginsConfigFile);
+            Emitter emitter = EmitterManager
+                    .load(TikaPluginManager.load(tikaJsonConfig), 
tikaJsonConfig)
+                    .getEmitter();
+
+            for (Object[] doc : docs) {
+                String id = (String) doc[0];
+                float[] vec = (float[]) doc[1];
+                Metadata metadata = new Metadata();
+                metadata.set("content", id + " document");
+                metadata.set("vector", base64Vector(vec));
+                emitter.emit(id, Collections.singletonList(metadata),
+                        new ParseContext());
+            }
+        } else {
+            // Older ES: fall back to direct bulk indexing with JSON float arrays.
+            StringBuilder bulk = new StringBuilder();
+            for (Object[] doc : docs) {
+                String id = (String) doc[0];
+                float[] vec = (float[]) doc[1];
+                bulk.append("{\"index\":{\"_id\":\"").append(id).append("\"}}\n");
+                bulk.append("{\"content\":\"").append(id)
+                        .append(" document\",\"vector\":").append(floatArrayJson(vec))
+                        .append("}\n");
+            }
+            JsonResponse bulkResp =
+                    client.postJson(endpoint + "/_bulk", bulk.toString());
+            assertEquals(200, bulkResp.getStatus(),
+                    "bulk index failed: " + bulkResp.getMsg());
+            assertFalse(bulkResp.getJson().get("errors").asBoolean(),
+                    "bulk index had errors: " + bulkResp.getJson());
+        }
+
+        client.getJson(endpoint + "/_refresh");
+
+        String knn = "{ \"knn\": { \"field\": \"vector\", " +
+                "\"query_vector\": [1.0, 0.0, 0.0], " +
+                "\"k\": 3, \"num_candidates\": 100 } }";
+        JsonResponse results = client.postJson(endpoint + "/_search", knn);
+        assertEquals(200, results.getStatus(),
+                "kNN search failed: " + results.getMsg());
+
+        JsonNode hits = results.getJson().get("hits").get("hits");
+        assertEquals(3, hits.size(), "expected all 3 docs in kNN results");
+        assertEquals("near", hits.get(0).get("_id").asText(),
+                "nearest doc should rank first");
+        assertEquals("mid",  hits.get(1).get("_id").asText(),
+                "mid doc should rank second");
+        assertEquals("far",  hits.get(2).get("_id").asText(),
+                "farthest doc should rank last");
+    }
+
+    /**
+     * Returns true if the running ES instance version is at least
+     * {@code major}.{@code minor}.
+     */
+    private boolean esVersionAtLeast(ElasticsearchTestClient client,
+                                     int major, int minor) throws Exception {
+        JsonResponse resp = client.getJson(getBaseUrl() + "/");
+        String version = resp.getJson().get("version").get("number").asText();
+        String[] parts = version.split("\\.");
+        int maj = Integer.parseInt(parts[0]);
+        int min = Integer.parseInt(parts[1]);
+        return maj > major || (maj == major && min >= minor);
+    }
+
+    /**
+     * Encodes a float vector as big-endian base64, matching the format
+     * accepted by ES 9.3+ for {@code dense_vector} fields.
+     */
+    private static String base64Vector(float... floats) {
+        java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(floats.length * 
Float.BYTES)
+                .order(java.nio.ByteOrder.BIG_ENDIAN);
+        buf.asFloatBuffer().put(floats);
+        return java.util.Base64.getEncoder().encodeToString(buf.array());
+    }
+
+    /** Formats a float array as a JSON number array, e.g. {@code [0.99,0.14,0.0]}. */
+    private static String floatArrayJson(float[] vec) {
+        StringBuilder sb = new StringBuilder("[");
+        for (int i = 0; i < vec.length; i++) {
+            if (i > 0) sb.append(',');
+            sb.append(vec[i]);
+        }
+        return sb.append(']').toString();
+    }
+
     // -----------------------------------------------------------------
     // Helpers
     // -----------------------------------------------------------------
 
+    private String getBaseUrl() {
+        return "http://" + CONTAINER.getHost() + ":" + CONTAINER.getMappedPort(9200);
+    }
+
     private String getEndpoint() {
-        return "http://" + CONTAINER.getHost() + ":" +
-                CONTAINER.getMappedPort(9200) + "/" + TEST_INDEX;
+        return getBaseUrl() + "/" + TEST_INDEX;
     }
 
     private ElasticsearchTestClient getNewClient()
             throws TikaConfigException {
         HttpClientFactory httpClientFactory = new HttpClientFactory();
-        ElasticsearchEmitterConfig config =
-                new ElasticsearchEmitterConfig(
+        ESEmitterConfig config =
+                new ESEmitterConfig(
                         getEndpoint(), "_id",
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .SEPARATE_DOCUMENTS,
-                        ElasticsearchEmitterConfig.UpdateStrategy
-                                .OVERWRITE,
+                        ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                        ESEmitterConfig.UpdateStrategy.OVERWRITE,
                         10, DEFAULT_EMBEDDED_FILE_FIELD_NAME, null,
                         new HttpClientConfig(null, null, null,
-                                -1, -1, null, -1));
+                                -1, -1, null, 0, false));
         return new ElasticsearchTestClient(config,
                 httpClientFactory.build());
     }
@@ -446,8 +615,8 @@ public class ElasticsearchTest {
 
     private void runPipes(
             ElasticsearchTestClient client,
-            ElasticsearchEmitterConfig.AttachmentStrategy attachStrat,
-            ElasticsearchEmitterConfig.UpdateStrategy updateStrat,
+            ESEmitterConfig.AttachmentStrategy attachStrat,
+            ESEmitterConfig.UpdateStrategy updateStrat,
             ParseMode parseMode, String endpoint,
             Path pipesDirectory, Path testDocDirectory) throws Exception {
 
@@ -466,8 +635,8 @@ public class ElasticsearchTest {
     @NotNull
     private Path getPluginsConfig(
             Path pipesDirectory,
-            ElasticsearchEmitterConfig.AttachmentStrategy attachStrat,
-            ElasticsearchEmitterConfig.UpdateStrategy updateStrat,
+            ESEmitterConfig.AttachmentStrategy attachStrat,
+            ESEmitterConfig.UpdateStrategy updateStrat,
             ParseMode parseMode, String endpoint,
             Path testDocDirectory) throws IOException {
 
@@ -480,9 +649,7 @@ public class ElasticsearchTest {
             Files.copy(is, log4jPropFile);
         }
 
-        boolean includeRouting = (attachStrat ==
-                ElasticsearchEmitterConfig.AttachmentStrategy
-                        .PARENT_CHILD);
+        boolean includeRouting = (attachStrat == 
ESEmitterConfig.AttachmentStrategy.PARENT_CHILD);
 
         Map<String, Object> replacements = new HashMap<>();
         replacements.put("ATTACHMENT_STRATEGY",
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java
index 3808609924..387e6f6164 100644
--- 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java
@@ -33,18 +33,16 @@ import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.util.EntityUtils;
 
-import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchClient;
-import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitterConfig;
-import org.apache.tika.pipes.emitter.elasticsearch.JsonResponse;
+import org.apache.tika.pipes.emitter.es.ESClient;
+import org.apache.tika.pipes.emitter.es.ESEmitterConfig;
+import org.apache.tika.pipes.emitter.es.JsonResponse;
 
 /**
- * Extends ElasticsearchClient with GET, PUT, and DELETE for integration
- * testing purposes.
+ * Extends ESClient with GET, PUT, and DELETE for integration testing purposes.
  */
-public class ElasticsearchTestClient extends ElasticsearchClient {
+public class ElasticsearchTestClient extends ESClient {
 
-    public ElasticsearchTestClient(ElasticsearchEmitterConfig config,
-                                   HttpClient httpClient) {
+    public ElasticsearchTestClient(ESEmitterConfig config, HttpClient 
httpClient) {
         super(config, httpClient);
     }
 
@@ -54,8 +52,7 @@ public class ElasticsearchTestClient extends 
ElasticsearchClient {
                 new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8));
         httpRequest.setEntity(entity);
         httpRequest.setHeader("Accept", "application/json");
-        httpRequest.setHeader("Content-type",
-                "application/json; charset=utf-8");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
 
         HttpResponse response = null;
         try {
@@ -72,9 +69,7 @@ public class ElasticsearchTestClient extends 
ElasticsearchClient {
                 }
             } else {
                 return new JsonResponse(status,
-                        new String(
-                                EntityUtils.toByteArray(
-                                        response.getEntity()),
+                        new 
String(EntityUtils.toByteArray(response.getEntity()),
                                 StandardCharsets.UTF_8));
             }
         } finally {
@@ -88,8 +83,7 @@ public class ElasticsearchTestClient extends 
ElasticsearchClient {
     public JsonResponse getJson(String url) throws IOException {
         HttpGet httpRequest = new HttpGet(url);
         httpRequest.setHeader("Accept", "application/json");
-        httpRequest.setHeader("Content-type",
-                "application/json; charset=utf-8");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
 
         HttpResponse response = null;
         try {
@@ -106,9 +100,7 @@ public class ElasticsearchTestClient extends 
ElasticsearchClient {
                 }
             } else {
                 return new JsonResponse(status,
-                        new String(
-                                EntityUtils.toByteArray(
-                                        response.getEntity()),
+                        new 
String(EntityUtils.toByteArray(response.getEntity()),
                                 StandardCharsets.UTF_8));
             }
         } finally {
@@ -136,9 +128,7 @@ public class ElasticsearchTestClient extends 
ElasticsearchClient {
                 }
             } else {
                 return new JsonResponse(status,
-                        new String(
-                                EntityUtils.toByteArray(
-                                        response.getEntity()),
+                        new 
String(EntityUtils.toByteArray(response.getEntity()),
                                 StandardCharsets.UTF_8));
             }
         } finally {
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-vector-mappings.json
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-vector-mappings.json
new file mode 100644
index 0000000000..a52187f892
--- /dev/null
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-vector-mappings.json
@@ -0,0 +1,17 @@
+{
+  "settings": {
+    "number_of_shards": 1
+  },
+  "mappings": {
+    "properties": {
+      "content": { "type": "text" },
+      "vector": {
+        "type": "dense_vector",
+        "dims": 3,
+        "index": true,
+        "similarity": "cosine",
+        "index_options": { "type": "flat" }
+      }
+    }
+  }
+}
diff --git 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json
 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json
index 2975b9d85f..c5ae829b5c 100644
--- 
a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json
+++ 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json
@@ -15,8 +15,8 @@
   },
   "emitters": {
     "ese": {
-      "elasticsearch-emitter": {
-        "elasticsearchUrl": "ELASTICSEARCH_URL",
+      "es-emitter": {
+        "esUrl": "ELASTICSEARCH_URL",
         "updateStrategy": "UPDATE_STRATEGY",
         "attachmentStrategy": "ATTACHMENT_STRATEGY",
         "commitWithin": 10,
@@ -30,6 +30,18 @@
       }
     }
   },
+  "pipes-reporters": {
+    "es-pipes-reporter": {
+      "esUrl": "ELASTICSEARCH_URL",
+      "keyPrefix": "my_test_",
+      "includeRouting": "INCLUDE_ROUTING",
+      "httpClientConfig": {
+        "authScheme": "http",
+        "connectionTimeout": 60,
+        "socketTimeout": 60
+      }
+    }
+  },
   "pipes-iterator": {
     "file-system-pipes-iterator": {
       "basePath": "FETCHER_BASE_PATH",
diff --git 
a/tika-parsers/tika-parsers-ml/tika-inference/src/main/java/org/apache/tika/inference/VectorSerializer.java
 
b/tika-parsers/tika-parsers-ml/tika-inference/src/main/java/org/apache/tika/inference/VectorSerializer.java
index f350fee123..6c6500c698 100644
--- 
a/tika-parsers/tika-parsers-ml/tika-inference/src/main/java/org/apache/tika/inference/VectorSerializer.java
+++ 
b/tika-parsers/tika-parsers-ml/tika-inference/src/main/java/org/apache/tika/inference/VectorSerializer.java
@@ -22,9 +22,13 @@ import java.nio.FloatBuffer;
 import java.util.Base64;
 
 /**
- * Serializes and deserializes float vectors as base64-encoded little-endian
- * float32 byte arrays. Little-endian matches numpy/PyTorch convention so
- * vectors from Python inference servers round-trip cleanly.
+ * Serializes and deserializes float vectors as base64-encoded big-endian
+ * float32 byte arrays.
+ *
+ * <p>Big-endian matches the format expected by Elasticsearch's
+ * {@code dense_vector} field type, which accepts either a JSON float array
+ * or a base64-encoded binary string in big-endian float32 order.
+ * See the Elasticsearch dense_vector mapping documentation for details.
  */
 public final class VectorSerializer {
 
@@ -32,22 +36,22 @@ public final class VectorSerializer {
     }
 
     /**
-     * Encode a float array as a base64 string (little-endian float32).
+     * Encode a float array as a base64 string (big-endian float32).
      */
     public static String encode(float[] vector) {
         ByteBuffer buf = ByteBuffer.allocate(vector.length * Float.BYTES)
-                .order(ByteOrder.LITTLE_ENDIAN);
+                .order(ByteOrder.BIG_ENDIAN);
         buf.asFloatBuffer().put(vector);
         return Base64.getEncoder().encodeToString(buf.array());
     }
 
     /**
-     * Decode a base64 string back to a float array (little-endian float32).
+     * Decode a base64 string back to a float array (big-endian float32).
      */
     public static float[] decode(String base64) {
         byte[] bytes = Base64.getDecoder().decode(base64);
         FloatBuffer fb = ByteBuffer.wrap(bytes)
-                .order(ByteOrder.LITTLE_ENDIAN)
+                .order(ByteOrder.BIG_ENDIAN)
                 .asFloatBuffer();
         float[] vector = new float[fb.remaining()];
         fb.get(vector);
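
For orientation, a round trip through the new byte order looks like this (a usage sketch; the class name is illustrative, and the expected base64 value is the same dense_vector example pinned by the test below):

    import java.util.Arrays;

    import org.apache.tika.inference.VectorSerializer;

    public class VectorRoundTripSketch {
        // Illustrative only; not part of this patch.
        public static void main(String[] args) {
            float[] vector = {0.5f, 10.0f, 6.0f};
            String base64 = VectorSerializer.encode(vector);   // big-endian float32 -> "PwAAAEEgAABAwAAA"
            float[] decoded = VectorSerializer.decode(base64); // round-trips to {0.5, 10.0, 6.0}
            System.out.println(base64 + " -> " + Arrays.toString(decoded));
        }
    }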
diff --git 
a/tika-parsers/tika-parsers-ml/tika-inference/src/test/java/org/apache/tika/inference/VectorSerializerTest.java
 
b/tika-parsers/tika-parsers-ml/tika-inference/src/test/java/org/apache/tika/inference/VectorSerializerTest.java
index b3fd5d22b5..bee9db1e54 100644
--- 
a/tika-parsers/tika-parsers-ml/tika-inference/src/test/java/org/apache/tika/inference/VectorSerializerTest.java
+++ 
b/tika-parsers/tika-parsers-ml/tika-inference/src/test/java/org/apache/tika/inference/VectorSerializerTest.java
@@ -40,6 +40,19 @@ public class VectorSerializerTest {
         assertEquals(0, decoded.length);
     }
 
+    /**
+     * Pins the byte order to big-endian using the exact example from the
+     * Elasticsearch dense_vector documentation: [0.5, 10, 6] encodes to
+     * "PwAAAEEgAABAwAAA". If this test fails the byte order has been changed
+     * and ES indexing of vectors will silently produce wrong results.
+     */
+    @Test
+    void testKnownElasticsearchBase64() {
+        float[] vec = {0.5f, 10.0f, 6.0f};
+        assertEquals("PwAAAEEgAABAwAAA", VectorSerializer.encode(vec));
+        assertArrayEquals(vec, VectorSerializer.decode("PwAAAEEgAABAwAAA"), 
1e-6f);
+    }
+
     @Test
     void testLargeVector() {
         float[] large = new float[768];
diff --git 
a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
 
b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
index 4919c17aee..296323a90b 100644
--- 
a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
+++ 
b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
@@ -109,6 +109,7 @@ public class HttpClientFactory {
     private String authScheme = "basic"; //ntlm or basic
     private boolean credentialsAESEncrypted = false;
     private boolean disableContentCompression = false;
+    private boolean verifySsl = false;
 
     public String getProxyHost() {
         return proxyHost;
@@ -237,6 +238,10 @@ public class HttpClientFactory {
         this.disableContentCompression = disableContentCompression;
     }
 
+    public void setVerifySsl(boolean verifySsl) {
+        this.verifySsl = verifySsl;
+    }
+
     public HttpClientFactory copy() throws TikaConfigException {
         HttpClientFactory cp = new HttpClientFactory();
         cp.setAllowedHostsForRedirect(new HashSet<>(allowedHostsForRedirect));
@@ -249,28 +254,36 @@ public class HttpClientFactory {
         cp.setMaxConnections(maxConnections);
         cp.setNtDomain(ntDomain);
         cp.setPassword(password);
+        cp.setUserName(userName);
         cp.setProxyHost(proxyHost);
         cp.setProxyPort(proxyPort);
         cp.setRequestTimeout(requestTimeout);
         cp.setSocketTimeout(socketTimeout);
+        cp.setVerifySsl(verifySsl);
         return cp;
     }
 
 
     public HttpClient build() throws TikaConfigException {
-        LOG.info("http client does not verify ssl at this point.  " +
-                "If you need that, please open a ticket.");
-        TrustStrategy acceptingTrustStrategy = (cert, authType) -> true;
         SSLContext sslContext = null;
-        try {
-            sslContext =
-                    SSLContexts.custom().loadTrustMaterial(
-                            null, acceptingTrustStrategy).build();
-        } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
-            throw new TikaConfigException("", e);
+        SSLConnectionSocketFactory sslsf;
+        if (verifySsl) {
+            sslContext = SSLContexts.createDefault();
+            sslsf = new SSLConnectionSocketFactory(sslContext,
+                    SSLConnectionSocketFactory.getDefaultHostnameVerifier());
+        } else {
+            LOG.info("http client does not verify ssl at this point.  " +
+                    "If you need that, please open a ticket.");
+            TrustStrategy acceptingTrustStrategy = (cert, authType) -> true;
+            try {
+                sslContext =
+                        SSLContexts.custom().loadTrustMaterial(
+                                null, acceptingTrustStrategy).build();
+            } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
+                throw new TikaConfigException("", e);
+            }
+            sslsf = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
         }
-        SSLConnectionSocketFactory sslsf =
-                new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
 
         Registry<ConnectionSocketFactory> socketFactoryRegistry =
                 
RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslsf)
@@ -294,7 +307,10 @@ public class HttpClientFactory {
                         .setConnectionRequestTimeout(requestTimeout)
                         
.setConnectionRequestTimeout(connectTimeout).setSocketTimeout(socketTimeout)
                         .build()).setKeepAliveStrategy(getKeepAliveStrategy())
-                .setSSLSocketFactory(sslsf).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
+                .setSSLSocketFactory(sslsf)
+                .setSSLHostnameVerifier(verifySsl
+                        ? SSLConnectionSocketFactory.getDefaultHostnameVerifier()
+                        : NoopHostnameVerifier.INSTANCE)
                 .build();
     }
 
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml
index 4995637318..c621edf54d 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml
@@ -51,6 +51,11 @@
       <artifactId>tika-httpclient-commons</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-reporter-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-io</artifactId>
@@ -88,11 +93,11 @@
         <configuration>
           <archive>
             <manifestEntries>
-              <Plugin-Id>elasticsearch-emitter</Plugin-Id>
+              <Plugin-Id>es-emitter</Plugin-Id>
               <Plugin-Version>${project.version}</Plugin-Version>
-              <Plugin-Class>org.apache.tika.pipes.plugin.elasticsearch.ElasticsearchPipesPlugin</Plugin-Class>
-              <Plugin-Provider>Elasticsearch Emitter</Plugin-Provider>
-              <Plugin-Description>Elasticsearch emitter (plain HTTP, no ES client)</Plugin-Description>
+              <Plugin-Class>org.apache.tika.pipes.plugin.es.ESPipesPlugin</Plugin-Class>
+              <Plugin-Provider>ES Emitter</Plugin-Provider>
+              <Plugin-Description>ES emitter (plain HTTP, no ES client)</Plugin-Description>
               <Plugin-Dependencies></Plugin-Dependencies>
             </manifestEntries>
           </archive>
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESClient.java
similarity index 67%
rename from 
tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java
rename to 
tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESClient.java
index 919c12ab9b..81965594d9 100644
--- 
a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESClient.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter.elasticsearch;
+package org.apache.tika.pipes.emitter.es;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -28,6 +28,7 @@ import java.util.UUID;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.http.HttpResponse;
@@ -45,33 +46,29 @@ import org.apache.tika.pipes.api.emitter.EmitData;
 import org.apache.tika.utils.StringUtils;
 
 /**
- * Plain HTTP client for Elasticsearch's REST API.
+ * Plain HTTP client for the ES REST API.
  *
- * <p>This does <b>not</b> use the Elasticsearch Java client library
+ * <p>This does <b>not</b> use the ES Java client library
  * (which is SSPL / Elastic License). Instead it talks directly to
- * Elasticsearch's {@code _bulk} REST endpoint using Apache HttpClient
- * (ASL v2).
+ * the {@code _bulk} REST endpoint using Apache HttpClient (ASL v2).
  *
  * <p>Supports API key authentication ({@code Authorization: ApiKey ...})
  * as well as basic auth via the underlying {@link HttpClient}.
  */
-public class ElasticsearchClient {
+public class ESClient {
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(ElasticsearchClient.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ESClient.class);
 
     protected final HttpClient httpClient;
 
     private final MetadataToJsonWriter metadataToJsonWriter;
-    private final ElasticsearchEmitterConfig config;
+    private final ESEmitterConfig config;
 
-    protected ElasticsearchClient(ElasticsearchEmitterConfig config,
-                                  HttpClient httpClient) {
+    protected ESClient(ESEmitterConfig config, HttpClient httpClient) {
         this.config = config;
         this.httpClient = httpClient;
         this.metadataToJsonWriter =
-                (config.updateStrategy() ==
-                        ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE)
+                (config.updateStrategy() == 
ESEmitterConfig.UpdateStrategy.OVERWRITE)
                         ? new InsertMetadataToJsonWriter()
                         : new UpsertMetadataToJsonWriter();
     }
@@ -94,16 +91,14 @@ public class ElasticsearchClient {
 
     private void emitJson(StringBuilder json)
             throws IOException, TikaClientException {
-        String requestUrl = config.elasticsearchUrl() + "/_bulk";
+        String requestUrl = config.esUrl() + "/_bulk";
         JsonResponse response = postJson(requestUrl, json.toString());
         if (response.getStatus() != 200) {
             throw new TikaClientException(response.getMsg());
         } else {
-            // If there's a single error in the bulk response, throw
             JsonNode errorNode = response.getJson().get("errors");
             if (errorNode != null && errorNode.asText().equals("true")) {
-                throw new TikaClientException(
-                        response.getJson().toString());
+                throw new TikaClientException(response.getJson().toString());
             }
         }
     }
@@ -113,16 +108,15 @@ public class ElasticsearchClient {
         int i = 0;
         String routing =
                 (config.attachmentStrategy() ==
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .PARENT_CHILD) ? emitKey : null;
+                        ESEmitterConfig.AttachmentStrategy.PARENT_CHILD)
+                        ? emitKey : null;
 
         for (Metadata metadata : metadataList) {
             StringBuilder id = new StringBuilder(emitKey);
             if (i > 0) {
                 id.append("-").append(UUID.randomUUID());
             }
-            String indexJson =
-                    metadataToJsonWriter.getBulkJson(id.toString(), routing);
+            String indexJson = metadataToJsonWriter.getBulkJson(id.toString(), 
routing);
             json.append(indexJson).append("\n");
             if (i == 0) {
                 json.append(metadataToJsonWriter.writeContainer(
@@ -140,7 +134,7 @@ public class ElasticsearchClient {
     // Package-private for testing
     static String metadataToJsonContainerInsert(
             Metadata metadata,
-            ElasticsearchEmitterConfig.AttachmentStrategy attachmentStrategy)
+            ESEmitterConfig.AttachmentStrategy attachmentStrategy)
             throws IOException {
         return new InsertMetadataToJsonWriter().writeContainer(
                 metadata, attachmentStrategy);
@@ -149,27 +143,22 @@ public class ElasticsearchClient {
     // Package-private for testing
     static String metadataToJsonEmbeddedInsert(
             Metadata metadata,
-            ElasticsearchEmitterConfig.AttachmentStrategy attachmentStrategy,
+            ESEmitterConfig.AttachmentStrategy attachmentStrategy,
             String emitKey, String embeddedFileFieldName)
             throws IOException {
         return new InsertMetadataToJsonWriter().writeEmbedded(
-                metadata, attachmentStrategy, emitKey,
-                embeddedFileFieldName);
+                metadata, attachmentStrategy, emitKey, embeddedFileFieldName);
     }
 
     public JsonResponse postJson(String url, String json) throws IOException {
         HttpPost httpRequest = new HttpPost(url);
-        StringEntity entity =
-                new StringEntity(json, StandardCharsets.UTF_8);
+        StringEntity entity = new StringEntity(json, StandardCharsets.UTF_8);
         httpRequest.setEntity(entity);
         httpRequest.setHeader("Accept", "application/json");
-        httpRequest.setHeader("Content-type",
-                "application/json; charset=utf-8");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
 
-        // ES 8.x API key auth
         if (!StringUtils.isEmpty(config.apiKey())) {
-            httpRequest.setHeader("Authorization",
-                    "ApiKey " + config.apiKey());
+            httpRequest.setHeader("Authorization", "ApiKey " + 
config.apiKey());
         }
 
         HttpResponse response = null;
@@ -190,9 +179,7 @@ public class ElasticsearchClient {
                 }
             } else {
                 return new JsonResponse(status,
-                        new String(
-                                EntityUtils.toByteArray(
-                                        response.getEntity()),
+                        new 
String(EntityUtils.toByteArray(response.getEntity()),
                                 StandardCharsets.UTF_8));
             }
         } finally {
@@ -208,36 +195,29 @@ public class ElasticsearchClient {
     // -----------------------------------------------------------------------
 
     private interface MetadataToJsonWriter {
-        String writeContainer(
-                Metadata metadata,
-                ElasticsearchEmitterConfig.AttachmentStrategy strategy)
+        String writeContainer(Metadata metadata,
+                              ESEmitterConfig.AttachmentStrategy strategy)
                 throws IOException;
 
-        String writeEmbedded(
-                Metadata metadata,
-                ElasticsearchEmitterConfig.AttachmentStrategy strategy,
-                String emitKey, String embeddedFileFieldName)
+        String writeEmbedded(Metadata metadata,
+                             ESEmitterConfig.AttachmentStrategy strategy,
+                             String emitKey, String embeddedFileFieldName)
                 throws IOException;
 
         String getBulkJson(String id, String routing) throws IOException;
     }
 
-    private static class InsertMetadataToJsonWriter
-            implements MetadataToJsonWriter {
+    private static class InsertMetadataToJsonWriter implements 
MetadataToJsonWriter {
 
         @Override
-        public String writeContainer(
-                Metadata metadata,
-                ElasticsearchEmitterConfig.AttachmentStrategy strategy)
+        public String writeContainer(Metadata metadata,
+                                     ESEmitterConfig.AttachmentStrategy 
strategy)
                 throws IOException {
             StringWriter writer = new StringWriter();
-            try (JsonGenerator jg =
-                         new JsonFactory().createGenerator(writer)) {
+            try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) 
{
                 jg.writeStartObject();
                 writeMetadata(metadata, jg);
-                if (strategy ==
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .PARENT_CHILD) {
+                if (strategy == 
ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) {
                     jg.writeStringField("relation_type", "container");
                 }
                 jg.writeEndObject();
@@ -246,26 +226,20 @@ public class ElasticsearchClient {
         }
 
         @Override
-        public String writeEmbedded(
-                Metadata metadata,
-                ElasticsearchEmitterConfig.AttachmentStrategy strategy,
-                String emitKey, String embeddedFileFieldName)
+        public String writeEmbedded(Metadata metadata,
+                                    ESEmitterConfig.AttachmentStrategy 
strategy,
+                                    String emitKey, String 
embeddedFileFieldName)
                 throws IOException {
             StringWriter writer = new StringWriter();
-            try (JsonGenerator jg =
-                         new JsonFactory().createGenerator(writer)) {
+            try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) 
{
                 jg.writeStartObject();
                 writeMetadata(metadata, jg);
-                if (strategy ==
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .PARENT_CHILD) {
+                if (strategy == 
ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) {
                     jg.writeObjectFieldStart("relation_type");
                     jg.writeStringField("name", embeddedFileFieldName);
                     jg.writeStringField("parent", emitKey);
                     jg.writeEndObject();
-                } else if (strategy ==
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .SEPARATE_DOCUMENTS) {
+                } else if (strategy == 
ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS) {
                     jg.writeStringField("parent", emitKey);
                 }
                 jg.writeEndObject();
@@ -274,11 +248,9 @@ public class ElasticsearchClient {
         }
 
         @Override
-        public String getBulkJson(String id, String routing)
-                throws IOException {
+        public String getBulkJson(String id, String routing) throws 
IOException {
             StringWriter writer = new StringWriter();
-            try (JsonGenerator jg =
-                         new JsonFactory().createGenerator(writer)) {
+            try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) 
{
                 jg.writeStartObject();
                 jg.writeObjectFieldStart("index");
                 jg.writeStringField("_id", id);
@@ -292,23 +264,18 @@ public class ElasticsearchClient {
         }
     }
 
-    private static class UpsertMetadataToJsonWriter
-            implements MetadataToJsonWriter {
+    private static class UpsertMetadataToJsonWriter implements 
MetadataToJsonWriter {
 
         @Override
-        public String writeContainer(
-                Metadata metadata,
-                ElasticsearchEmitterConfig.AttachmentStrategy strategy)
+        public String writeContainer(Metadata metadata,
+                                     ESEmitterConfig.AttachmentStrategy 
strategy)
                 throws IOException {
             StringWriter writer = new StringWriter();
-            try (JsonGenerator jg =
-                         new JsonFactory().createGenerator(writer)) {
+            try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) 
{
                 jg.writeStartObject();
                 jg.writeObjectFieldStart("doc");
                 writeMetadata(metadata, jg);
-                if (strategy ==
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .PARENT_CHILD) {
+                if (strategy == 
ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) {
                     jg.writeStringField("relation_type", "container");
                 }
                 jg.writeEndObject();
@@ -319,27 +286,21 @@ public class ElasticsearchClient {
         }
 
         @Override
-        public String writeEmbedded(
-                Metadata metadata,
-                ElasticsearchEmitterConfig.AttachmentStrategy strategy,
-                String emitKey, String embeddedFileFieldName)
+        public String writeEmbedded(Metadata metadata,
+                                    ESEmitterConfig.AttachmentStrategy 
strategy,
+                                    String emitKey, String 
embeddedFileFieldName)
                 throws IOException {
             StringWriter writer = new StringWriter();
-            try (JsonGenerator jg =
-                         new JsonFactory().createGenerator(writer)) {
+            try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) 
{
                 jg.writeStartObject();
                 jg.writeObjectFieldStart("doc");
                 writeMetadata(metadata, jg);
-                if (strategy ==
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .PARENT_CHILD) {
+                if (strategy == 
ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) {
                     jg.writeObjectFieldStart("relation_type");
                     jg.writeStringField("name", embeddedFileFieldName);
                     jg.writeStringField("parent", emitKey);
                     jg.writeEndObject();
-                } else if (strategy ==
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .SEPARATE_DOCUMENTS) {
+                } else if (strategy == 
ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS) {
                     jg.writeStringField("parent", emitKey);
                 }
                 jg.writeEndObject();
@@ -350,11 +311,9 @@ public class ElasticsearchClient {
         }
 
         @Override
-        public String getBulkJson(String id, String routing)
-                throws IOException {
+        public String getBulkJson(String id, String routing) throws IOException {
             StringWriter writer = new StringWriter();
-            try (JsonGenerator jg =
-                         new JsonFactory().createGenerator(writer)) {
+            try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) {
                 jg.writeStartObject();
                 jg.writeObjectFieldStart("update");
                 jg.writeStringField("_id", id);
@@ -373,19 +332,16 @@ public class ElasticsearchClient {
      * Metadata fields whose values are serialized JSON from the
      * tika-inference pipeline. These must be written as raw JSON
      * (arrays/objects) rather than escaped strings so that
-     * Elasticsearch can index vectors, locators, etc. natively.
+     * ES can index vectors, locators, etc. natively.
      */
-    static final Set<String> INFERENCE_JSON_FIELDS = Set.of(
-            "tika:chunks");
+    static final Set<String> INFERENCE_JSON_FIELDS = Set.of("tika:chunks");
 
-    private static void writeMetadata(Metadata metadata,
-                                      JsonGenerator jsonGenerator)
+    private static void writeMetadata(Metadata metadata, JsonGenerator jsonGenerator)
             throws IOException {
         for (String n : metadata.names()) {
             String[] vals = metadata.getValues(n);
             if (vals.length == 1) {
-                if (INFERENCE_JSON_FIELDS.contains(n)
-                        && isValidJson(vals[0])) {
+                if (INFERENCE_JSON_FIELDS.contains(n) && isValidJson(vals[0])) {
                     jsonGenerator.writeFieldName(n);
                     jsonGenerator.writeRawValue(vals[0]);
                 } else {
@@ -401,12 +357,17 @@ public class ElasticsearchClient {
         }
     }
 
-    private static final ObjectMapper VALIDATION_MAPPER = new ObjectMapper();
+    private static final JsonFactory STRICT_JSON_FACTORY = new JsonFactory();
 
     /**
-     * Validates that the value is well-formed JSON (array or object)
-     * before writing it as raw JSON. This prevents injection of
-     * arbitrary content into the bulk request payload.
+     * Validates that the value is well-formed JSON (array or object) with
+     * no trailing content before writing it as raw JSON.
+     *
+     * <p>{@code ObjectMapper.readTree()} silently ignores trailing content,
+     * so a value like {@code [1,2,3], "injected": true} would pass a simple
+     * readTree check and then inject extra fields into the document via
+     * {@code writeRawValue}. We use a {@link JsonParser} directly and
+     * assert that the stream is fully consumed after the root value.
      */
     private static boolean isValidJson(String value) {
         if (value == null || value.isEmpty()) {
@@ -416,8 +377,14 @@ public class ElasticsearchClient {
         if (first != '[' && first != '{') {
             return false;
         }
-        try {
-            VALIDATION_MAPPER.readTree(value);
+        try (JsonParser parser = STRICT_JSON_FACTORY.createParser(value)) {
+            parser.nextToken();
+            parser.skipChildren();
+            if (parser.nextToken() != null) {
+                LOG.warn("Field value has trailing content after root JSON value; "
+                        + "writing as escaped string");
+                return false;
+            }
             return true;
         } catch (IOException e) {
             LOG.warn("Field value starts with '{}' but is not valid JSON; "
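
For readers who want to try the trailing-token check outside of Tika, below is a minimal standalone sketch of the same idea; the class and method names are illustrative only, and only jackson-core is required.

    import java.io.IOException;

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParser;

    public class StrictJsonCheck {
        private static final JsonFactory FACTORY = new JsonFactory();

        // Returns true only if value is a single well-formed JSON array/object
        // with nothing after it; "[1,2,3], \"injected\": true" is rejected.
        static boolean isSingleJsonValue(String value) {
            if (value == null || value.isEmpty()) {
                return false;
            }
            char first = value.charAt(0);
            if (first != '[' && first != '{') {
                return false;
            }
            try (JsonParser parser = FACTORY.createParser(value)) {
                parser.nextToken();     // START_ARRAY or START_OBJECT
                parser.skipChildren();  // consume the entire root value
                return parser.nextToken() == null;  // no trailing tokens allowed
            } catch (IOException e) {
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(isSingleJsonValue("[1,2,3]"));                     // true
            System.out.println(isSingleJsonValue("[1,2,3], \"injected\": true")); // false
        }
    }
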
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitter.java
similarity index 70%
rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java
rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitter.java
index 2c8c66783a..cb382a6b63 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter.elasticsearch;
+package org.apache.tika.pipes.emitter.es;
 
 import java.io.IOException;
 import java.util.List;
@@ -33,40 +33,36 @@ import org.apache.tika.pipes.api.emitter.EmitData;
 import org.apache.tika.plugins.ExtensionConfig;
 
 /**
- * Emitter that sends documents to Elasticsearch via its REST API.
+ * Emitter that sends documents to an ES-compatible REST API.
  *
- * <p>This emitter does <b>not</b> depend on the Elasticsearch Java client
+ * <p>This emitter does <b>not</b> depend on the ES Java client
  * (which changed to a non-ASL license). It uses plain HTTP via
  * Apache HttpClient to call the {@code _bulk} endpoint directly.
  *
  * <p>Supports:
  * <ul>
- *   <li>API key authentication ({@code Authorization: ApiKey &lt;base64&gt;})
- *       — common with Elasticsearch 8.x</li>
+ *   <li>API key authentication ({@code Authorization: ApiKey &lt;base64&gt;})</li>
  *   <li>Basic authentication (username/password via httpClientConfig)</li>
  *   <li>OVERWRITE and UPSERT update strategies</li>
  *   <li>SEPARATE_DOCUMENTS and PARENT_CHILD attachment strategies</li>
  * </ul>
  */
-public class ElasticsearchEmitter extends AbstractEmitter {
+public class ESEmitter extends AbstractEmitter {
 
     public static final String DEFAULT_EMBEDDED_FILE_FIELD_NAME = "embedded";
-    private static final Logger LOG =
-            LoggerFactory.getLogger(ElasticsearchEmitter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ESEmitter.class);
 
-    private ElasticsearchClient elasticsearchClient;
+    private ESClient esClient;
     private final HttpClientFactory httpClientFactory;
-    private final ElasticsearchEmitterConfig config;
+    private final ESEmitterConfig config;
 
-    public static ElasticsearchEmitter build(ExtensionConfig pluginConfig)
+    public static ESEmitter build(ExtensionConfig pluginConfig)
             throws TikaConfigException, IOException {
-        ElasticsearchEmitterConfig config =
-                ElasticsearchEmitterConfig.load(pluginConfig.json());
-        return new ElasticsearchEmitter(pluginConfig, config);
+        ESEmitterConfig config = ESEmitterConfig.load(pluginConfig.json());
+        return new ESEmitter(pluginConfig, config);
     }
 
-    public ElasticsearchEmitter(ExtensionConfig pluginConfig,
-                                ElasticsearchEmitterConfig config)
+    public ESEmitter(ExtensionConfig pluginConfig, ESEmitterConfig config)
             throws IOException, TikaConfigException {
         super(pluginConfig);
         this.config = config;
@@ -80,10 +76,9 @@ public class ElasticsearchEmitter extends AbstractEmitter {
             LOG.debug("metadataList is null or empty");
             return;
         }
-
         try {
             LOG.debug("about to emit {} docs", emitData.size());
-            elasticsearchClient.emitDocuments(emitData);
+            esClient.emitDocuments(emitData);
             LOG.info("successfully emitted {} docs", emitData.size());
         } catch (TikaClientException e) {
             LOG.warn("problem emitting docs", e);
@@ -101,7 +96,7 @@ public class ElasticsearchEmitter extends AbstractEmitter {
         try {
             LOG.debug("about to emit one doc with {} metadata entries",
                     metadataList.size());
-            elasticsearchClient.emitDocument(emitKey, metadataList);
+            esClient.emitDocument(emitKey, metadataList);
             LOG.info("successfully emitted one doc");
         } catch (TikaClientException e) {
             LOG.warn("problem emitting doc", e);
@@ -110,18 +105,29 @@ public class ElasticsearchEmitter extends AbstractEmitter {
     }
 
     private void configure() throws TikaConfigException {
-        ConfigValidator.mustNotBeEmpty("elasticsearchUrl",
-                config.elasticsearchUrl());
+        ConfigValidator.mustNotBeEmpty("esUrl", config.esUrl());
         ConfigValidator.mustNotBeEmpty("idField", config.idField());
 
         HttpClientConfig http = config.httpClientConfig();
         if (http != null) {
             httpClientFactory.setUserName(http.userName());
             httpClientFactory.setPassword(http.password());
+            if (http.socketTimeout() > 0) {
+                httpClientFactory.setSocketTimeout(http.socketTimeout());
+            }
+            if (http.connectionTimeout() > 0) {
+                httpClientFactory.setConnectTimeout(http.connectionTimeout());
+            }
+            if (http.authScheme() != null) {
+                httpClientFactory.setAuthScheme(http.authScheme());
+            }
+            if (http.proxyHost() != null) {
+                httpClientFactory.setProxyHost(http.proxyHost());
+                httpClientFactory.setProxyPort(http.proxyPort());
+            }
+            httpClientFactory.setVerifySsl(http.verifySsl());
         }
 
-        elasticsearchClient =
-                new ElasticsearchClient(config, httpClientFactory.build());
+        esClient = new ESClient(config, httpClientFactory.build());
     }
-
 }
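
As a quick illustration of the apiKey value the emitter expects (base64 of "id:api_key", sent as "Authorization: ApiKey <value>"), the following standalone snippet builds such a value; the id and secret strings are placeholders.

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class ApiKeyValue {
        public static void main(String[] args) {
            String keyId = "my-key-id";         // placeholder: id of the ES API key
            String keySecret = "my-key-secret"; // placeholder: the api_key secret
            // Base64("id:api_key"); the emitter sends it as "Authorization: ApiKey <value>"
            String apiKey = Base64.getEncoder()
                    .encodeToString((keyId + ":" + keySecret).getBytes(StandardCharsets.UTF_8));
            System.out.println(apiKey);
        }
    }
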
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterConfig.java
similarity index 56%
rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java
rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterConfig.java
index 874431daac..288098957b 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterConfig.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter.elasticsearch;
+package org.apache.tika.pipes.emitter.es;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -22,25 +22,25 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.tika.exception.TikaConfigException;
 
 /**
- * Configuration for the Elasticsearch emitter.
+ * Configuration for the ES emitter.
  *
- * @param elasticsearchUrl Full URL including index, e.g. {@code https://localhost:9200/my-index}
- * @param idField          Metadata field to use as the document {@code _id}
+ * @param esUrl              Full URL including index, e.g. {@code https://localhost:9200/my-index}
+ * @param idField            Metadata field to use as the document {@code _id}
  * @param attachmentStrategy How to handle embedded documents
  * @param updateStrategy     Whether to overwrite or upsert
 * @param commitWithin       Not used by ES, kept for API parity with OpenSearch emitter
  * @param embeddedFileFieldName Field name for embedded-file relation
- * @param apiKey             Elasticsearch API key for authentication (Base64-encoded
- *                           {@code id:api_key}). Sent as {@code Authorization: ApiKey <value>}.
+ * @param apiKey             API key for authentication (Base64-encoded {@code id:api_key}).
+ *                           Sent as {@code Authorization: ApiKey <value>}.
 *                           If null/empty, falls back to httpClientConfig's userName/password
  *                           for basic auth.
 * @param httpClientConfig   HTTP connection settings (basic auth, timeouts, proxy)
  */
-public record ElasticsearchEmitterConfig(String elasticsearchUrl, String idField,
-                                         AttachmentStrategy attachmentStrategy,
-                                         UpdateStrategy updateStrategy, int commitWithin,
-                                         String embeddedFileFieldName, String apiKey,
-                                         HttpClientConfig httpClientConfig) {
+public record ESEmitterConfig(String esUrl, String idField,
+                              AttachmentStrategy attachmentStrategy,
+                              UpdateStrategy updateStrategy, int commitWithin,
+                              String embeddedFileFieldName, String apiKey,
+                              HttpClientConfig httpClientConfig) {
     public enum AttachmentStrategy {
         SEPARATE_DOCUMENTS, PARENT_CHILD,
     }
@@ -51,15 +51,24 @@ public record ElasticsearchEmitterConfig(String elasticsearchUrl, String idField
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public static ElasticsearchEmitterConfig load(final String json)
+    public static ESEmitterConfig load(final String json)
             throws TikaConfigException {
         try {
-            return OBJECT_MAPPER.readValue(json,
-                    ElasticsearchEmitterConfig.class);
+            return OBJECT_MAPPER.readValue(json, ESEmitterConfig.class);
         } catch (JsonProcessingException e) {
             throw new TikaConfigException(
-                    "Failed to parse ElasticsearchEmitterConfig from JSON", e);
+                    "Failed to parse ESEmitterConfig from JSON", e);
         }
     }
 
+    /** Overrides the record default to prevent {@code apiKey} leaking into logs. */
+    @Override
+    public String toString() {
+        return "ESEmitterConfig{esUrl='" + esUrl + '\'' +
+                ", idField='" + idField + '\'' +
+                ", attachmentStrategy=" + attachmentStrategy +
+                ", updateStrategy=" + updateStrategy +
+                ", apiKey=" + (apiKey != null ? "[REDACTED]" : "null") +
+                '}';
+    }
 }
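
A short, hypothetical sketch of loading this config from JSON and logging it; the field values are placeholders, and record components omitted from the JSON should be left at their defaults (null/0/false) by Jackson.

    import org.apache.tika.pipes.emitter.es.ESEmitterConfig;

    public class LoadEmitterConfig {
        public static void main(String[] args) throws Exception {
            // placeholder values; load() throws TikaConfigException on malformed JSON
            String json = "{\"esUrl\":\"https://localhost:9200/my-index\","
                    + "\"idField\":\"id\",\"apiKey\":\"c2VjcmV0\"}";
            ESEmitterConfig config = ESEmitterConfig.load(json);
            // safe to log: the overridden toString() reports apiKey as [REDACTED]
            System.out.println(config);
        }
    }
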
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterFactory.java
similarity index 80%
copy from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java
copy to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterFactory.java
index 94bda68b2a..21771d1d5d 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterFactory.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter.elasticsearch;
+package org.apache.tika.pipes.emitter.es;
 
 import java.io.IOException;
 
@@ -26,14 +26,14 @@ import org.apache.tika.pipes.api.emitter.EmitterFactory;
 import org.apache.tika.plugins.ExtensionConfig;
 
 /**
- * Factory for creating Elasticsearch emitters.
+ * Factory for creating ES emitters.
  *
  * <p>Example JSON configuration:
  * <pre>
  * "emitters": {
- *   "elasticsearch-emitter": {
+ *   "es-emitter": {
  *     "my-es-emitter": {
- *       "elasticsearchUrl": "https://localhost:9200/my-index",
+ *       "esUrl": "https://localhost:9200/my-index",
  *       "idField": "id",
  *       "apiKey": "base64-encoded-id:api_key",
  *       "attachmentStrategy": "PARENT_CHILD",
@@ -45,9 +45,9 @@ import org.apache.tika.plugins.ExtensionConfig;
  * </pre>
  */
 @Extension
-public class ElasticsearchEmitterFactory implements EmitterFactory {
+public class ESEmitterFactory implements EmitterFactory {
 
-    public static final String NAME = "elasticsearch-emitter";
+    public static final String NAME = "es-emitter";
 
     @Override
     public String getName() {
@@ -57,6 +57,6 @@ public class ElasticsearchEmitterFactory implements EmitterFactory {
     @Override
     public Emitter buildExtension(ExtensionConfig extensionConfig)
             throws IOException, TikaConfigException {
-        return ElasticsearchEmitter.build(extensionConfig);
+        return ESEmitter.build(extensionConfig);
     }
 }
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/HttpClientConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/HttpClientConfig.java
new file mode 100644
index 0000000000..4056c282aa
--- /dev/null
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/HttpClientConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.es;
+
+/**
+ * HTTP client settings for the ES emitter and reporter.
+ * Field names and semantics are intentionally aligned with the OpenSearch
+ * emitter's HttpClientConfig for configuration consistency.
+ *
+ * @param userName          Username for basic authentication (optional)
+ * @param password          Password for basic authentication (optional)
+ * @param authScheme        Auth scheme passed to HttpClientFactory, e.g. {@code "basic"} or {@code "ntlm"}
+ * @param connectionTimeout Connect timeout in milliseconds
+ * @param socketTimeout     Socket read timeout in milliseconds
+ * @param proxyHost         HTTP proxy host (optional)
+ * @param proxyPort         HTTP proxy port
+ * @param verifySsl         When {@code true}, the HTTP client validates server certificates and
+ *                          hostnames using the JVM's default trust store.  Defaults to
+ *                          {@code false} for backward compatibility (trust-all / no hostname check).
+ */
+public record HttpClientConfig(String userName, String password,
+                               String authScheme, int connectionTimeout,
+                               int socketTimeout, String proxyHost, int proxyPort,
+                               boolean verifySsl) {
+}
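
An illustrative construction of this record with certificate verification enabled; every value other than the final verifySsl flag is a placeholder.

    import org.apache.tika.pipes.emitter.es.HttpClientConfig;

    public class HttpClientConfigExample {
        public static void main(String[] args) {
            // order: userName, password, authScheme, connectionTimeout, socketTimeout,
            // proxyHost, proxyPort, verifySsl -- placeholder credentials and timeouts
            HttpClientConfig http = new HttpClientConfig(
                    "tika", "changeme", "basic", 30_000, 120_000, null, -1, true);
            // true -> JVM default trust store and hostname verification
            System.out.println(http.verifySsl());
        }
    }
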
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/JsonResponse.java
similarity index 96%
rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java
rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/JsonResponse.java
index 5e6f3373e3..ce4cc591b8 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/JsonResponse.java
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter.elasticsearch;
-
+package org.apache.tika.pipes.emitter.es;
 
 import com.fasterxml.jackson.databind.JsonNode;
 
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/es/ESPipesPlugin.java
similarity index 72%
rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java
rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/es/ESPipesPlugin.java
index 1b6a195171..0e1d60401f 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/es/ESPipesPlugin.java
@@ -14,37 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.plugin.elasticsearch;
+package org.apache.tika.pipes.plugin.es;
 
 import org.pf4j.Plugin;
 import org.pf4j.PluginWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ElasticsearchPipesPlugin extends Plugin {
-    private static final Logger LOG =
-            LoggerFactory.getLogger(ElasticsearchPipesPlugin.class);
+public class ESPipesPlugin extends Plugin {
+    private static final Logger LOG = LoggerFactory.getLogger(ESPipesPlugin.class);
 
-    public ElasticsearchPipesPlugin(PluginWrapper wrapper) {
+    public ESPipesPlugin(PluginWrapper wrapper) {
         super(wrapper);
     }
 
     @Override
     public void start() {
-        LOG.info("Starting Elasticsearch pipes plugin");
+        LOG.info("Starting ES pipes plugin");
         super.start();
     }
 
     @Override
     public void stop() {
-        LOG.info("Stopping Elasticsearch pipes plugin");
+        LOG.info("Stopping ES pipes plugin");
         super.stop();
     }
 
     @Override
     public void delete() {
-        LOG.info("Deleting Elasticsearch pipes plugin");
+        LOG.info("Deleting ES pipes plugin");
         super.delete();
     }
-
 }
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESPipesReporter.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESPipesReporter.java
new file mode 100644
index 0000000000..801eb51b05
--- /dev/null
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESPipesReporter.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporter.es;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.client.TikaClientException;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.ExternalProcess;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.api.FetchEmitTuple;
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.pipes.api.pipesiterator.TotalCountResult;
+import org.apache.tika.pipes.emitter.es.HttpClientConfig;
+import org.apache.tika.pipes.emitter.es.JsonResponse;
+import org.apache.tika.pipes.reporters.PipesReporterBase;
+import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.utils.StringUtils;
+
+public class ESPipesReporter extends PipesReporterBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ESPipesReporter.class);
+
+    public static final String DEFAULT_PARSE_TIME_KEY = "parse_time_ms";
+    public static final String DEFAULT_PARSE_STATUS_KEY = "parse_status";
+    public static final String DEFAULT_EXIT_VALUE_KEY = "exit_value";
+
+    private final ESReporterConfig config;
+    private HttpClient httpClient;
+    private String parseTimeKey = DEFAULT_PARSE_TIME_KEY;
+    private String parseStatusKey = DEFAULT_PARSE_STATUS_KEY;
+    private String exitValueKey = DEFAULT_EXIT_VALUE_KEY;
+
+    public static ESPipesReporter build(ExtensionConfig pluginConfig)
+            throws TikaConfigException, IOException {
+        ESReporterConfig config = ESReporterConfig.load(pluginConfig.json());
+        return new ESPipesReporter(pluginConfig, config);
+    }
+
+    public ESPipesReporter(ExtensionConfig pluginConfig, ESReporterConfig config)
+            throws TikaConfigException {
+        super(pluginConfig, config.includes(), config.excludes());
+        this.config = config;
+        init();
+    }
+
+    private void init() throws TikaConfigException {
+        if (StringUtils.isBlank(config.esUrl())) {
+            throw new TikaConfigException("Must specify an esUrl!");
+        }
+        HttpClientFactory httpClientFactory = new HttpClientFactory();
+        HttpClientConfig http = config.httpClientConfig();
+        if (http != null) {
+            httpClientFactory.setUserName(http.userName());
+            httpClientFactory.setPassword(http.password());
+            if (http.socketTimeout() > 0) {
+                httpClientFactory.setSocketTimeout(http.socketTimeout());
+            }
+            if (http.connectionTimeout() > 0) {
+                httpClientFactory.setConnectTimeout(http.connectionTimeout());
+            }
+            if (http.authScheme() != null) {
+                httpClientFactory.setAuthScheme(http.authScheme());
+            }
+            if (http.proxyHost() != null) {
+                httpClientFactory.setProxyHost(http.proxyHost());
+                httpClientFactory.setProxyPort(http.proxyPort());
+            }
+            httpClientFactory.setVerifySsl(http.verifySsl());
+        }
+        httpClient = httpClientFactory.build();
+
+        parseStatusKey = StringUtils.isBlank(config.keyPrefix())
+                ? parseStatusKey : config.keyPrefix() + parseStatusKey;
+        parseTimeKey = StringUtils.isBlank(config.keyPrefix())
+                ? parseTimeKey : config.keyPrefix() + parseTimeKey;
+        exitValueKey = StringUtils.isBlank(config.keyPrefix())
+                ? exitValueKey : config.keyPrefix() + exitValueKey;
+    }
+
+    @Override
+    public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
+        if (!accept(result.status())) {
+            return;
+        }
+        Metadata metadata = new Metadata();
+        metadata.set(parseStatusKey, result.status().name());
+        metadata.set(parseTimeKey, Long.toString(elapsed));
+        if (result.emitData() != null && result.emitData().getMetadataList() != null &&
+                !result.emitData().getMetadataList().isEmpty()) {
+            Metadata m = result.emitData().getMetadataList().get(0);
+            if (m.get(ExternalProcess.EXIT_VALUE) != null) {
+                metadata.set(exitValueKey, m.get(ExternalProcess.EXIT_VALUE));
+            }
+        }
+        try {
+            String routing = config.includeRouting()
+                    ? t.getEmitKey().getEmitKey() : null;
+            emitDocument(t.getEmitKey().getEmitKey(), routing, metadata);
+        } catch (IOException | TikaClientException e) {
+            LOG.warn("failed to report status for '{}'", t.getId(), e);
+        }
+    }
+
+    private void emitDocument(String emitKey, String routing, Metadata metadata)
+            throws IOException, TikaClientException {
+        StringWriter writer = new StringWriter();
+        writeBulkHeader(emitKey, routing, writer);
+        writer.append("\n");
+        writeDoc(metadata, writer);
+        writer.append("\n");
+
+        String requestUrl = config.esUrl() + "/_bulk";
+        JsonResponse response = postJson(requestUrl, writer.toString());
+        if (response.getStatus() != 200) {
+            throw new TikaClientException(response.getMsg());
+        }
+        JsonNode errorNode = response.getJson().get("errors");
+        if (errorNode != null && errorNode.asText().equals("true")) {
+            throw new TikaClientException(response.getJson().toString());
+        }
+    }
+
+    private void writeBulkHeader(String id, String routing, StringWriter writer)
+            throws IOException {
+        try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) {
+            jg.writeStartObject();
+            jg.writeObjectFieldStart("update");
+            jg.writeStringField("_id", id);
+            if (!StringUtils.isEmpty(routing)) {
+                jg.writeStringField("routing", routing);
+            }
+            jg.writeNumberField("retry_on_conflict", 3);
+            jg.writeEndObject();
+            jg.writeEndObject();
+        }
+    }
+
+    private void writeDoc(Metadata metadata, StringWriter writer) throws IOException {
+        try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) {
+            jg.writeStartObject();
+            jg.writeObjectFieldStart("doc");
+            for (String n : metadata.names()) {
+                String[] vals = metadata.getValues(n);
+                if (vals.length == 1) {
+                    jg.writeStringField(n, vals[0]);
+                } else {
+                    jg.writeArrayFieldStart(n);
+                    for (String v : vals) {
+                        jg.writeString(v);
+                    }
+                    jg.writeEndArray();
+                }
+            }
+            jg.writeEndObject();
+            jg.writeBooleanField("doc_as_upsert", true);
+            jg.writeEndObject();
+        }
+    }
+
+    private JsonResponse postJson(String url, String json) throws IOException {
+        HttpPost httpRequest = new HttpPost(url);
+        httpRequest.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
+        if (!StringUtils.isEmpty(config.apiKey())) {
+            httpRequest.setHeader("Authorization", "ApiKey " + config.apiKey());
+        }
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            if (status == 200) {
+                try (Reader reader = new BufferedReader(
+                        new InputStreamReader(
+                                response.getEntity().getContent(),
+                                StandardCharsets.UTF_8))) {
+                    JsonNode node = new ObjectMapper().readTree(reader);
+                    return new JsonResponse(200, node);
+                }
+            } else {
+                return new JsonResponse(status,
+                        new String(EntityUtils.toByteArray(response.getEntity()),
+                                StandardCharsets.UTF_8));
+            }
+        } finally {
+            if (response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse) response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+
+    @Override
+    public void report(TotalCountResult totalCountResult) {
+        // not supported
+    }
+
+    @Override
+    public boolean supportsTotalCount() {
+        return false;
+    }
+
+    @Override
+    public void error(Throwable t) {
+        LOG.error("crashed", t);
+    }
+
+    @Override
+    public void error(String msg) {
+        LOG.error("crashed {}", msg);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // nothing to close
+    }
+}
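
With no keyPrefix and includeRouting=false, the two NDJSON lines built by writeBulkHeader/writeDoc and POSTed to <esUrl>/_bulk look roughly like the following; the emit key and timing value are placeholders, and exit_value appears only when the parse metadata carries it.

    {"update":{"_id":"fs-key-1","retry_on_conflict":3}}
    {"doc":{"parse_status":"PARSE_SUCCESS","parse_time_ms":"1234"},"doc_as_upsert":true}
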
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterConfig.java
similarity index 53%
rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java
rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterConfig.java
index 6b2b18b00b..0ff1f8809c 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterConfig.java
@@ -14,22 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter.elasticsearch;
+package org.apache.tika.pipes.reporter.es;
 
-import java.io.IOException;
+import java.util.Set;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.emitter.es.HttpClientConfig;
 
-public record HttpClientConfig(String userName, String password,
-                               String authScheme, int connectionTimeout,
-                               int socketTimeout, String proxyHost,
-                               int proxyPort) {
+public record ESReporterConfig(String esUrl, Set<String> includes, Set<String> excludes,
+                               String keyPrefix, boolean includeRouting,
+                               String apiKey, HttpClientConfig httpClientConfig) {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public static HttpClientConfig load(final String json) throws IOException {
-        return OBJECT_MAPPER.readValue(json, HttpClientConfig.class);
+    public static ESReporterConfig load(final String json) throws TikaConfigException {
+        try {
+            return OBJECT_MAPPER.readValue(json, ESReporterConfig.class);
+        } catch (JsonProcessingException e) {
+            throw new TikaConfigException(
+                    "Failed to parse ESReporterConfig from JSON", e);
+        }
     }
-
 }
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterFactory.java
similarity index 59%
rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java
rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterFactory.java
index 94bda68b2a..589232f37c 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterFactory.java
@@ -14,40 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter.elasticsearch;
+package org.apache.tika.pipes.reporter.es;
 
 import java.io.IOException;
 
 import org.pf4j.Extension;
 
 import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.pipes.api.emitter.Emitter;
-import org.apache.tika.pipes.api.emitter.EmitterFactory;
+import org.apache.tika.pipes.api.reporter.PipesReporterFactory;
 import org.apache.tika.plugins.ExtensionConfig;
 
 /**
- * Factory for creating Elasticsearch emitters.
+ * Factory for creating ES pipes reporters.
  *
  * <p>Example JSON configuration:
  * <pre>
- * "emitters": {
- *   "elasticsearch-emitter": {
- *     "my-es-emitter": {
- *       "elasticsearchUrl": "https://localhost:9200/my-index",
- *       "idField": "id",
- *       "apiKey": "base64-encoded-id:api_key",
- *       "attachmentStrategy": "PARENT_CHILD",
- *       "updateStrategy": "UPSERT",
- *       "embeddedFileFieldName": "embedded"
- *     }
+ * "pipes-reporters": {
+ *   "es-pipes-reporter": {
+ *     "esUrl": "http://localhost:9200/tika-status",
+ *     "keyPrefix": "status_",
+ *     "includeRouting": false
  *   }
  * }
  * </pre>
  */
 @Extension
-public class ElasticsearchEmitterFactory implements EmitterFactory {
+public class ESReporterFactory implements PipesReporterFactory {
 
-    public static final String NAME = "elasticsearch-emitter";
+    public static final String NAME = "es-pipes-reporter";
 
     @Override
     public String getName() {
@@ -55,8 +49,8 @@ public class ElasticsearchEmitterFactory implements EmitterFactory {
     }
 
     @Override
-    public Emitter buildExtension(ExtensionConfig extensionConfig)
+    public ESPipesReporter buildExtension(ExtensionConfig extensionConfig)
             throws IOException, TikaConfigException {
-        return ElasticsearchEmitter.build(extensionConfig);
+        return ESPipesReporter.build(extensionConfig);
     }
 }
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties
index 9fc35c59c4..1e76d5b52e 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties
@@ -14,8 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-plugin.id=tika-pipes-elasticsearch-plugin
-plugin.class=org.apache.tika.pipes.plugin.elasticsearch.ElasticsearchPipesPlugin
+plugin.id=tika-pipes-es-plugin
+plugin.class=org.apache.tika.pipes.plugin.es.ESPipesPlugin
 plugin.version=4.0.0-SNAPSHOT
 plugin.provider=Apache Tika
-plugin.description=Pipes for Elasticsearch (plain HTTP, ASL v2 only)
+plugin.description=ES emitter for Tika Pipes (plain HTTP, ASL v2 only)
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/es/ESClientTest.java
similarity index 57%
rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java
rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/es/ESClientTest.java
index 3193b700b2..fac02a1bab 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/es/ESClientTest.java
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter.elasticsearch;
+package org.apache.tika.pipes.emitter.es;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -26,7 +27,7 @@ import org.junit.jupiter.api.Test;
 import org.apache.tika.TikaTest;
 import org.apache.tika.metadata.Metadata;
 
-public class ElasticsearchClientTest extends TikaTest {
+public class ESClientTest extends TikaTest {
 
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
@@ -36,23 +37,19 @@ public class ElasticsearchClientTest extends TikaTest {
         metadata.add("authors", "author1");
         metadata.add("authors", "author2");
         metadata.add("title", "title1");
-        for (ElasticsearchEmitterConfig.AttachmentStrategy strategy :
-                ElasticsearchEmitterConfig.AttachmentStrategy.values()) {
-            String json =
-                    ElasticsearchClient.metadataToJsonContainerInsert(
-                            metadata, strategy);
+        for (ESEmitterConfig.AttachmentStrategy strategy :
+                ESEmitterConfig.AttachmentStrategy.values()) {
+            String json = ESClient.metadataToJsonContainerInsert(metadata, strategy);
             assertContains("author1", json);
             assertContains("author2", json);
             assertContains("authors", json);
             assertContains("title1", json);
         }
-        for (ElasticsearchEmitterConfig.AttachmentStrategy strategy :
-                ElasticsearchEmitterConfig.AttachmentStrategy.values()) {
-            String json =
-                    ElasticsearchClient.metadataToJsonEmbeddedInsert(
-                            metadata, strategy, "myEmitKey",
-                            ElasticsearchEmitter
-                                    .DEFAULT_EMBEDDED_FILE_FIELD_NAME);
+        for (ESEmitterConfig.AttachmentStrategy strategy :
+                ESEmitterConfig.AttachmentStrategy.values()) {
+            String json = ESClient.metadataToJsonEmbeddedInsert(
+                    metadata, strategy, "myEmitKey",
+                    ESEmitter.DEFAULT_EMBEDDED_FILE_FIELD_NAME);
             assertContains("author1", json);
             assertContains("author2", json);
             assertContains("authors", json);
@@ -64,11 +61,8 @@ public class ElasticsearchClientTest extends TikaTest {
     public void testParentChildContainer() throws Exception {
         Metadata metadata = new Metadata();
         metadata.add("title", "parent doc");
-        String json =
-                ElasticsearchClient.metadataToJsonContainerInsert(
-                        metadata,
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .PARENT_CHILD);
+        String json = ESClient.metadataToJsonContainerInsert(
+                metadata, ESEmitterConfig.AttachmentStrategy.PARENT_CHILD);
         assertContains("relation_type", json);
         assertContains("container", json);
     }
@@ -77,12 +71,9 @@ public class ElasticsearchClientTest extends TikaTest {
     public void testParentChildEmbedded() throws Exception {
         Metadata metadata = new Metadata();
         metadata.add("title", "child doc");
-        String json =
-                ElasticsearchClient.metadataToJsonEmbeddedInsert(
-                        metadata,
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .PARENT_CHILD,
-                        "parentKey", "embedded");
+        String json = ESClient.metadataToJsonEmbeddedInsert(
+                metadata, ESEmitterConfig.AttachmentStrategy.PARENT_CHILD,
+                "parentKey", "embedded");
         assertContains("relation_type", json);
         assertContains("parentKey", json);
         assertContains("embedded", json);
@@ -92,12 +83,9 @@ public class ElasticsearchClientTest extends TikaTest {
     public void testSeparateDocumentsEmbedded() throws Exception {
         Metadata metadata = new Metadata();
         metadata.add("title", "child doc");
-        String json =
-                ElasticsearchClient.metadataToJsonEmbeddedInsert(
-                        metadata,
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .SEPARATE_DOCUMENTS,
-                        "parentKey", "embedded");
+        String json = ESClient.metadataToJsonEmbeddedInsert(
+                metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                "parentKey", "embedded");
         assertContains("parent", json);
         assertContains("parentKey", json);
         assertNotContained("relation_type", json);
@@ -107,69 +95,69 @@ public class ElasticsearchClientTest extends TikaTest {
     public void testChunksFieldWrittenAsRawJson() throws Exception {
         Metadata metadata = new Metadata();
         metadata.set("title", "test doc");
+        // "PwAAAEEgAABAwAAA" is the ES-documented base64 for [0.5, 10.0, 6.0]
+        // in big-endian float32 — the exact format ES dense_vector expects.
         metadata.set("tika:chunks",
-                "[{\"text\":\"hello\",\"vector\":\"AAAA\","
+                "[{\"text\":\"hello\",\"vector\":\"PwAAAEEgAABAwAAA\","
                         + "\"locators\":{\"text\":[{\"start_offset\":0,"
                         + "\"end_offset\":5}]}}]");
-        String json =
-                ElasticsearchClient.metadataToJsonContainerInsert(
-                        metadata,
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .SEPARATE_DOCUMENTS);
+        String json = ESClient.metadataToJsonContainerInsert(
+                metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS);
 
-        // Parse the output JSON — tika:chunks should be a real JSON
-        // array, not a double-escaped string
         JsonNode doc = MAPPER.readTree(json);
         JsonNode chunks = doc.get("tika:chunks");
         assertTrue(chunks.isArray(),
                 "tika:chunks should be a JSON array, not a string");
         assertEquals(1, chunks.size());
         assertEquals("hello", chunks.get(0).get("text").asText());
-        assertEquals("AAAA", chunks.get(0).get("vector").asText());
+        assertEquals("PwAAAEEgAABAwAAA", chunks.get(0).get("vector").asText());
     }
 
     @Test
     public void testNonJsonFieldStaysString() throws Exception {
         Metadata metadata = new Metadata();
         metadata.set("tika:chunks", "not json at all");
-        String json =
-                ElasticsearchClient.metadataToJsonContainerInsert(
-                        metadata,
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .SEPARATE_DOCUMENTS);
+        String json = ESClient.metadataToJsonContainerInsert(
+                metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS);
         JsonNode doc = MAPPER.readTree(json);
-        // Should be a plain string since it doesn't look like JSON
         assertTrue(doc.get("tika:chunks").isTextual());
-        assertEquals("not json at all",
-                doc.get("tika:chunks").asText());
+        assertEquals("not json at all", doc.get("tika:chunks").asText());
     }
 
     @Test
     public void testRegularFieldNotRawJson() throws Exception {
         Metadata metadata = new Metadata();
-        // A regular field whose value happens to look like JSON
-        // should NOT be written as raw JSON
         metadata.set("description", "[some bracketed text]");
-        String json =
-                ElasticsearchClient.metadataToJsonContainerInsert(
-                        metadata,
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .SEPARATE_DOCUMENTS);
+        String json = ESClient.metadataToJsonContainerInsert(
+                metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS);
         JsonNode doc = MAPPER.readTree(json);
         assertTrue(doc.get("description").isTextual());
     }
 
+    @Test
+    public void testTrailingContentRejected() throws Exception {
+        // A value that passes a naive readTree() check but contains trailing
+        // content that would inject extra fields into the document via writeRawValue.
+        Metadata metadata = new Metadata();
+        metadata.set("tika:chunks", "[1,2,3], \"injected\": true");
+        String json = ESClient.metadataToJsonContainerInsert(
+                metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS);
+        JsonNode doc = MAPPER.readTree(json);
+        // Must be written as an escaped string, not raw JSON — no injection
+        assertTrue(doc.get("tika:chunks").isTextual(),
+                "trailing-content value must be escaped, not written as raw JSON");
+        assertFalse(doc.has("injected"),
+                "injected field must not appear in document");
+    }
+
     @Test
     public void testMultiValuedMetadata() throws Exception {
         Metadata metadata = new Metadata();
         metadata.add("tags", "tag1");
         metadata.add("tags", "tag2");
         metadata.add("tags", "tag3");
-        String json =
-                ElasticsearchClient.metadataToJsonContainerInsert(
-                        metadata,
-                        ElasticsearchEmitterConfig.AttachmentStrategy
-                                .SEPARATE_DOCUMENTS);
+        String json = ESClient.metadataToJsonContainerInsert(
+                metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS);
         assertContains("tag1", json);
         assertContains("tag2", json);
         assertContains("tag3", json);
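
The base64 literal asserted in testChunksFieldWrittenAsRawJson above can be reproduced with a few lines of standalone Java; nothing below depends on Tika classes.

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Base64;

    public class BigEndianVectorBase64 {
        public static void main(String[] args) {
            float[] vector = {0.5f, 10.0f, 6.0f};
            ByteBuffer buf = ByteBuffer.allocate(vector.length * Float.BYTES)
                    .order(ByteOrder.BIG_ENDIAN);  // IEEE 754 float32, big-endian
            for (float f : vector) {
                buf.putFloat(f);
            }
            // prints PwAAAEEgAABAwAAA, the value asserted in the test above
            System.out.println(Base64.getEncoder().encodeToString(buf.array()));
        }
    }
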
