This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch allow-to-modify-indexer-fields-for-storage
in repository https://gitbox.apache.org/repos/asf/stormcrawler.git

commit c4b85972890f78d58908e7ab8d29d8e68873ab6e
Author: Richard Zowalla <r...@apache.org>
AuthorDate: Mon Sep 22 08:45:13 2025 +0200

    #1669 - feat(indexer): add template method to customize document content
    
    Introduce a new protected hook method `customizeDocument` in `IndexerBolt`
    that allows subclasses to modify the JSON document before it is indexed.
    
    This makes it easier to support custom index mappings with additional
    top-level fields without duplicating the full `execute()` logic.
    
    The default implementation is a no-op. Subclasses can override to add,
    remove, or transform fields as needed.
---
 .../stormcrawler/opensearch/bolt/IndexerBolt.java  |  23 +++
 .../opensearch/bolt/CustomizedIndexerBoltTest.java | 155 +++++++++++++++++++++
 2 files changed, 178 insertions(+)

diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java
index f03efc67..eef9231e 100644
--- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java
+++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java
@@ -227,6 +227,8 @@ public class IndexerBolt extends AbstractIndexerBolt
                 }
             }
 
+            customizeDocument(builder, metadata, tuple);
+
             builder.endObject();
 
             final IndexRequest indexRequest =
@@ -270,6 +272,27 @@ public class IndexerBolt extends AbstractIndexerBolt
         }
     }
 
+    /**
+     * Hook method for subclasses to customize the JSON document before it is indexed.
+     *
+     * <p>This method is called after the default fields (text, URL, filtered metadata) have been
+     * added to the {@link XContentBuilder}. Implementations can add, remove, or transform fields to
+     * suit application-specific needs.
+     *
+     * <p>This extension point is particularly useful when the index mapping has been customized to
+     * include additional top-level fields beyond those provided by the default implementation.
+     * Subclasses can ensure these fields are properly populated during indexing.
+     *
+     * @param builder the {@link XContentBuilder} used to construct the document
+     * @param metadata the {@link Metadata} associated with the document
+     * @param tuple the input {@link Tuple} containing the crawl data
+     * @throws IOException if adding fields to the builder fails
+     */
+    protected void customizeDocument(XContentBuilder builder, Metadata metadata, Tuple tuple)
+            throws IOException {
+        // Default implementation is empty. Subclasses may override.
+    }
+
     /**
      * Must be overridden for implementing custom index names based on some metadata information By
      * Default, indexName coming from config is used
diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/CustomizedIndexerBoltTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/CustomizedIndexerBoltTest.java
new file mode 100644
index 00000000..460ec0e9
--- /dev/null
+++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/CustomizedIndexerBoltTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.opensearch.bolt;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Constants;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.indexing.AbstractIndexerBolt;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CustomizedIndexerBoltTest extends AbstractOpenSearchTest {
+
+    private IndexerBolt bolt;
+
+    protected TestOutputCollector output;
+
+    private static final Logger LOG = LoggerFactory.getLogger(CustomizedIndexerBoltTest.class);
+
+    private static ExecutorService executorService;
+
+    @BeforeAll
+    static void beforeClass() {
+        executorService = Executors.newFixedThreadPool(2);
+    }
+
+    @AfterAll
+    static void afterClass() {
+        executorService.shutdown();
+        executorService = null;
+    }
+
+    @BeforeEach
+    void setupIndexerBolt() {
+        bolt = new CustomIndexerBolt("content");
+        // give the indexer the port for connecting to OpenSearch
+        final String host = opensearchContainer.getHost();
+        final Integer port = opensearchContainer.getFirstMappedPort();
+        final Map<String, Object> conf = new HashMap<>();
+        conf.put(AbstractIndexerBolt.urlFieldParamName, "url");
+        conf.put(AbstractIndexerBolt.canonicalMetadataParamName, "canonical");
+        conf.put("opensearch.indexer.addresses", host + ":" + port);
+        output = new TestOutputCollector();
+        bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
+    }
+
+    @AfterEach
+    void close() {
+        LOG.info("Closing indexer bolt and Opensearch container");
+        super.close();
+        bolt.cleanup();
+        output = null;
+    }
+
+    private void index(String url, String text, Metadata metadata) {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getStringByField("text")).thenReturn(text);
+        when(tuple.getStringByField("url")).thenReturn(url);
+        when(tuple.getValueByField("metadata")).thenReturn(metadata);
+        bolt.execute(tuple);
+    }
+
+    private int lastIndex(String url, String text, Metadata metadata, long timeoutInMs)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        var oldSize = output.getEmitted(Constants.StatusStreamName).size();
+        index(url, text, metadata);
+        return executorService
+                .submit(
+                        () -> {
+                            // check that something has been emitted out
+                            var outputSize = output.getEmitted(Constants.StatusStreamName).size();
+                            while (outputSize == oldSize) {
+                                Thread.sleep(100);
+                                outputSize = output.getEmitted(Constants.StatusStreamName).size();
+                            }
+                            return outputSize;
+                        })
+                .get(timeoutInMs, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    @Timeout(value = 2, unit = TimeUnit.MINUTES)
+    void simultaneousCanonicals()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        Metadata m1 = new Metadata();
+        String url =
+                "https://www.obozrevatel.com/ukr/dnipro/city/u-dnipri-ta-oblasti-ogolosili-shtormove-poperedzhennya.htm";
+        m1.addValue("canonical", url);
+        Metadata m2 = new Metadata();
+        String url2 =
+                "https://www.obozrevatel.com/ukr/dnipro/city/u-dnipri-ta-oblasti-ogolosili-shtormove-poperedzhennya/amp.htm";
+        m2.addValue("canonical", url);
+        index(url, "", m1);
+        lastIndex(url2, "", m2, 10_000);
+        // should be two in status output
+        assertEquals(2, output.getEmitted(Constants.StatusStreamName).size());
+        // and 2 acked
+        assertEquals(2, output.getAckedTuples().size());
+
+        assertEquals(2, CustomIndexerBolt.called);
+    }
+
+    private static class CustomIndexerBolt extends IndexerBolt {
+
+        public static int called = 0;
+
+        public CustomIndexerBolt(String content) {
+            super(content);
+        }
+
+        @Override
+        protected void customizeDocument(XContentBuilder builder, Metadata metadata, Tuple tuple) {
+            assertNotNull(builder);
+            assertNotNull(metadata);
+            assertNotNull(tuple);
+            called++;
+        }
+    }
+}

Reply via email to