This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch allow-to-modify-indexer-fields-for-storage in repository https://gitbox.apache.org/repos/asf/stormcrawler.git
commit c4b85972890f78d58908e7ab8d29d8e68873ab6e Author: Richard Zowalla <r...@apache.org> AuthorDate: Mon Sep 22 08:45:13 2025 +0200 #1669 - feat(indexer): add template method to customize document content Introduce a new protected hook method `customizeDocument` in `IndexerBolt` that allows subclasses to add content to the JSON document before it is indexed. This makes it easier to support custom index mappings with additional top-level fields without duplicating the full `execute()` logic. The default implementation is a no-op. Subclasses can override it to append additional fields; fields already written to the builder cannot be removed or changed from the hook. --- .../stormcrawler/opensearch/bolt/IndexerBolt.java | 23 +++ .../opensearch/bolt/CustomizedIndexerBoltTest.java | 155 +++++++++++++++++++++ 2 files changed, 178 insertions(+) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java index f03efc67..eef9231e 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java @@ -227,6 +227,8 @@ public class IndexerBolt extends AbstractIndexerBolt } } + customizeDocument(builder, metadata, tuple); + builder.endObject(); final IndexRequest indexRequest = @@ -270,6 +272,27 @@ public class IndexerBolt extends AbstractIndexerBolt } } + /** + * Hook method for subclasses to add content to the JSON document before it is indexed. + * + * <p>This method is called after the default fields (text, URL, filtered metadata) have been + * added to the {@link XContentBuilder} and immediately before the root object is closed. + * Because the builder writes its output sequentially, implementations can only append + * further fields (or nested objects and arrays) to the root object; fields that have already + * been written cannot be removed or transformed here. Any object or array an implementation + * opens must be closed again before returning. + * + * <p>This extension point is particularly useful when the index mapping has been customized to + * include additional top-level fields beyond those provided by the default implementation.
+ * Overriding this method lets subclasses populate such fields without duplicating the + * {@code execute()} logic. + * + * @param builder the {@link XContentBuilder} used to construct the document, positioned inside + * the still-open root object + * @param metadata the {@link Metadata} associated with the document + * @param tuple the input {@link Tuple} containing the crawl data + * @throws IOException if adding fields to the builder fails + */ + protected void customizeDocument(XContentBuilder builder, Metadata metadata, Tuple tuple) + throws IOException { + // No-op by default; subclasses may override to append fields. + } + /** * Must be overridden for implementing custom index names based on some metadata information By * Default, indexName coming from config is used diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/CustomizedIndexerBoltTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/CustomizedIndexerBoltTest.java new file mode 100644 index 00000000..460ec0e9 --- /dev/null +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/CustomizedIndexerBoltTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.stormcrawler.opensearch.bolt; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.Constants; +import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.TestOutputCollector; +import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.indexing.AbstractIndexerBolt; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.opensearch.core.xcontent.XContentBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CustomizedIndexerBoltTest extends AbstractOpenSearchTest { + + private IndexerBolt bolt; + + protected TestOutputCollector output; + + private static final Logger LOG = LoggerFactory.getLogger(CustomizedIndexerBoltTest.class); + + private static ExecutorService executorService; + + @BeforeAll + static void beforeClass() { + executorService = Executors.newFixedThreadPool(2); + } + + @AfterAll + static void afterClass() { + executorService.shutdown(); + executorService = null; + } + + @BeforeEach + void setupIndexerBolt() { + bolt = new CustomIndexerBolt("content"); + // give the indexer the port for connecting to OpenSearch + final String host = opensearchContainer.getHost(); + final Integer port = opensearchContainer.getFirstMappedPort(); + final Map<String, 
Object> conf = new HashMap<>(); + conf.put(AbstractIndexerBolt.urlFieldParamName, "url"); + conf.put(AbstractIndexerBolt.canonicalMetadataParamName, "canonical"); + conf.put("opensearch.indexer.addresses", host + ":" + port); + output = new TestOutputCollector(); + bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); + } + + @AfterEach + void close() { + LOG.info("Closing indexer bolt and Opensearch container"); + super.close(); + bolt.cleanup(); + output = null; + } + + private void index(String url, String text, Metadata metadata) { + Tuple tuple = mock(Tuple.class); + when(tuple.getStringByField("text")).thenReturn(text); + when(tuple.getStringByField("url")).thenReturn(url); + when(tuple.getValueByField("metadata")).thenReturn(metadata); + bolt.execute(tuple); + } + + private int lastIndex(String url, String text, Metadata metadata, long timeoutInMs) + throws ExecutionException, InterruptedException, TimeoutException { + var oldSize = output.getEmitted(Constants.StatusStreamName).size(); + index(url, text, metadata); + return executorService + .submit( + () -> { + // check that something has been emitted out + var outputSize = output.getEmitted(Constants.StatusStreamName).size(); + while (outputSize == oldSize) { + Thread.sleep(100); + outputSize = output.getEmitted(Constants.StatusStreamName).size(); + } + return outputSize; + }) + .get(timeoutInMs, TimeUnit.MILLISECONDS); + } + + @Test + @Timeout(value = 2, unit = TimeUnit.MINUTES) + void simultaneousCanonicals() + throws ExecutionException, InterruptedException, TimeoutException { + Metadata m1 = new Metadata(); + String url = + "https://www.obozrevatel.com/ukr/dnipro/city/u-dnipri-ta-oblasti-ogolosili-shtormove-poperedzhennya.htm"; + m1.addValue("canonical", url); + Metadata m2 = new Metadata(); + String url2 = + "https://www.obozrevatel.com/ukr/dnipro/city/u-dnipri-ta-oblasti-ogolosili-shtormove-poperedzhennya/amp.htm"; + m2.addValue("canonical", url); + index(url, "", m1); 
+ lastIndex(url2, "", m2, 10_000); + // should be two in status output + assertEquals(2, output.getEmitted(Constants.StatusStreamName).size()); + // and 2 acked + assertEquals(2, output.getAckedTuples().size()); + + assertEquals(2, CustomIndexerBolt.called); + } + + private static class CustomIndexerBolt extends IndexerBolt { + + public static int called = 0; + + public CustomIndexerBolt(String content) { + super(content); + } + + @Override + protected void customizeDocument(XContentBuilder builder, Metadata metadata, Tuple tuple) { + assertNotNull(builder); + assertNotNull(metadata); + assertNotNull(tuple); + called++; + } + } +}