This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4672-es-emitter
in repository https://gitbox.apache.org/repos/asf/tika.git
commit df03103bb8d6b80ba51c8f11e802980d58ea9dc4 Author: tballison <[email protected]> AuthorDate: Fri Feb 20 11:30:18 2026 -0500 TIKA-4XXX: Add ES emitter and reporter pipes plugin Adds an Elasticsearch-compatible emitter and pipes reporter as a new tika-pipes plugin, avoiding use of the trademarked "Elasticsearch" name in class and package identifiers (plugin id: es-emitter). Key changes: - New plugin: tika-pipes-elasticsearch with ESEmitter, ESPipesReporter, ESEmitterConfig, ESEmitterFactory, ESPipesPlugin; all classes under org.apache.tika.pipes.{emitter,reporter,plugin}.es packages - ESClient talks directly to the _bulk REST endpoint using Apache HttpClient (ASL v2), avoiding the SSPL-licensed ES Java client - Reporter writes per-document parse status (PARSE_SUCCESS, PARSE_SUCCESS_WITH_EXCEPTION, EMIT_SUCCESS, OOM/TIMEOUT) to a dedicated ES index, mirroring the OpenSearch reporter - HttpClientFactory: add verifySsl flag (default false preserves existing trust-all behaviour); when true uses JVM default trust store and hostname verification; copy() propagates the flag - VectorSerializer: switch to BIG_ENDIAN byte order to match ES 9.3+ dense_vector base64 format (big-endian IEEE 754 float32) - Security: fix JSON field injection in ESClient.isValidJson by using JsonParser with explicit trailing-token check; override ESEmitterConfig.toString() to redact apiKey from logs - Integration tests: 6 Docker-based tests (Testcontainers, ES 9.3.0) covering basic FS→ES pipeline, reporter validation, RMETA/PARENT_CHILD attachment strategies, upsert, and kNN vector search; vector test uses base64 on ES >= 9.3, falls back to float-array bulk indexing on older versions - Documentation: ES emitter and reporter sections in pipes/index.adoc with configuration tables and SSL warning Co-authored-by: Cursor <[email protected]> --- docs/modules/ROOT/pages/pipes/index.adoc | 100 +++++++- .../pom.xml | 2 +- .../elasticsearch/tests/ElasticsearchTest.java | 237 ++++++++++++++++--- .../tests/ElasticsearchTestClient.java | 32 +-- .../elasticsearch-vector-mappings.json | 17 ++ .../resources/elasticsearch/plugins-template.json | 16 +- .../apache/tika/inference/VectorSerializer.java | 18 +- .../tika/inference/VectorSerializerTest.java | 13 ++ .../org/apache/tika/client/HttpClientFactory.java | 40 +++- .../tika-pipes-elasticsearch/pom.xml | 13 +- .../ElasticsearchClient.java => es/ESClient.java} | 181 ++++++--------- .../ESEmitter.java} | 54 +++-- .../ESEmitterConfig.java} | 39 ++-- .../ESEmitterFactory.java} | 14 +- .../tika/pipes/emitter/es/HttpClientConfig.java | 39 ++++ .../{elasticsearch => es}/JsonResponse.java | 3 +- .../ESPipesPlugin.java} | 16 +- .../tika/pipes/reporter/es/ESPipesReporter.java | 251 +++++++++++++++++++++ .../es/ESReporterConfig.java} | 24 +- .../es/ESReporterFactory.java} | 30 +-- .../src/main/resources/plugin.properties | 6 +- .../ESClientTest.java} | 108 ++++----- 22 files changed, 916 insertions(+), 337 deletions(-) diff --git a/docs/modules/ROOT/pages/pipes/index.adoc b/docs/modules/ROOT/pages/pipes/index.adoc index ff67ab6e0c..0f94b9c080 100644 --- a/docs/modules/ROOT/pages/pipes/index.adoc +++ b/docs/modules/ROOT/pages/pipes/index.adoc @@ -24,7 +24,7 @@ This section covers Tika Pipes for scalable, fault-tolerant document processing. Tika Pipes provides a framework for processing large volumes of documents with: * **Fetchers** - Retrieve documents from various sources (filesystem, S3, HTTP, etc.) 
-* **Emitters** - Send parsed results to various destinations (filesystem, OpenSearch, Solr, etc.) +* **Emitters** - Send parsed results to various destinations (filesystem, OpenSearch, ES-compatible, Solr, etc.) * **Pipelines** - Configure processing workflows == Topics @@ -39,6 +39,104 @@ Tika Pipes provides a framework for processing large volumes of documents with: // * link:configuration.html[Configuration] // * link:async.html[Async Processing] +== Emitters + +=== ES Emitter (`es-emitter`) + +The ES emitter sends parsed documents to any ES-compatible REST API (ES 7+/8+) via +the `_bulk` endpoint. It uses plain HTTP (Apache HttpClient) — there is no dependency +on the ES Java client, which carries a non-ASL license. + +[source,json] +---- +"emitters": { + "my-es": { + "es-emitter": { + "esUrl": "https://localhost:9200/my-index", + "idField": "_id", + "attachmentStrategy": "SEPARATE_DOCUMENTS", + "updateStrategy": "UPSERT", + "embeddedFileFieldName": "embedded", + "apiKey": "<base64-encoded id:api_key>" + } + } +} +---- + +[cols="1,1,3"] +|=== +|Field |Default |Description + +|`esUrl` +|_required_ +|Full URL including the index name, e.g. `https://localhost:9200/my-index` + +|`idField` +|`_id` +|Metadata field used as the document `_id` + +|`attachmentStrategy` +|`SEPARATE_DOCUMENTS` +|How embedded documents are stored. `SEPARATE_DOCUMENTS` gives each embedded +file its own flat document. `PARENT_CHILD` uses an ES join field so embedded +files are linked to their container via `relation_type`. + +|`updateStrategy` +|`OVERWRITE` +|`OVERWRITE` uses a bulk `index` action (full replace). +`UPSERT` uses a bulk `update` / `doc_as_upsert` action (field-level merge). + +|`embeddedFileFieldName` +|`embedded` +|Name of the join-field relation used in `PARENT_CHILD` mode. + +|`apiKey` +|_none_ +|Base64-encoded `id:api_key` sent as `Authorization: ApiKey <value>`. +Takes precedence over `httpClientConfig` basic auth. + +|`httpClientConfig` +|_none_ +|Optional block for `userName`, `password`, `authScheme`, `connectionTimeout`, +`socketTimeout`, `proxyHost`, `proxyPort`, and `verifySsl` (boolean, default `false`). +|=== + +[WARNING] +==== +By default (`verifySsl: false`) TLS certificate verification is disabled — all +certificates are trusted and hostname verification is skipped. Set +`httpClientConfig.verifySsl: true` to enable proper certificate and hostname +validation using the JVM's default trust store. When `verifySsl` is `false`, +do not transmit credentials over plain HTTP in production; prefer HTTPS with +network-level controls (VPN, private endpoint) until verification is enabled. +==== + +=== ES Pipes Reporter (`es-pipes-reporter`) + +The ES reporter writes per-document parse status back into the same index, +so you can query the processing outcome alongside the extracted content. + +[source,json] +---- +"pipes-reporters": { + "es-pipes-reporter": { + "esUrl": "https://localhost:9200/my-index", + "keyPrefix": "tika_", + "includeRouting": false + } +} +---- + +The reporter adds `<keyPrefix>parse_status`, `<keyPrefix>parse_time_ms`, +and (when the forked JVM exits abnormally) `<keyPrefix>exit_value` fields +to each document via an upsert. + +=== OpenSearch Emitter + +The OpenSearch emitter is configured identically but uses `opensearch-emitter` as the +plugin key and `openSearchUrl` as the URL field. It also ships with an +`opensearch-pipes-reporter`. 
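
For the ES emitter documented above, the two `updateStrategy` values translate into
different `_bulk` action lines on the wire. The sketches below are illustrative only
(the `_id` and field names are invented, not the emitter's exact output); in
`PARENT_CHILD` mode the action metadata additionally carries a `routing` value set to
the container's id.

`OVERWRITE` sends a bulk `index` action followed by the full document:

[source,json]
----
{"index":{"_id":"doc-1"}}
{"content":"replaces any existing document with this id"}
----

`UPSERT` sends a bulk `update` action with `doc_as_upsert`, merging fields into an
existing document or creating it if absent:

[source,json]
----
{"update":{"_id":"doc-1"}}
{"doc":{"content":"merged into the existing document"},"doc_as_upsert":true}
----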
+ == Advanced Topics * xref:pipes/shared-server-mode.adoc[Shared Server Mode] - Experimental mode for reduced memory usage diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml index ce6f15a746..4f879fee04 100644 --- a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/pom.xml @@ -70,7 +70,7 @@ <artifactId>apache-rat-plugin</artifactId> <configuration> <inputExcludes> - <inputExclude>src/test/resources/elasticsearch/*.json</inputExclude> + <inputExclude>src/test/resources/elasticsearch/**</inputExclude> </inputExcludes> </configuration> </plugin> diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java index 5bcbacef1f..44f13658bd 100644 --- a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTest.java @@ -16,8 +16,9 @@ */ package org.apache.tika.pipes.elasticsearch.tests; -import static org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitter.DEFAULT_EMBEDDED_FILE_FIELD_NAME; +import static org.apache.tika.pipes.emitter.es.ESEmitter.DEFAULT_EMBEDDED_FILE_FIELD_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -28,8 +29,11 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -57,9 +61,9 @@ import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.api.ParseMode; import org.apache.tika.pipes.api.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; -import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitterConfig; -import org.apache.tika.pipes.emitter.elasticsearch.HttpClientConfig; -import org.apache.tika.pipes.emitter.elasticsearch.JsonResponse; +import org.apache.tika.pipes.emitter.es.ESEmitterConfig; +import org.apache.tika.pipes.emitter.es.HttpClientConfig; +import org.apache.tika.pipes.emitter.es.JsonResponse; import org.apache.tika.plugins.TikaPluginManager; /** @@ -75,7 +79,7 @@ public class ElasticsearchTest { private static final DockerImageName ES_IMAGE = DockerImageName.parse( - "docker.elastic.co/elasticsearch/elasticsearch:8.17.0"); + "docker.elastic.co/elasticsearch/elasticsearch:9.3.0"); private static GenericContainer<?> CONTAINER; @@ -123,9 +127,8 @@ public class ElasticsearchTest { "elasticsearch-mappings.json"); runPipes(client, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS, - ElasticsearchEmitterConfig.UpdateStrategy.UPSERT, + ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS, + ESEmitterConfig.UpdateStrategy.UPSERT, 
ParseMode.CONCATENATE, endpoint, pipesDirectory, testDocDirectory); @@ -149,6 +152,36 @@ public class ElasticsearchTest { assertEquals(numHtmlDocs + numTestDocs, results.getJson().get("hits").get("total").get("value") .asInt()); + + // validate reporter fields + Map<String, Integer> statusCounts = new HashMap<>(); + for (JsonNode n : results.getJson().get("hits").get("hits")) { + String status = n.get("_source").get("my_test_parse_status").asText(); + long parseTimeMs = n.get("_source").get("my_test_parse_time_ms").asLong(); + Integer cnt = statusCounts.get(status); + statusCounts.put(status, cnt == null ? 1 : cnt + 1); + } + assertEquals(numHtmlDocs, (int) statusCounts.get("PARSE_SUCCESS"), + "should have had " + numHtmlDocs + " parse successes: " + statusCounts); + assertEquals(1, (int) statusCounts.get("PARSE_SUCCESS_WITH_EXCEPTION"), + "should have had 1 parse exception: " + statusCounts); + assertEquals(1, (int) statusCounts.get("EMIT_SUCCESS"), + "should have had 1 emit success: " + statusCounts); + assertEquals(2, numberOfCrashes(statusCounts), + "should have had 2 OOM or 1 OOM and 1 timeout: " + statusCounts); + } + + private int numberOfCrashes(Map<String, Integer> statusCounts) { + Integer oom = statusCounts.get("OOM"); + Integer timeout = statusCounts.get("TIMEOUT"); + int sum = 0; + if (oom != null) { + sum += oom; + } + if (timeout != null) { + sum += timeout; + } + return sum; } @Test @@ -165,8 +198,8 @@ public class ElasticsearchTest { "elasticsearch-parent-child-mappings.json"); runPipes(client, - ElasticsearchEmitterConfig.AttachmentStrategy.PARENT_CHILD, - ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE, + ESEmitterConfig.AttachmentStrategy.PARENT_CHILD, + ESEmitterConfig.UpdateStrategy.OVERWRITE, ParseMode.RMETA, endpoint, pipesDirectory, testDocDirectory); @@ -241,9 +274,8 @@ public class ElasticsearchTest { "elasticsearch-mappings.json"); runPipes(client, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS, - ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE, + ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS, + ESEmitterConfig.UpdateStrategy.OVERWRITE, ParseMode.RMETA, endpoint, pipesDirectory, testDocDirectory); @@ -323,9 +355,8 @@ public class ElasticsearchTest { "elasticsearch-mappings.json"); runPipes(client, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS, - ElasticsearchEmitterConfig.UpdateStrategy.UPSERT, + ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS, + ESEmitterConfig.UpdateStrategy.UPSERT, ParseMode.RMETA, endpoint, pipesDirectory, testDocDirectory); @@ -352,9 +383,8 @@ public class ElasticsearchTest { sendMappings(client, endpoint, TEST_INDEX, "elasticsearch-mappings.json"); Path pluginsConfigFile = getPluginsConfig(pipesDirectory, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS, - ElasticsearchEmitterConfig.UpdateStrategy.UPSERT, + ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS, + ESEmitterConfig.UpdateStrategy.UPSERT, ParseMode.RMETA, endpoint, testDocDirectory); TikaJsonConfig tikaJsonConfig = @@ -392,28 +422,167 @@ public class ElasticsearchTest { doc1.get("content").asText()); } + /** + * Verifies that documents with {@code dense_vector} fields are correctly + * ranked by kNN — no inference engine required, vectors are mocked directly. 
+ * + * <p>ES 9.3 introduced native support for big-endian base64 float32 strings + * as an alternative to JSON float arrays for {@code dense_vector} ingestion, + * reducing per-element numeric parsing overhead by an order of magnitude. + * When the running ES instance is ≥ 9.3, this test uses the emitter with + * base64-encoded vectors (validating our encoding end-to-end). For older + * instances it falls back to indexing float arrays directly via the bulk + * REST API. + * + * <p>Three documents are inserted in a randomized order and the test asserts + * that kNN always returns them in cosine-distance order, confirming both + * correct encoding and correct ranking. + * + * <p>Distances from query [1, 0, 0]: + * <ul> + * <li>"near" [0.99, 0.14, 0] — cosine ≈ 0.990 (~8°)</li> + * <li>"mid" [1.0, 1.0, 0] — cosine ≈ 0.707 (45°)</li> + * <li>"far" [0.0, 1.0, 0] — cosine = 0.000 (90°)</li> + * </ul> + */ + @Test + public void testVectorSearch( + @TempDir Path pipesDirectory, + @TempDir Path testDocDirectory) throws Exception { + + ElasticsearchTestClient client = getNewClient(); + String endpoint = getEndpoint(); + sendMappings(client, endpoint, TEST_INDEX, + "elasticsearch-vector-mappings.json"); + + // Each entry is [docId, vector]. Distances from query [1, 0, 0] + // are strictly ordered: near > mid > far, so ranking is unambiguous. + List<Object[]> docs = new ArrayList<>(Arrays.asList( + new Object[]{"near", new float[]{0.99f, 0.14f, 0.0f}}, + new Object[]{"mid", new float[]{1.0f, 1.0f, 0.0f}}, + new Object[]{"far", new float[]{0.0f, 1.0f, 0.0f}} + )); + + // Randomize insertion order — ranking must be independent of it. + Collections.shuffle(docs); + + if (esVersionAtLeast(client, 9, 3)) { + // ES 9.3+: use the emitter; metadata string values flow through as + // JSON strings and ES decodes the base64 float32 encoding natively. + Path pluginsConfigFile = getPluginsConfig(pipesDirectory, + ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS, + ESEmitterConfig.UpdateStrategy.UPSERT, + ParseMode.RMETA, endpoint, testDocDirectory); + TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(pluginsConfigFile); + Emitter emitter = EmitterManager + .load(TikaPluginManager.load(tikaJsonConfig), tikaJsonConfig) + .getEmitter(); + + for (Object[] doc : docs) { + String id = (String) doc[0]; + float[] vec = (float[]) doc[1]; + Metadata metadata = new Metadata(); + metadata.set("content", id + " document"); + metadata.set("vector", base64Vector(vec)); + emitter.emit(id, Collections.singletonList(metadata), + new ParseContext()); + } + } else { + // Older ES: fall back to direct bulk indexing with JSON float arrays. 
+ StringBuilder bulk = new StringBuilder(); + for (Object[] doc : docs) { + String id = (String) doc[0]; + float[] vec = (float[]) doc[1]; + bulk.append("{\"index\":{\"_id\":\"").append(id).append("\"}}\n"); + bulk.append("{\"content\":\"").append(id) + .append(" document\",\"vector\":").append(floatArrayJson(vec)) + .append("}\n"); + } + JsonResponse bulkResp = + client.postJson(endpoint + "/_bulk", bulk.toString()); + assertEquals(200, bulkResp.getStatus(), + "bulk index failed: " + bulkResp.getMsg()); + assertFalse(bulkResp.getJson().get("errors").asBoolean(), + "bulk index had errors: " + bulkResp.getJson()); + } + + client.getJson(endpoint + "/_refresh"); + + String knn = "{ \"knn\": { \"field\": \"vector\", " + + "\"query_vector\": [1.0, 0.0, 0.0], " + + "\"k\": 3, \"num_candidates\": 100 } }"; + JsonResponse results = client.postJson(endpoint + "/_search", knn); + assertEquals(200, results.getStatus(), + "kNN search failed: " + results.getMsg()); + + JsonNode hits = results.getJson().get("hits").get("hits"); + assertEquals(3, hits.size(), "expected all 3 docs in kNN results"); + assertEquals("near", hits.get(0).get("_id").asText(), + "nearest doc should rank first"); + assertEquals("mid", hits.get(1).get("_id").asText(), + "mid doc should rank second"); + assertEquals("far", hits.get(2).get("_id").asText(), + "farthest doc should rank last"); + } + + /** + * Returns true if the running ES instance version is at least + * {@code major}.{@code minor}. + */ + private boolean esVersionAtLeast(ElasticsearchTestClient client, + int major, int minor) throws Exception { + JsonResponse resp = client.getJson(getBaseUrl() + "/"); + String version = resp.getJson().get("version").get("number").asText(); + String[] parts = version.split("\\."); + int maj = Integer.parseInt(parts[0]); + int min = Integer.parseInt(parts[1]); + return maj > major || (maj == major && min >= minor); + } + + /** + * Encodes a float vector as big-endian base64, matching the format + * accepted by ES 9.3+ for {@code dense_vector} fields. + */ + private static String base64Vector(float... floats) { + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(floats.length * Float.BYTES) + .order(java.nio.ByteOrder.BIG_ENDIAN); + buf.asFloatBuffer().put(floats); + return java.util.Base64.getEncoder().encodeToString(buf.array()); + } + + /** Formats a float array as a JSON number array, e.g. {@code [0.99,0.14,0.0]}. 
*/ + private static String floatArrayJson(float[] vec) { + StringBuilder sb = new StringBuilder("["); + for (int i = 0; i < vec.length; i++) { + if (i > 0) sb.append(','); + sb.append(vec[i]); + } + return sb.append(']').toString(); + } + // ----------------------------------------------------------------- // Helpers // ----------------------------------------------------------------- + private String getBaseUrl() { + return "http://" + CONTAINER.getHost() + ":" + CONTAINER.getMappedPort(9200); + } + private String getEndpoint() { - return "http://" + CONTAINER.getHost() + ":" + - CONTAINER.getMappedPort(9200) + "/" + TEST_INDEX; + return getBaseUrl() + "/" + TEST_INDEX; } private ElasticsearchTestClient getNewClient() throws TikaConfigException { HttpClientFactory httpClientFactory = new HttpClientFactory(); - ElasticsearchEmitterConfig config = - new ElasticsearchEmitterConfig( + ESEmitterConfig config = + new ESEmitterConfig( getEndpoint(), "_id", - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS, - ElasticsearchEmitterConfig.UpdateStrategy - .OVERWRITE, + ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS, + ESEmitterConfig.UpdateStrategy.OVERWRITE, 10, DEFAULT_EMBEDDED_FILE_FIELD_NAME, null, new HttpClientConfig(null, null, null, - -1, -1, null, -1)); + -1, -1, null, 0, false)); return new ElasticsearchTestClient(config, httpClientFactory.build()); } @@ -446,8 +615,8 @@ public class ElasticsearchTest { private void runPipes( ElasticsearchTestClient client, - ElasticsearchEmitterConfig.AttachmentStrategy attachStrat, - ElasticsearchEmitterConfig.UpdateStrategy updateStrat, + ESEmitterConfig.AttachmentStrategy attachStrat, + ESEmitterConfig.UpdateStrategy updateStrat, ParseMode parseMode, String endpoint, Path pipesDirectory, Path testDocDirectory) throws Exception { @@ -466,8 +635,8 @@ public class ElasticsearchTest { @NotNull private Path getPluginsConfig( Path pipesDirectory, - ElasticsearchEmitterConfig.AttachmentStrategy attachStrat, - ElasticsearchEmitterConfig.UpdateStrategy updateStrat, + ESEmitterConfig.AttachmentStrategy attachStrat, + ESEmitterConfig.UpdateStrategy updateStrat, ParseMode parseMode, String endpoint, Path testDocDirectory) throws IOException { @@ -480,9 +649,7 @@ public class ElasticsearchTest { Files.copy(is, log4jPropFile); } - boolean includeRouting = (attachStrat == - ElasticsearchEmitterConfig.AttachmentStrategy - .PARENT_CHILD); + boolean includeRouting = (attachStrat == ESEmitterConfig.AttachmentStrategy.PARENT_CHILD); Map<String, Object> replacements = new HashMap<>(); replacements.put("ATTACHMENT_STRATEGY", diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java index 3808609924..387e6f6164 100644 --- a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/java/org/apache/tika/pipes/elasticsearch/tests/ElasticsearchTestClient.java @@ -33,18 +33,16 @@ import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.util.EntityUtils; -import org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchClient; -import 
org.apache.tika.pipes.emitter.elasticsearch.ElasticsearchEmitterConfig; -import org.apache.tika.pipes.emitter.elasticsearch.JsonResponse; +import org.apache.tika.pipes.emitter.es.ESClient; +import org.apache.tika.pipes.emitter.es.ESEmitterConfig; +import org.apache.tika.pipes.emitter.es.JsonResponse; /** - * Extends ElasticsearchClient with GET, PUT, and DELETE for integration - * testing purposes. + * Extends ESClient with GET, PUT, and DELETE for integration testing purposes. */ -public class ElasticsearchTestClient extends ElasticsearchClient { +public class ElasticsearchTestClient extends ESClient { - public ElasticsearchTestClient(ElasticsearchEmitterConfig config, - HttpClient httpClient) { + public ElasticsearchTestClient(ESEmitterConfig config, HttpClient httpClient) { super(config, httpClient); } @@ -54,8 +52,7 @@ public class ElasticsearchTestClient extends ElasticsearchClient { new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8)); httpRequest.setEntity(entity); httpRequest.setHeader("Accept", "application/json"); - httpRequest.setHeader("Content-type", - "application/json; charset=utf-8"); + httpRequest.setHeader("Content-type", "application/json; charset=utf-8"); HttpResponse response = null; try { @@ -72,9 +69,7 @@ public class ElasticsearchTestClient extends ElasticsearchClient { } } else { return new JsonResponse(status, - new String( - EntityUtils.toByteArray( - response.getEntity()), + new String(EntityUtils.toByteArray(response.getEntity()), StandardCharsets.UTF_8)); } } finally { @@ -88,8 +83,7 @@ public class ElasticsearchTestClient extends ElasticsearchClient { public JsonResponse getJson(String url) throws IOException { HttpGet httpRequest = new HttpGet(url); httpRequest.setHeader("Accept", "application/json"); - httpRequest.setHeader("Content-type", - "application/json; charset=utf-8"); + httpRequest.setHeader("Content-type", "application/json; charset=utf-8"); HttpResponse response = null; try { @@ -106,9 +100,7 @@ public class ElasticsearchTestClient extends ElasticsearchClient { } } else { return new JsonResponse(status, - new String( - EntityUtils.toByteArray( - response.getEntity()), + new String(EntityUtils.toByteArray(response.getEntity()), StandardCharsets.UTF_8)); } } finally { @@ -136,9 +128,7 @@ public class ElasticsearchTestClient extends ElasticsearchClient { } } else { return new JsonResponse(status, - new String( - EntityUtils.toByteArray( - response.getEntity()), + new String(EntityUtils.toByteArray(response.getEntity()), StandardCharsets.UTF_8)); } } finally { diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-vector-mappings.json b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-vector-mappings.json new file mode 100644 index 0000000000..a52187f892 --- /dev/null +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/elasticsearch-vector-mappings.json @@ -0,0 +1,17 @@ +{ + "settings": { + "number_of_shards": 1 + }, + "mappings": { + "properties": { + "content": { "type": "text" }, + "vector": { + "type": "dense_vector", + "dims": 3, + "index": true, + "similarity": "cosine", + "index_options": { "type": "flat" } + } + } + } +} diff --git a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json 
b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json index 2975b9d85f..c5ae829b5c 100644 --- a/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json +++ b/tika-integration-tests/tika-pipes-elasticsearch-integration-tests/src/test/resources/elasticsearch/plugins-template.json @@ -15,8 +15,8 @@ }, "emitters": { "ese": { - "elasticsearch-emitter": { - "elasticsearchUrl": "ELASTICSEARCH_URL", + "es-emitter": { + "esUrl": "ELASTICSEARCH_URL", "updateStrategy": "UPDATE_STRATEGY", "attachmentStrategy": "ATTACHMENT_STRATEGY", "commitWithin": 10, @@ -30,6 +30,18 @@ } } }, + "pipes-reporters": { + "es-pipes-reporter": { + "esUrl": "ELASTICSEARCH_URL", + "keyPrefix": "my_test_", + "includeRouting": "INCLUDE_ROUTING", + "httpClientConfig": { + "authScheme": "http", + "connectionTimeout": 60, + "socketTimeout": 60 + } + } + }, "pipes-iterator": { "file-system-pipes-iterator": { "basePath": "FETCHER_BASE_PATH", diff --git a/tika-parsers/tika-parsers-ml/tika-inference/src/main/java/org/apache/tika/inference/VectorSerializer.java b/tika-parsers/tika-parsers-ml/tika-inference/src/main/java/org/apache/tika/inference/VectorSerializer.java index f350fee123..6c6500c698 100644 --- a/tika-parsers/tika-parsers-ml/tika-inference/src/main/java/org/apache/tika/inference/VectorSerializer.java +++ b/tika-parsers/tika-parsers-ml/tika-inference/src/main/java/org/apache/tika/inference/VectorSerializer.java @@ -22,9 +22,13 @@ import java.nio.FloatBuffer; import java.util.Base64; /** - * Serializes and deserializes float vectors as base64-encoded little-endian - * float32 byte arrays. Little-endian matches numpy/PyTorch convention so - * vectors from Python inference servers round-trip cleanly. + * Serializes and deserializes float vectors as base64-encoded big-endian + * float32 byte arrays. + * + * <p>Big-endian matches the format expected by Elasticsearch's + * {@code dense_vector} field type, which accepts either a JSON float array + * or a base64-encoded binary string in big-endian float32 order. + * See the Elasticsearch dense_vector mapping documentation for details. */ public final class VectorSerializer { @@ -32,22 +36,22 @@ public final class VectorSerializer { } /** - * Encode a float array as a base64 string (little-endian float32). + * Encode a float array as a base64 string (big-endian float32). */ public static String encode(float[] vector) { ByteBuffer buf = ByteBuffer.allocate(vector.length * Float.BYTES) - .order(ByteOrder.LITTLE_ENDIAN); + .order(ByteOrder.BIG_ENDIAN); buf.asFloatBuffer().put(vector); return Base64.getEncoder().encodeToString(buf.array()); } /** - * Decode a base64 string back to a float array (little-endian float32). + * Decode a base64 string back to a float array (big-endian float32). 
*/ public static float[] decode(String base64) { byte[] bytes = Base64.getDecoder().decode(base64); FloatBuffer fb = ByteBuffer.wrap(bytes) - .order(ByteOrder.LITTLE_ENDIAN) + .order(ByteOrder.BIG_ENDIAN) .asFloatBuffer(); float[] vector = new float[fb.remaining()]; fb.get(vector); diff --git a/tika-parsers/tika-parsers-ml/tika-inference/src/test/java/org/apache/tika/inference/VectorSerializerTest.java b/tika-parsers/tika-parsers-ml/tika-inference/src/test/java/org/apache/tika/inference/VectorSerializerTest.java index b3fd5d22b5..bee9db1e54 100644 --- a/tika-parsers/tika-parsers-ml/tika-inference/src/test/java/org/apache/tika/inference/VectorSerializerTest.java +++ b/tika-parsers/tika-parsers-ml/tika-inference/src/test/java/org/apache/tika/inference/VectorSerializerTest.java @@ -40,6 +40,19 @@ public class VectorSerializerTest { assertEquals(0, decoded.length); } + /** + * Pins the byte order to big-endian using the exact example from the + * Elasticsearch dense_vector documentation: [0.5, 10, 6] encodes to + * "PwAAAEEgAABAwAAA". If this test fails the byte order has been changed + * and ES indexing of vectors will silently produce wrong results. + */ + @Test + void testKnownElasticsearchBase64() { + float[] vec = {0.5f, 10.0f, 6.0f}; + assertEquals("PwAAAEEgAABAwAAA", VectorSerializer.encode(vec)); + assertArrayEquals(vec, VectorSerializer.decode("PwAAAEEgAABAwAAA"), 1e-6f); + } + @Test void testLargeVector() { float[] large = new float[768]; diff --git a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java index 4919c17aee..296323a90b 100644 --- a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java +++ b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java @@ -109,6 +109,7 @@ public class HttpClientFactory { private String authScheme = "basic"; //ntlm or basic private boolean credentialsAESEncrypted = false; private boolean disableContentCompression = false; + private boolean verifySsl = false; public String getProxyHost() { return proxyHost; @@ -237,6 +238,10 @@ public class HttpClientFactory { this.disableContentCompression = disableContentCompression; } + public void setVerifySsl(boolean verifySsl) { + this.verifySsl = verifySsl; + } + public HttpClientFactory copy() throws TikaConfigException { HttpClientFactory cp = new HttpClientFactory(); cp.setAllowedHostsForRedirect(new HashSet<>(allowedHostsForRedirect)); @@ -249,28 +254,36 @@ public class HttpClientFactory { cp.setMaxConnections(maxConnections); cp.setNtDomain(ntDomain); cp.setPassword(password); + cp.setUserName(userName); cp.setProxyHost(proxyHost); cp.setProxyPort(proxyPort); cp.setRequestTimeout(requestTimeout); cp.setSocketTimeout(socketTimeout); + cp.setVerifySsl(verifySsl); return cp; } public HttpClient build() throws TikaConfigException { - LOG.info("http client does not verify ssl at this point. 
" + - "If you need that, please open a ticket."); - TrustStrategy acceptingTrustStrategy = (cert, authType) -> true; SSLContext sslContext = null; - try { - sslContext = - SSLContexts.custom().loadTrustMaterial( - null, acceptingTrustStrategy).build(); - } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) { - throw new TikaConfigException("", e); + SSLConnectionSocketFactory sslsf; + if (verifySsl) { + sslContext = SSLContexts.createDefault(); + sslsf = new SSLConnectionSocketFactory(sslContext, + SSLConnectionSocketFactory.getDefaultHostnameVerifier()); + } else { + LOG.info("http client does not verify ssl at this point. " + + "If you need that, please open a ticket."); + TrustStrategy acceptingTrustStrategy = (cert, authType) -> true; + try { + sslContext = + SSLContexts.custom().loadTrustMaterial( + null, acceptingTrustStrategy).build(); + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) { + throw new TikaConfigException("", e); + } + sslsf = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE); } - SSLConnectionSocketFactory sslsf = - new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE); Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslsf) @@ -294,7 +307,10 @@ public class HttpClientFactory { .setConnectionRequestTimeout(requestTimeout) .setConnectionRequestTimeout(connectTimeout).setSocketTimeout(socketTimeout) .build()).setKeepAliveStrategy(getKeepAliveStrategy()) - .setSSLSocketFactory(sslsf).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .setSSLSocketFactory(sslsf) + .setSSLHostnameVerifier(verifySsl + ? SSLConnectionSocketFactory.getDefaultHostnameVerifier() + : NoopHostnameVerifier.INSTANCE) .build(); } diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml index 4995637318..c621edf54d 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/pom.xml @@ -51,6 +51,11 @@ <artifactId>tika-httpclient-commons</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-pipes-reporter-commons</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-io</artifactId> @@ -88,11 +93,11 @@ <configuration> <archive> <manifestEntries> - <Plugin-Id>elasticsearch-emitter</Plugin-Id> + <Plugin-Id>es-emitter</Plugin-Id> <Plugin-Version>${project.version}</Plugin-Version> - <Plugin-Class>org.apache.tika.pipes.plugin.elasticsearch.ElasticsearchPipesPlugin</Plugin-Class> - <Plugin-Provider>Elasticsearch Emitter</Plugin-Provider> - <Plugin-Description>Elasticsearch emitter (plain HTTP, no ES client)</Plugin-Description> + <Plugin-Class>org.apache.tika.pipes.plugin.es.ESPipesPlugin</Plugin-Class> + <Plugin-Provider>ES Emitter</Plugin-Provider> + <Plugin-Description>ES emitter (plain HTTP, no ES client)</Plugin-Description> <Plugin-Dependencies></Plugin-Dependencies> </manifestEntries> </archive> diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESClient.java similarity index 67% rename from 
tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESClient.java index 919c12ab9b..81965594d9 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClient.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESClient.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.emitter.elasticsearch; +package org.apache.tika.pipes.emitter.es; import java.io.BufferedReader; import java.io.IOException; @@ -28,6 +28,7 @@ import java.util.UUID; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpResponse; @@ -45,33 +46,29 @@ import org.apache.tika.pipes.api.emitter.EmitData; import org.apache.tika.utils.StringUtils; /** - * Plain HTTP client for Elasticsearch's REST API. + * Plain HTTP client for the ES REST API. * - * <p>This does <b>not</b> use the Elasticsearch Java client library + * <p>This does <b>not</b> use the ES Java client library * (which is SSPL / Elastic License). Instead it talks directly to - * Elasticsearch's {@code _bulk} REST endpoint using Apache HttpClient - * (ASL v2). + * the {@code _bulk} REST endpoint using Apache HttpClient (ASL v2). * * <p>Supports API key authentication ({@code Authorization: ApiKey ...}) * as well as basic auth via the underlying {@link HttpClient}. */ -public class ElasticsearchClient { +public class ESClient { - private static final Logger LOG = - LoggerFactory.getLogger(ElasticsearchClient.class); + private static final Logger LOG = LoggerFactory.getLogger(ESClient.class); protected final HttpClient httpClient; private final MetadataToJsonWriter metadataToJsonWriter; - private final ElasticsearchEmitterConfig config; + private final ESEmitterConfig config; - protected ElasticsearchClient(ElasticsearchEmitterConfig config, - HttpClient httpClient) { + protected ESClient(ESEmitterConfig config, HttpClient httpClient) { this.config = config; this.httpClient = httpClient; this.metadataToJsonWriter = - (config.updateStrategy() == - ElasticsearchEmitterConfig.UpdateStrategy.OVERWRITE) + (config.updateStrategy() == ESEmitterConfig.UpdateStrategy.OVERWRITE) ? 
new InsertMetadataToJsonWriter() : new UpsertMetadataToJsonWriter(); } @@ -94,16 +91,14 @@ public class ElasticsearchClient { private void emitJson(StringBuilder json) throws IOException, TikaClientException { - String requestUrl = config.elasticsearchUrl() + "/_bulk"; + String requestUrl = config.esUrl() + "/_bulk"; JsonResponse response = postJson(requestUrl, json.toString()); if (response.getStatus() != 200) { throw new TikaClientException(response.getMsg()); } else { - // If there's a single error in the bulk response, throw JsonNode errorNode = response.getJson().get("errors"); if (errorNode != null && errorNode.asText().equals("true")) { - throw new TikaClientException( - response.getJson().toString()); + throw new TikaClientException(response.getJson().toString()); } } } @@ -113,16 +108,15 @@ public class ElasticsearchClient { int i = 0; String routing = (config.attachmentStrategy() == - ElasticsearchEmitterConfig.AttachmentStrategy - .PARENT_CHILD) ? emitKey : null; + ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) + ? emitKey : null; for (Metadata metadata : metadataList) { StringBuilder id = new StringBuilder(emitKey); if (i > 0) { id.append("-").append(UUID.randomUUID()); } - String indexJson = - metadataToJsonWriter.getBulkJson(id.toString(), routing); + String indexJson = metadataToJsonWriter.getBulkJson(id.toString(), routing); json.append(indexJson).append("\n"); if (i == 0) { json.append(metadataToJsonWriter.writeContainer( @@ -140,7 +134,7 @@ public class ElasticsearchClient { // Package-private for testing static String metadataToJsonContainerInsert( Metadata metadata, - ElasticsearchEmitterConfig.AttachmentStrategy attachmentStrategy) + ESEmitterConfig.AttachmentStrategy attachmentStrategy) throws IOException { return new InsertMetadataToJsonWriter().writeContainer( metadata, attachmentStrategy); @@ -149,27 +143,22 @@ public class ElasticsearchClient { // Package-private for testing static String metadataToJsonEmbeddedInsert( Metadata metadata, - ElasticsearchEmitterConfig.AttachmentStrategy attachmentStrategy, + ESEmitterConfig.AttachmentStrategy attachmentStrategy, String emitKey, String embeddedFileFieldName) throws IOException { return new InsertMetadataToJsonWriter().writeEmbedded( - metadata, attachmentStrategy, emitKey, - embeddedFileFieldName); + metadata, attachmentStrategy, emitKey, embeddedFileFieldName); } public JsonResponse postJson(String url, String json) throws IOException { HttpPost httpRequest = new HttpPost(url); - StringEntity entity = - new StringEntity(json, StandardCharsets.UTF_8); + StringEntity entity = new StringEntity(json, StandardCharsets.UTF_8); httpRequest.setEntity(entity); httpRequest.setHeader("Accept", "application/json"); - httpRequest.setHeader("Content-type", - "application/json; charset=utf-8"); + httpRequest.setHeader("Content-type", "application/json; charset=utf-8"); - // ES 8.x API key auth if (!StringUtils.isEmpty(config.apiKey())) { - httpRequest.setHeader("Authorization", - "ApiKey " + config.apiKey()); + httpRequest.setHeader("Authorization", "ApiKey " + config.apiKey()); } HttpResponse response = null; @@ -190,9 +179,7 @@ public class ElasticsearchClient { } } else { return new JsonResponse(status, - new String( - EntityUtils.toByteArray( - response.getEntity()), + new String(EntityUtils.toByteArray(response.getEntity()), StandardCharsets.UTF_8)); } } finally { @@ -208,36 +195,29 @@ public class ElasticsearchClient { // ----------------------------------------------------------------------- private interface 
MetadataToJsonWriter { - String writeContainer( - Metadata metadata, - ElasticsearchEmitterConfig.AttachmentStrategy strategy) + String writeContainer(Metadata metadata, + ESEmitterConfig.AttachmentStrategy strategy) throws IOException; - String writeEmbedded( - Metadata metadata, - ElasticsearchEmitterConfig.AttachmentStrategy strategy, - String emitKey, String embeddedFileFieldName) + String writeEmbedded(Metadata metadata, + ESEmitterConfig.AttachmentStrategy strategy, + String emitKey, String embeddedFileFieldName) throws IOException; String getBulkJson(String id, String routing) throws IOException; } - private static class InsertMetadataToJsonWriter - implements MetadataToJsonWriter { + private static class InsertMetadataToJsonWriter implements MetadataToJsonWriter { @Override - public String writeContainer( - Metadata metadata, - ElasticsearchEmitterConfig.AttachmentStrategy strategy) + public String writeContainer(Metadata metadata, + ESEmitterConfig.AttachmentStrategy strategy) throws IOException { StringWriter writer = new StringWriter(); - try (JsonGenerator jg = - new JsonFactory().createGenerator(writer)) { + try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) { jg.writeStartObject(); writeMetadata(metadata, jg); - if (strategy == - ElasticsearchEmitterConfig.AttachmentStrategy - .PARENT_CHILD) { + if (strategy == ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) { jg.writeStringField("relation_type", "container"); } jg.writeEndObject(); @@ -246,26 +226,20 @@ public class ElasticsearchClient { } @Override - public String writeEmbedded( - Metadata metadata, - ElasticsearchEmitterConfig.AttachmentStrategy strategy, - String emitKey, String embeddedFileFieldName) + public String writeEmbedded(Metadata metadata, + ESEmitterConfig.AttachmentStrategy strategy, + String emitKey, String embeddedFileFieldName) throws IOException { StringWriter writer = new StringWriter(); - try (JsonGenerator jg = - new JsonFactory().createGenerator(writer)) { + try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) { jg.writeStartObject(); writeMetadata(metadata, jg); - if (strategy == - ElasticsearchEmitterConfig.AttachmentStrategy - .PARENT_CHILD) { + if (strategy == ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) { jg.writeObjectFieldStart("relation_type"); jg.writeStringField("name", embeddedFileFieldName); jg.writeStringField("parent", emitKey); jg.writeEndObject(); - } else if (strategy == - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS) { + } else if (strategy == ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS) { jg.writeStringField("parent", emitKey); } jg.writeEndObject(); @@ -274,11 +248,9 @@ public class ElasticsearchClient { } @Override - public String getBulkJson(String id, String routing) - throws IOException { + public String getBulkJson(String id, String routing) throws IOException { StringWriter writer = new StringWriter(); - try (JsonGenerator jg = - new JsonFactory().createGenerator(writer)) { + try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) { jg.writeStartObject(); jg.writeObjectFieldStart("index"); jg.writeStringField("_id", id); @@ -292,23 +264,18 @@ public class ElasticsearchClient { } } - private static class UpsertMetadataToJsonWriter - implements MetadataToJsonWriter { + private static class UpsertMetadataToJsonWriter implements MetadataToJsonWriter { @Override - public String writeContainer( - Metadata metadata, - ElasticsearchEmitterConfig.AttachmentStrategy strategy) + public String 
writeContainer(Metadata metadata, + ESEmitterConfig.AttachmentStrategy strategy) throws IOException { StringWriter writer = new StringWriter(); - try (JsonGenerator jg = - new JsonFactory().createGenerator(writer)) { + try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) { jg.writeStartObject(); jg.writeObjectFieldStart("doc"); writeMetadata(metadata, jg); - if (strategy == - ElasticsearchEmitterConfig.AttachmentStrategy - .PARENT_CHILD) { + if (strategy == ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) { jg.writeStringField("relation_type", "container"); } jg.writeEndObject(); @@ -319,27 +286,21 @@ public class ElasticsearchClient { } @Override - public String writeEmbedded( - Metadata metadata, - ElasticsearchEmitterConfig.AttachmentStrategy strategy, - String emitKey, String embeddedFileFieldName) + public String writeEmbedded(Metadata metadata, + ESEmitterConfig.AttachmentStrategy strategy, + String emitKey, String embeddedFileFieldName) throws IOException { StringWriter writer = new StringWriter(); - try (JsonGenerator jg = - new JsonFactory().createGenerator(writer)) { + try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) { jg.writeStartObject(); jg.writeObjectFieldStart("doc"); writeMetadata(metadata, jg); - if (strategy == - ElasticsearchEmitterConfig.AttachmentStrategy - .PARENT_CHILD) { + if (strategy == ESEmitterConfig.AttachmentStrategy.PARENT_CHILD) { jg.writeObjectFieldStart("relation_type"); jg.writeStringField("name", embeddedFileFieldName); jg.writeStringField("parent", emitKey); jg.writeEndObject(); - } else if (strategy == - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS) { + } else if (strategy == ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS) { jg.writeStringField("parent", emitKey); } jg.writeEndObject(); @@ -350,11 +311,9 @@ public class ElasticsearchClient { } @Override - public String getBulkJson(String id, String routing) - throws IOException { + public String getBulkJson(String id, String routing) throws IOException { StringWriter writer = new StringWriter(); - try (JsonGenerator jg = - new JsonFactory().createGenerator(writer)) { + try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) { jg.writeStartObject(); jg.writeObjectFieldStart("update"); jg.writeStringField("_id", id); @@ -373,19 +332,16 @@ public class ElasticsearchClient { * Metadata fields whose values are serialized JSON from the * tika-inference pipeline. These must be written as raw JSON * (arrays/objects) rather than escaped strings so that - * Elasticsearch can index vectors, locators, etc. natively. + * ES can index vectors, locators, etc. natively. 
*/ - static final Set<String> INFERENCE_JSON_FIELDS = Set.of( - "tika:chunks"); + static final Set<String> INFERENCE_JSON_FIELDS = Set.of("tika:chunks"); - private static void writeMetadata(Metadata metadata, - JsonGenerator jsonGenerator) + private static void writeMetadata(Metadata metadata, JsonGenerator jsonGenerator) throws IOException { for (String n : metadata.names()) { String[] vals = metadata.getValues(n); if (vals.length == 1) { - if (INFERENCE_JSON_FIELDS.contains(n) - && isValidJson(vals[0])) { + if (INFERENCE_JSON_FIELDS.contains(n) && isValidJson(vals[0])) { jsonGenerator.writeFieldName(n); jsonGenerator.writeRawValue(vals[0]); } else { @@ -401,12 +357,17 @@ public class ElasticsearchClient { } } - private static final ObjectMapper VALIDATION_MAPPER = new ObjectMapper(); + private static final JsonFactory STRICT_JSON_FACTORY = new JsonFactory(); /** - * Validates that the value is well-formed JSON (array or object) - * before writing it as raw JSON. This prevents injection of - * arbitrary content into the bulk request payload. + * Validates that the value is well-formed JSON (array or object) with + * no trailing content before writing it as raw JSON. + * + * <p>{@code ObjectMapper.readTree()} silently ignores trailing content, + * so a value like {@code [1,2,3], "injected": true} would pass a simple + * readTree check and then inject extra fields into the document via + * {@code writeRawValue}. We use a {@link JsonParser} directly and + * assert that the stream is fully consumed after the root value. */ private static boolean isValidJson(String value) { if (value == null || value.isEmpty()) { @@ -416,8 +377,14 @@ public class ElasticsearchClient { if (first != '[' && first != '{') { return false; } - try { - VALIDATION_MAPPER.readTree(value); + try (JsonParser parser = STRICT_JSON_FACTORY.createParser(value)) { + parser.nextToken(); + parser.skipChildren(); + if (parser.nextToken() != null) { + LOG.warn("Field value has trailing content after root JSON value; " + + "writing as escaped string"); + return false; + } return true; } catch (IOException e) { LOG.warn("Field value starts with '{}' but is not valid JSON; " diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitter.java similarity index 70% rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitter.java index 2c8c66783a..cb382a6b63 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitter.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.emitter.elasticsearch; +package org.apache.tika.pipes.emitter.es; import java.io.IOException; import java.util.List; @@ -33,40 +33,36 @@ import org.apache.tika.pipes.api.emitter.EmitData; import org.apache.tika.plugins.ExtensionConfig; /** - * Emitter that sends documents to Elasticsearch via its REST API. 
+ * Emitter that sends documents to an ES-compatible REST API. * - * <p>This emitter does <b>not</b> depend on the Elasticsearch Java client + * <p>This emitter does <b>not</b> depend on the ES Java client * (which changed to a non-ASL license). It uses plain HTTP via * Apache HttpClient to call the {@code _bulk} endpoint directly. * * <p>Supports: * <ul> - * <li>API key authentication ({@code Authorization: ApiKey <base64>}) - * — common with Elasticsearch 8.x</li> + * <li>API key authentication ({@code Authorization: ApiKey <base64>})</li> * <li>Basic authentication (username/password via httpClientConfig)</li> * <li>OVERWRITE and UPSERT update strategies</li> * <li>SEPARATE_DOCUMENTS and PARENT_CHILD attachment strategies</li> * </ul> */ -public class ElasticsearchEmitter extends AbstractEmitter { +public class ESEmitter extends AbstractEmitter { public static final String DEFAULT_EMBEDDED_FILE_FIELD_NAME = "embedded"; - private static final Logger LOG = - LoggerFactory.getLogger(ElasticsearchEmitter.class); + private static final Logger LOG = LoggerFactory.getLogger(ESEmitter.class); - private ElasticsearchClient elasticsearchClient; + private ESClient esClient; private final HttpClientFactory httpClientFactory; - private final ElasticsearchEmitterConfig config; + private final ESEmitterConfig config; - public static ElasticsearchEmitter build(ExtensionConfig pluginConfig) + public static ESEmitter build(ExtensionConfig pluginConfig) throws TikaConfigException, IOException { - ElasticsearchEmitterConfig config = - ElasticsearchEmitterConfig.load(pluginConfig.json()); - return new ElasticsearchEmitter(pluginConfig, config); + ESEmitterConfig config = ESEmitterConfig.load(pluginConfig.json()); + return new ESEmitter(pluginConfig, config); } - public ElasticsearchEmitter(ExtensionConfig pluginConfig, - ElasticsearchEmitterConfig config) + public ESEmitter(ExtensionConfig pluginConfig, ESEmitterConfig config) throws IOException, TikaConfigException { super(pluginConfig); this.config = config; @@ -80,10 +76,9 @@ public class ElasticsearchEmitter extends AbstractEmitter { LOG.debug("metadataList is null or empty"); return; } - try { LOG.debug("about to emit {} docs", emitData.size()); - elasticsearchClient.emitDocuments(emitData); + esClient.emitDocuments(emitData); LOG.info("successfully emitted {} docs", emitData.size()); } catch (TikaClientException e) { LOG.warn("problem emitting docs", e); @@ -101,7 +96,7 @@ public class ElasticsearchEmitter extends AbstractEmitter { try { LOG.debug("about to emit one doc with {} metadata entries", metadataList.size()); - elasticsearchClient.emitDocument(emitKey, metadataList); + esClient.emitDocument(emitKey, metadataList); LOG.info("successfully emitted one doc"); } catch (TikaClientException e) { LOG.warn("problem emitting doc", e); @@ -110,18 +105,29 @@ public class ElasticsearchEmitter extends AbstractEmitter { } private void configure() throws TikaConfigException { - ConfigValidator.mustNotBeEmpty("elasticsearchUrl", - config.elasticsearchUrl()); + ConfigValidator.mustNotBeEmpty("esUrl", config.esUrl()); ConfigValidator.mustNotBeEmpty("idField", config.idField()); HttpClientConfig http = config.httpClientConfig(); if (http != null) { httpClientFactory.setUserName(http.userName()); httpClientFactory.setPassword(http.password()); + if (http.socketTimeout() > 0) { + httpClientFactory.setSocketTimeout(http.socketTimeout()); + } + if (http.connectionTimeout() > 0) { + httpClientFactory.setConnectTimeout(http.connectionTimeout()); + } + if 
(http.authScheme() != null) { + httpClientFactory.setAuthScheme(http.authScheme()); + } + if (http.proxyHost() != null) { + httpClientFactory.setProxyHost(http.proxyHost()); + httpClientFactory.setProxyPort(http.proxyPort()); + } + httpClientFactory.setVerifySsl(http.verifySsl()); } - elasticsearchClient = - new ElasticsearchClient(config, httpClientFactory.build()); + esClient = new ESClient(config, httpClientFactory.build()); } - } diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterConfig.java similarity index 56% rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterConfig.java index 874431daac..288098957b 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterConfig.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterConfig.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.emitter.elasticsearch; +package org.apache.tika.pipes.emitter.es; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -22,25 +22,25 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.tika.exception.TikaConfigException; /** - * Configuration for the Elasticsearch emitter. + * Configuration for the ES emitter. * - * @param elasticsearchUrl Full URL including index, e.g. {@code https://localhost:9200/my-index} - * @param idField Metadata field to use as the document {@code _id} + * @param esUrl Full URL including index, e.g. {@code https://localhost:9200/my-index} + * @param idField Metadata field to use as the document {@code _id} * @param attachmentStrategy How to handle embedded documents * @param updateStrategy Whether to overwrite or upsert * @param commitWithin Not used by ES, kept for API parity with OpenSearch emitter * @param embeddedFileFieldName Field name for embedded-file relation - * @param apiKey Elasticsearch API key for authentication (Base64-encoded - * {@code id:api_key}). Sent as {@code Authorization: ApiKey <value>}. + * @param apiKey API key for authentication (Base64-encoded {@code id:api_key}). + * Sent as {@code Authorization: ApiKey <value>}. * If null/empty, falls back to httpClientConfig's userName/password * for basic auth. 
* @param httpClientConfig HTTP connection settings (basic auth, timeouts, proxy) */ -public record ElasticsearchEmitterConfig(String elasticsearchUrl, String idField, - AttachmentStrategy attachmentStrategy, - UpdateStrategy updateStrategy, int commitWithin, - String embeddedFileFieldName, String apiKey, - HttpClientConfig httpClientConfig) { +public record ESEmitterConfig(String esUrl, String idField, + AttachmentStrategy attachmentStrategy, + UpdateStrategy updateStrategy, int commitWithin, + String embeddedFileFieldName, String apiKey, + HttpClientConfig httpClientConfig) { public enum AttachmentStrategy { SEPARATE_DOCUMENTS, PARENT_CHILD, } @@ -51,15 +51,24 @@ public record ElasticsearchEmitterConfig(String elasticsearchUrl, String idField private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static ElasticsearchEmitterConfig load(final String json) + public static ESEmitterConfig load(final String json) throws TikaConfigException { try { - return OBJECT_MAPPER.readValue(json, - ElasticsearchEmitterConfig.class); + return OBJECT_MAPPER.readValue(json, ESEmitterConfig.class); } catch (JsonProcessingException e) { throw new TikaConfigException( - "Failed to parse ElasticsearchEmitterConfig from JSON", e); + "Failed to parse ESEmitterConfig from JSON", e); } } + /** Overrides the record default to prevent {@code apiKey} leaking into logs. */ + @Override + public String toString() { + return "ESEmitterConfig{esUrl='" + esUrl + '\'' + + ", idField='" + idField + '\'' + + ", attachmentStrategy=" + attachmentStrategy + + ", updateStrategy=" + updateStrategy + + ", apiKey=" + (apiKey != null ? "[REDACTED]" : "null") + + '}'; + } } diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterFactory.java similarity index 80% copy from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java copy to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterFactory.java index 94bda68b2a..21771d1d5d 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/ESEmitterFactory.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.emitter.elasticsearch; +package org.apache.tika.pipes.emitter.es; import java.io.IOException; @@ -26,14 +26,14 @@ import org.apache.tika.pipes.api.emitter.EmitterFactory; import org.apache.tika.plugins.ExtensionConfig; /** - * Factory for creating Elasticsearch emitters. + * Factory for creating ES emitters. 
* * <p>Example JSON configuration: * <pre> * "emitters": { - * "elasticsearch-emitter": { + * "es-emitter": { * "my-es-emitter": { - * "elasticsearchUrl": "https://localhost:9200/my-index", + * "esUrl": "https://localhost:9200/my-index", * "idField": "id", * "apiKey": "base64-encoded-id:api_key", * "attachmentStrategy": "PARENT_CHILD", @@ -45,9 +45,9 @@ import org.apache.tika.plugins.ExtensionConfig; * </pre> */ @Extension -public class ElasticsearchEmitterFactory implements EmitterFactory { +public class ESEmitterFactory implements EmitterFactory { - public static final String NAME = "elasticsearch-emitter"; + public static final String NAME = "es-emitter"; @Override public String getName() { @@ -57,6 +57,6 @@ public class ElasticsearchEmitterFactory implements EmitterFactory { @Override public Emitter buildExtension(ExtensionConfig extensionConfig) throws IOException, TikaConfigException { - return ElasticsearchEmitter.build(extensionConfig); + return ESEmitter.build(extensionConfig); } } diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/HttpClientConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/HttpClientConfig.java new file mode 100644 index 0000000000..4056c282aa --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/HttpClientConfig.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.emitter.es; + +/** + * HTTP client settings for the ES emitter and reporter. + * Field names and semantics are intentionally aligned with the OpenSearch + * emitter's HttpClientConfig for configuration consistency. + * + * @param userName Username for basic authentication (optional) + * @param password Password for basic authentication (optional) + * @param authScheme Auth scheme passed to HttpClientFactory, e.g. {@code "basic"} or {@code "ntlm"} + * @param connectionTimeout Connect timeout in milliseconds + * @param socketTimeout Socket read timeout in milliseconds + * @param proxyHost HTTP proxy host (optional) + * @param proxyPort HTTP proxy port + * @param verifySsl When {@code true}, the HTTP client validates server certificates and + * hostnames using the JVM's default trust store. Defaults to + * {@code false} for backward compatibility (trust-all / no hostname check). 
+ */ +public record HttpClientConfig(String userName, String password, + String authScheme, int connectionTimeout, + int socketTimeout, String proxyHost, int proxyPort, + boolean verifySsl) { +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/JsonResponse.java similarity index 96% rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/JsonResponse.java index 5e6f3373e3..ce4cc591b8 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/JsonResponse.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/es/JsonResponse.java @@ -14,8 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.emitter.elasticsearch; - +package org.apache.tika.pipes.emitter.es; import com.fasterxml.jackson.databind.JsonNode; diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/es/ESPipesPlugin.java similarity index 72% rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/es/ESPipesPlugin.java index 1b6a195171..0e1d60401f 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/elasticsearch/ElasticsearchPipesPlugin.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/plugin/es/ESPipesPlugin.java @@ -14,37 +14,35 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.tika.pipes.plugin.elasticsearch; +package org.apache.tika.pipes.plugin.es; import org.pf4j.Plugin; import org.pf4j.PluginWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ElasticsearchPipesPlugin extends Plugin { - private static final Logger LOG = - LoggerFactory.getLogger(ElasticsearchPipesPlugin.class); +public class ESPipesPlugin extends Plugin { + private static final Logger LOG = LoggerFactory.getLogger(ESPipesPlugin.class); - public ElasticsearchPipesPlugin(PluginWrapper wrapper) { + public ESPipesPlugin(PluginWrapper wrapper) { super(wrapper); } @Override public void start() { - LOG.info("Starting Elasticsearch pipes plugin"); + LOG.info("Starting ES pipes plugin"); super.start(); } @Override public void stop() { - LOG.info("Stopping Elasticsearch pipes plugin"); + LOG.info("Stopping ES pipes plugin"); super.stop(); } @Override public void delete() { - LOG.info("Deleting Elasticsearch pipes plugin"); + LOG.info("Deleting ES pipes plugin"); super.delete(); } - } diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESPipesReporter.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESPipesReporter.java new file mode 100644 index 0000000000..801eb51b05 --- /dev/null +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESPipesReporter.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.pipes.reporter.es; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.tika.client.HttpClientFactory; +import org.apache.tika.client.TikaClientException; +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.metadata.ExternalProcess; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.pipes.api.FetchEmitTuple; +import org.apache.tika.pipes.api.PipesResult; +import org.apache.tika.pipes.api.pipesiterator.TotalCountResult; +import org.apache.tika.pipes.emitter.es.HttpClientConfig; +import org.apache.tika.pipes.emitter.es.JsonResponse; +import org.apache.tika.pipes.reporters.PipesReporterBase; +import org.apache.tika.plugins.ExtensionConfig; +import org.apache.tika.utils.StringUtils; + +public class ESPipesReporter extends PipesReporterBase { + + private static final Logger LOG = LoggerFactory.getLogger(ESPipesReporter.class); + + public static final String DEFAULT_PARSE_TIME_KEY = "parse_time_ms"; + public static final String DEFAULT_PARSE_STATUS_KEY = "parse_status"; + public static final String DEFAULT_EXIT_VALUE_KEY = "exit_value"; + + private final ESReporterConfig config; + private HttpClient httpClient; + private String parseTimeKey = DEFAULT_PARSE_TIME_KEY; + private String parseStatusKey = DEFAULT_PARSE_STATUS_KEY; + private String exitValueKey = DEFAULT_EXIT_VALUE_KEY; + + public static ESPipesReporter build(ExtensionConfig pluginConfig) + throws TikaConfigException, IOException { + ESReporterConfig config = ESReporterConfig.load(pluginConfig.json()); + return new ESPipesReporter(pluginConfig, config); + } + + public ESPipesReporter(ExtensionConfig pluginConfig, ESReporterConfig config) + throws TikaConfigException { + super(pluginConfig, config.includes(), config.excludes()); + this.config = config; + init(); + } + + private void init() throws TikaConfigException { + if (StringUtils.isBlank(config.esUrl())) { + throw new TikaConfigException("Must specify an esUrl!"); + } + HttpClientFactory httpClientFactory = new HttpClientFactory(); + HttpClientConfig http = config.httpClientConfig(); + if (http != null) { + httpClientFactory.setUserName(http.userName()); + httpClientFactory.setPassword(http.password()); + if (http.socketTimeout() > 0) { + httpClientFactory.setSocketTimeout(http.socketTimeout()); + } + if (http.connectionTimeout() > 0) { + httpClientFactory.setConnectTimeout(http.connectionTimeout()); + } + if (http.authScheme() != null) { + httpClientFactory.setAuthScheme(http.authScheme()); + } + if (http.proxyHost() != null) { + httpClientFactory.setProxyHost(http.proxyHost()); + httpClientFactory.setProxyPort(http.proxyPort()); + } + httpClientFactory.setVerifySsl(http.verifySsl()); + } + httpClient = httpClientFactory.build(); + + parseStatusKey = StringUtils.isBlank(config.keyPrefix()) + ? 
parseStatusKey : config.keyPrefix() + parseStatusKey; + parseTimeKey = StringUtils.isBlank(config.keyPrefix()) + ? parseTimeKey : config.keyPrefix() + parseTimeKey; + exitValueKey = StringUtils.isBlank(config.keyPrefix()) + ? exitValueKey : config.keyPrefix() + exitValueKey; + } + + @Override + public void report(FetchEmitTuple t, PipesResult result, long elapsed) { + if (!accept(result.status())) { + return; + } + Metadata metadata = new Metadata(); + metadata.set(parseStatusKey, result.status().name()); + metadata.set(parseTimeKey, Long.toString(elapsed)); + if (result.emitData() != null && result.emitData().getMetadataList() != null && + !result.emitData().getMetadataList().isEmpty()) { + Metadata m = result.emitData().getMetadataList().get(0); + if (m.get(ExternalProcess.EXIT_VALUE) != null) { + metadata.set(exitValueKey, m.get(ExternalProcess.EXIT_VALUE)); + } + } + try { + String routing = config.includeRouting() + ? t.getEmitKey().getEmitKey() : null; + emitDocument(t.getEmitKey().getEmitKey(), routing, metadata); + } catch (IOException | TikaClientException e) { + LOG.warn("failed to report status for '{}'", t.getId(), e); + } + } + + private void emitDocument(String emitKey, String routing, Metadata metadata) + throws IOException, TikaClientException { + StringWriter writer = new StringWriter(); + writeBulkHeader(emitKey, routing, writer); + writer.append("\n"); + writeDoc(metadata, writer); + writer.append("\n"); + + String requestUrl = config.esUrl() + "/_bulk"; + JsonResponse response = postJson(requestUrl, writer.toString()); + if (response.getStatus() != 200) { + throw new TikaClientException(response.getMsg()); + } + JsonNode errorNode = response.getJson().get("errors"); + if (errorNode != null && errorNode.asText().equals("true")) { + throw new TikaClientException(response.getJson().toString()); + } + } + + private void writeBulkHeader(String id, String routing, StringWriter writer) + throws IOException { + try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) { + jg.writeStartObject(); + jg.writeObjectFieldStart("update"); + jg.writeStringField("_id", id); + if (!StringUtils.isEmpty(routing)) { + jg.writeStringField("routing", routing); + } + jg.writeNumberField("retry_on_conflict", 3); + jg.writeEndObject(); + jg.writeEndObject(); + } + } + + private void writeDoc(Metadata metadata, StringWriter writer) throws IOException { + try (JsonGenerator jg = new JsonFactory().createGenerator(writer)) { + jg.writeStartObject(); + jg.writeObjectFieldStart("doc"); + for (String n : metadata.names()) { + String[] vals = metadata.getValues(n); + if (vals.length == 1) { + jg.writeStringField(n, vals[0]); + } else { + jg.writeArrayFieldStart(n); + for (String v : vals) { + jg.writeString(v); + } + jg.writeEndArray(); + } + } + jg.writeEndObject(); + jg.writeBooleanField("doc_as_upsert", true); + jg.writeEndObject(); + } + } + + private JsonResponse postJson(String url, String json) throws IOException { + HttpPost httpRequest = new HttpPost(url); + httpRequest.setEntity(new StringEntity(json, StandardCharsets.UTF_8)); + httpRequest.setHeader("Accept", "application/json"); + httpRequest.setHeader("Content-type", "application/json; charset=utf-8"); + if (!StringUtils.isEmpty(config.apiKey())) { + httpRequest.setHeader("Authorization", "ApiKey " + config.apiKey()); + } + HttpResponse response = null; + try { + response = httpClient.execute(httpRequest); + int status = response.getStatusLine().getStatusCode(); + if (status == 200) { + try (Reader reader = new BufferedReader( + 
new InputStreamReader( + response.getEntity().getContent(), + StandardCharsets.UTF_8))) { + JsonNode node = new ObjectMapper().readTree(reader); + return new JsonResponse(200, node); + } + } else { + return new JsonResponse(status, + new String(EntityUtils.toByteArray(response.getEntity()), + StandardCharsets.UTF_8)); + } + } finally { + if (response instanceof CloseableHttpResponse) { + ((CloseableHttpResponse) response).close(); + } + httpRequest.releaseConnection(); + } + } + + @Override + public void report(TotalCountResult totalCountResult) { + // not supported + } + + @Override + public boolean supportsTotalCount() { + return false; + } + + @Override + public void error(Throwable t) { + LOG.error("crashed", t); + } + + @Override + public void error(String msg) { + LOG.error("crashed {}", msg); + } + + @Override + public void close() throws IOException { + // nothing to close + } +} diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterConfig.java similarity index 53% rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterConfig.java index 6b2b18b00b..0ff1f8809c 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/HttpClientConfig.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterConfig.java @@ -14,22 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.tika.pipes.emitter.elasticsearch; +package org.apache.tika.pipes.reporter.es; -import java.io.IOException; +import java.util.Set; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.pipes.emitter.es.HttpClientConfig; -public record HttpClientConfig(String userName, String password, - String authScheme, int connectionTimeout, - int socketTimeout, String proxyHost, - int proxyPort) { +public record ESReporterConfig(String esUrl, Set<String> includes, Set<String> excludes, + String keyPrefix, boolean includeRouting, + String apiKey, HttpClientConfig httpClientConfig) { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static HttpClientConfig load(final String json) throws IOException { - return OBJECT_MAPPER.readValue(json, HttpClientConfig.class); + public static ESReporterConfig load(final String json) throws TikaConfigException { + try { + return OBJECT_MAPPER.readValue(json, ESReporterConfig.class); + } catch (JsonProcessingException e) { + throw new TikaConfigException( + "Failed to parse ESReporterConfig from JSON", e); + } } - } diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterFactory.java similarity index 59% rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterFactory.java index 94bda68b2a..589232f37c 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchEmitterFactory.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/java/org/apache/tika/pipes/reporter/es/ESReporterFactory.java @@ -14,40 +14,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.emitter.elasticsearch; +package org.apache.tika.pipes.reporter.es; import java.io.IOException; import org.pf4j.Extension; import org.apache.tika.exception.TikaConfigException; -import org.apache.tika.pipes.api.emitter.Emitter; -import org.apache.tika.pipes.api.emitter.EmitterFactory; +import org.apache.tika.pipes.api.reporter.PipesReporterFactory; import org.apache.tika.plugins.ExtensionConfig; /** - * Factory for creating Elasticsearch emitters. + * Factory for creating ES pipes reporters. 
* * <p>Example JSON configuration: * <pre> - * "emitters": { - * "elasticsearch-emitter": { - * "my-es-emitter": { - * "elasticsearchUrl": "https://localhost:9200/my-index", - * "idField": "id", - * "apiKey": "base64-encoded-id:api_key", - * "attachmentStrategy": "PARENT_CHILD", - * "updateStrategy": "UPSERT", - * "embeddedFileFieldName": "embedded" - * } + * "pipes-reporters": { + * "es-pipes-reporter": { + * "esUrl": "http://localhost:9200/tika-status", + * "keyPrefix": "status_", + * "includeRouting": false * } * } * </pre> */ @Extension -public class ElasticsearchEmitterFactory implements EmitterFactory { +public class ESReporterFactory implements PipesReporterFactory { - public static final String NAME = "elasticsearch-emitter"; + public static final String NAME = "es-pipes-reporter"; @Override public String getName() { @@ -55,8 +49,8 @@ public class ElasticsearchEmitterFactory implements EmitterFactory { } @Override - public Emitter buildExtension(ExtensionConfig extensionConfig) + public ESPipesReporter buildExtension(ExtensionConfig extensionConfig) throws IOException, TikaConfigException { - return ElasticsearchEmitter.build(extensionConfig); + return ESPipesReporter.build(extensionConfig); } } diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties index 9fc35c59c4..1e76d5b52e 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/main/resources/plugin.properties @@ -14,8 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -plugin.id=tika-pipes-elasticsearch-plugin -plugin.class=org.apache.tika.pipes.plugin.elasticsearch.ElasticsearchPipesPlugin +plugin.id=tika-pipes-es-plugin +plugin.class=org.apache.tika.pipes.plugin.es.ESPipesPlugin plugin.version=4.0.0-SNAPSHOT plugin.provider=Apache Tika -plugin.description=Pipes for Elasticsearch (plain HTTP, ASL v2 only) +plugin.description=ES emitter for Tika Pipes (plain HTTP, ASL v2 only) diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/es/ESClientTest.java similarity index 57% rename from tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java rename to tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/es/ESClientTest.java index 3193b700b2..fac02a1bab 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/elasticsearch/ElasticsearchClientTest.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-elasticsearch/src/test/java/org/apache/tika/pipes/emitter/es/ESClientTest.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.tika.pipes.emitter.elasticsearch; +package org.apache.tika.pipes.emitter.es; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; @@ -26,7 +27,7 @@ import org.junit.jupiter.api.Test; import org.apache.tika.TikaTest; import org.apache.tika.metadata.Metadata; -public class ElasticsearchClientTest extends TikaTest { +public class ESClientTest extends TikaTest { private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -36,23 +37,19 @@ public class ElasticsearchClientTest extends TikaTest { metadata.add("authors", "author1"); metadata.add("authors", "author2"); metadata.add("title", "title1"); - for (ElasticsearchEmitterConfig.AttachmentStrategy strategy : - ElasticsearchEmitterConfig.AttachmentStrategy.values()) { - String json = - ElasticsearchClient.metadataToJsonContainerInsert( - metadata, strategy); + for (ESEmitterConfig.AttachmentStrategy strategy : + ESEmitterConfig.AttachmentStrategy.values()) { + String json = ESClient.metadataToJsonContainerInsert(metadata, strategy); assertContains("author1", json); assertContains("author2", json); assertContains("authors", json); assertContains("title1", json); } - for (ElasticsearchEmitterConfig.AttachmentStrategy strategy : - ElasticsearchEmitterConfig.AttachmentStrategy.values()) { - String json = - ElasticsearchClient.metadataToJsonEmbeddedInsert( - metadata, strategy, "myEmitKey", - ElasticsearchEmitter - .DEFAULT_EMBEDDED_FILE_FIELD_NAME); + for (ESEmitterConfig.AttachmentStrategy strategy : + ESEmitterConfig.AttachmentStrategy.values()) { + String json = ESClient.metadataToJsonEmbeddedInsert( + metadata, strategy, "myEmitKey", + ESEmitter.DEFAULT_EMBEDDED_FILE_FIELD_NAME); assertContains("author1", json); assertContains("author2", json); assertContains("authors", json); @@ -64,11 +61,8 @@ public class ElasticsearchClientTest extends TikaTest { public void testParentChildContainer() throws Exception { Metadata metadata = new Metadata(); metadata.add("title", "parent doc"); - String json = - ElasticsearchClient.metadataToJsonContainerInsert( - metadata, - ElasticsearchEmitterConfig.AttachmentStrategy - .PARENT_CHILD); + String json = ESClient.metadataToJsonContainerInsert( + metadata, ESEmitterConfig.AttachmentStrategy.PARENT_CHILD); assertContains("relation_type", json); assertContains("container", json); } @@ -77,12 +71,9 @@ public class ElasticsearchClientTest extends TikaTest { public void testParentChildEmbedded() throws Exception { Metadata metadata = new Metadata(); metadata.add("title", "child doc"); - String json = - ElasticsearchClient.metadataToJsonEmbeddedInsert( - metadata, - ElasticsearchEmitterConfig.AttachmentStrategy - .PARENT_CHILD, - "parentKey", "embedded"); + String json = ESClient.metadataToJsonEmbeddedInsert( + metadata, ESEmitterConfig.AttachmentStrategy.PARENT_CHILD, + "parentKey", "embedded"); assertContains("relation_type", json); assertContains("parentKey", json); assertContains("embedded", json); @@ -92,12 +83,9 @@ public class ElasticsearchClientTest extends TikaTest { public void testSeparateDocumentsEmbedded() throws Exception { Metadata metadata = new Metadata(); metadata.add("title", "child doc"); - String json = - ElasticsearchClient.metadataToJsonEmbeddedInsert( - metadata, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS, - "parentKey", "embedded"); + String json = 
ESClient.metadataToJsonEmbeddedInsert( + metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS, + "parentKey", "embedded"); assertContains("parent", json); assertContains("parentKey", json); assertNotContained("relation_type", json); @@ -107,69 +95,69 @@ public class ElasticsearchClientTest extends TikaTest { public void testChunksFieldWrittenAsRawJson() throws Exception { Metadata metadata = new Metadata(); metadata.set("title", "test doc"); + // "PwAAAEEgAABAwAAA" is the ES-documented base64 for [0.5, 10.0, 6.0] + // in big-endian float32 — the exact format ES dense_vector expects. metadata.set("tika:chunks", - "[{\"text\":\"hello\",\"vector\":\"AAAA\"," + "[{\"text\":\"hello\",\"vector\":\"PwAAAEEgAABAwAAA\"," + "\"locators\":{\"text\":[{\"start_offset\":0," + "\"end_offset\":5}]}}]"); - String json = - ElasticsearchClient.metadataToJsonContainerInsert( - metadata, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS); + String json = ESClient.metadataToJsonContainerInsert( + metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS); - // Parse the output JSON — tika:chunks should be a real JSON - // array, not a double-escaped string JsonNode doc = MAPPER.readTree(json); JsonNode chunks = doc.get("tika:chunks"); assertTrue(chunks.isArray(), "tika:chunks should be a JSON array, not a string"); assertEquals(1, chunks.size()); assertEquals("hello", chunks.get(0).get("text").asText()); - assertEquals("AAAA", chunks.get(0).get("vector").asText()); + assertEquals("PwAAAEEgAABAwAAA", chunks.get(0).get("vector").asText()); } @Test public void testNonJsonFieldStaysString() throws Exception { Metadata metadata = new Metadata(); metadata.set("tika:chunks", "not json at all"); - String json = - ElasticsearchClient.metadataToJsonContainerInsert( - metadata, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS); + String json = ESClient.metadataToJsonContainerInsert( + metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS); JsonNode doc = MAPPER.readTree(json); - // Should be a plain string since it doesn't look like JSON assertTrue(doc.get("tika:chunks").isTextual()); - assertEquals("not json at all", - doc.get("tika:chunks").asText()); + assertEquals("not json at all", doc.get("tika:chunks").asText()); } @Test public void testRegularFieldNotRawJson() throws Exception { Metadata metadata = new Metadata(); - // A regular field whose value happens to look like JSON - // should NOT be written as raw JSON metadata.set("description", "[some bracketed text]"); - String json = - ElasticsearchClient.metadataToJsonContainerInsert( - metadata, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS); + String json = ESClient.metadataToJsonContainerInsert( + metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS); JsonNode doc = MAPPER.readTree(json); assertTrue(doc.get("description").isTextual()); } + @Test + public void testTrailingContentRejected() throws Exception { + // A value that passes a naive readTree() check but contains trailing + // content that would inject extra fields into the document via writeRawValue. 
+ Metadata metadata = new Metadata(); + metadata.set("tika:chunks", "[1,2,3], \"injected\": true"); + String json = ESClient.metadataToJsonContainerInsert( + metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS); + JsonNode doc = MAPPER.readTree(json); + // Must be written as an escaped string, not raw JSON — no injection + assertTrue(doc.get("tika:chunks").isTextual(), + "trailing-content value must be escaped, not written as raw JSON"); + assertFalse(doc.has("injected"), + "injected field must not appear in document"); + } + @Test public void testMultiValuedMetadata() throws Exception { Metadata metadata = new Metadata(); metadata.add("tags", "tag1"); metadata.add("tags", "tag2"); metadata.add("tags", "tag3"); - String json = - ElasticsearchClient.metadataToJsonContainerInsert( - metadata, - ElasticsearchEmitterConfig.AttachmentStrategy - .SEPARATE_DOCUMENTS); + String json = ESClient.metadataToJsonContainerInsert( + metadata, ESEmitterConfig.AttachmentStrategy.SEPARATE_DOCUMENTS); assertContains("tag1", json); assertContains("tag2", json); assertContains("tag3", json);
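For reference, the trailing-content test above exercises the strict JSON check that the commit message describes for ESClient.isValidJson (JsonParser with an explicit trailing-token check). Below is a minimal sketch of that kind of check, assuming Jackson's streaming JsonParser; the class and method names are illustrative only and are not necessarily what ESClient uses internally.

    import java.io.IOException;

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParser;

    public final class StrictJsonCheck {

        // Returns true only if the value is exactly one well-formed JSON value
        // with no trailing tokens. Anything else should be emitted as an
        // escaped string rather than passed to writeRawValue().
        public static boolean isSingleJsonValue(String value) {
            try (JsonParser p = new JsonFactory().createParser(value)) {
                if (p.nextToken() == null) {
                    return false;             // empty input
                }
                p.skipChildren();             // consume the first value (object, array or scalar)
                return p.nextToken() == null; // a trailing token would allow field injection
            } catch (IOException e) {
                return false;                 // malformed JSON stays a plain string
            }
        }
    }

With a check like this, the test value "[1,2,3], \"injected\": true" is rejected (Jackson trips on the stray comma after the array), so the metadata value is written as an ordinary escaped string and no extra field can reach the index.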

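The new verifySsl flag on HttpClientFactory is consumed the same way by the emitter's configure() and the reporter's init() above. As a rough usage sketch, client code could enable certificate and hostname verification like this; the wrapper class, method name, and timeout values are illustrative, and build() throwing TikaConfigException is an assumption based on how configure()/init() declare their exceptions.

    import org.apache.http.client.HttpClient;

    import org.apache.tika.client.HttpClientFactory;
    import org.apache.tika.exception.TikaConfigException;

    public final class VerifySslExample {

        public static HttpClient buildVerifyingClient() throws TikaConfigException {
            HttpClientFactory factory = new HttpClientFactory();
            // Default is false (trust-all, no hostname check) for backward
            // compatibility; true switches to the JVM default trust store
            // plus hostname verification.
            factory.setVerifySsl(true);
            factory.setConnectTimeout(30_000);  // milliseconds, as in HttpClientConfig
            factory.setSocketTimeout(120_000);
            return factory.build();
        }
    }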