This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/stormcrawler.git


The following commit(s) were added to refs/heads/main by this push:
     new e222c7d0 #1611 - Refactor SQL module to use PreparedStatement in 
SQLSpout and IndexerBolt (#1766)
e222c7d0 is described below

commit e222c7d04cf3255c119fd1a5e80510c77a39d73d
Author: Raju Gupta <[email protected]>
AuthorDate: Sun Jan 11 18:31:33 2026 +0000

    #1611 - Refactor SQL module to use PreparedStatement in SQLSpout and 
IndexerBolt (#1766)
    
    * Refactor SQL module to use PreparedStatement in SQLSpout and IndexerBolt 
for improved readability and performance
    
    * Refactor SQL module to use PreparedStatement in SQLSpout and IndexerBolt 
for improved readability and performance
    
    * Fix forbidden violations
    
    * changes following PR review. Added tests for SQLSpout
    
    * changes following PR review. Added tests for SQLSpout
    
    * changes following PR review. Added tests for SQLSpout
    
    * changes following PR review. Added tests for StatusUpdaterBolt
    
    * Add test dependencies and update MySQL container version in tests
    
    * Fix falkiness in testStoreDiscoveredURL
    
    * ScheduledExecutorService should be shit down when the bolt is cleaned up.
    
    * Refactor IndexerBolt to improve code clarity and add unit tests for 
indexing functionality
    
    * Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
    
    * Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
    
    * Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
    
    * Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
    
    * Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
    
    * Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
    
    * Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
    
    * Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
    
    * Refactor StatusUpdaterBoltTest to improve setup and cleanup methods
    
    * Undo changes in MetricsConsumer
    
    * minor changes
    
    * minor changes
    
    * Changes following PR review.
    
    * Changes following PR review.
    
    ---------
    
    Co-authored-by: raju.gupta <[email protected]>
---
 .../java/org/apache/stormcrawler/TestUtil.java     |  38 +++
 external/sql/pom.xml                               |  29 +-
 .../org/apache/stormcrawler/sql/IndexerBolt.java   | 101 ++++---
 .../java/org/apache/stormcrawler/sql/SQLSpout.java | 199 +++++++------
 .../java/org/apache/stormcrawler/sql/SQLUtil.java  |  21 +-
 .../apache/stormcrawler/sql/StatusUpdaterBolt.java | 133 ++++++---
 .../apache/stormcrawler/sql/AbstractSQLTest.java   |  92 ++++++
 .../apache/stormcrawler/sql/IndexerBoltTest.java   | 311 +++++++++++++++++++++
 .../org/apache/stormcrawler/sql/SQLSpoutTest.java  | 237 ++++++++++++++++
 .../stormcrawler/sql/StatusUpdaterBoltTest.java    | 146 ++++++++++
 pom.xml                                            |   7 +
 11 files changed, 1130 insertions(+), 184 deletions(-)

diff --git a/core/src/test/java/org/apache/stormcrawler/TestUtil.java 
b/core/src/test/java/org/apache/stormcrawler/TestUtil.java
index 033255c4..d8358489 100644
--- a/core/src/test/java/org/apache/stormcrawler/TestUtil.java
+++ b/core/src/test/java/org/apache/stormcrawler/TestUtil.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.task.TopologyContext;
@@ -48,6 +50,42 @@ public class TestUtil {
         return context;
     }
 
+    /**
+     * Creates a mocked TopologyContext for testing bucket partitioning.
+     *
+     * @param taskIndex The task index for this spout instance (determines 
bucket number)
+     * @param totalTasks Total number of tasks in the topology
+     * @param componentId The component ID
+     * @return Mocked TopologyContext
+     */
+    public static TopologyContext getMockedTopologyContextWithBucket(
+            int taskIndex, int totalTasks, String componentId) {
+        TopologyContext context = mock(TopologyContext.class);
+
+        // Mock metric registration
+        when(context.registerMetric(anyString(), any(IMetric.class), anyInt()))
+                .thenAnswer(
+                        new Answer<IMetric>() {
+                            @Override
+                            public IMetric answer(InvocationOnMock invocation) 
throws Throwable {
+                                return invocation.getArgument(1, 
IMetric.class);
+                            }
+                        });
+
+        // Mock task information for bucket assignment
+        when(context.getThisTaskIndex()).thenReturn(taskIndex);
+        when(context.getThisComponentId()).thenReturn(componentId);
+
+        // Create list of task IDs (0 to totalTasks-1)
+        List<Integer> taskIds = new ArrayList<>();
+        for (int i = 0; i < totalTasks; i++) {
+            taskIds.add(i);
+        }
+        when(context.getComponentTasks(componentId)).thenReturn(taskIds);
+
+        return context;
+    }
+
     public static Tuple getMockedTestTuple(String url, String content, 
Metadata metadata) {
         Tuple tuple = mock(Tuple.class);
         when(tuple.getStringByField("url")).thenReturn(url);
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 0eece4e6..2b0ece44 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -37,13 +37,28 @@ under the License.
        <description>SQL-based resources for StormCrawler</description>
 
        <dependencies>
-       <!--  
-               <dependency>
-                       <groupId>mysql</groupId>
-                       <artifactId>mysql-connector-java</artifactId>
-                       <version>5.1.31</version>
-               </dependency>
-               -->
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.stormcrawler</groupId>
+            <artifactId>stormcrawler-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
        </dependencies>
        
 </project>
diff --git 
a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java 
b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
index 89b711f3..d9b0db0c 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
@@ -21,7 +21,11 @@ import static 
org.apache.stormcrawler.Constants.StatusStreamName;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.metric.api.MultiCountMetric;
 import org.apache.storm.task.OutputCollector;
@@ -42,12 +46,14 @@ public class IndexerBolt extends AbstractIndexerBolt {
 
     public static final String SQL_INDEX_TABLE_PARAM_NAME = "sql.index.table";
 
-    private OutputCollector _collector;
+    private OutputCollector collector;
 
     private MultiCountMetric eventCounter;
 
     private Connection connection;
 
+    private PreparedStatement preparedStmt;
+
     private String tableName;
 
     private Map<String, Object> conf;
@@ -56,7 +62,7 @@ public class IndexerBolt extends AbstractIndexerBolt {
     public void prepare(
             Map<String, Object> conf, TopologyContext context, OutputCollector 
collector) {
         super.prepare(conf, context, collector);
-        _collector = collector;
+        this.collector = collector;
 
         this.eventCounter = context.registerMetric("SQLIndexer", new 
MultiCountMetric(), 10);
 
@@ -74,15 +80,14 @@ public class IndexerBolt extends AbstractIndexerBolt {
         String normalisedurl = valueForURL(tuple);
 
         Metadata metadata = (Metadata) tuple.getValueByField("metadata");
-        String text = tuple.getStringByField("text");
 
         boolean keep = filterDocument(metadata);
         if (!keep) {
             eventCounter.scope("Filtered").incrBy(1);
             // treat it as successfully processed even if
             // we do not index it
-            _collector.emit(StatusStreamName, tuple, new Values(url, metadata, 
Status.FETCHED));
-            _collector.ack(tuple);
+            collector.emit(StatusStreamName, tuple, new Values(url, metadata, 
Status.FETCHED));
+            collector.ack(tuple);
             return;
         }
 
@@ -90,35 +95,9 @@ public class IndexerBolt extends AbstractIndexerBolt {
 
             // which metadata to display?
             Map<String, String[]> keyVals = filterMetadata(metadata);
+            List<String> keys = new ArrayList<>(keyVals.keySet());
 
-            StringBuilder query =
-                    new StringBuilder(" insert into ")
-                            .append(tableName)
-                            .append(" (")
-                            .append(fieldNameForURL());
-
-            Object[] keys = keyVals.keySet().toArray();
-
-            for (Object o : keys) {
-                query.append(", ").append((String) o);
-            }
-
-            query.append(") values(?");
-
-            for (int i = 0; i < keys.length; i++) {
-                query.append(", ?");
-            }
-
-            query.append(")");
-
-            query.append(" ON DUPLICATE KEY UPDATE ");
-            for (int i = 0; i < keys.length; i++) {
-                String key = (String) keys[i];
-                if (i > 0) {
-                    query.append(", ");
-                }
-                query.append(key).append("=VALUES(").append(key).append(")");
-            }
+            String query = buildQuery(keys);
 
             if (connection == null) {
                 try {
@@ -131,9 +110,8 @@ public class IndexerBolt extends AbstractIndexerBolt {
 
             LOG.debug("PreparedStatement => {}", query);
 
-            // create the mysql insert preparedstatement
-            PreparedStatement preparedStmt = 
connection.prepareStatement(query.toString());
-
+            // create the mysql insert PreparedStatement
+            preparedStmt = connection.prepareStatement(query);
             // TODO store the text of the document?
             if (StringUtils.isNotBlank(fieldNameForText())) {
                 // builder.field(fieldNameForText(), trimText(text));
@@ -144,21 +122,21 @@ public class IndexerBolt extends AbstractIndexerBolt {
                 preparedStmt.setString(1, normalisedurl);
             }
 
-            for (int i = 0; i < keys.length; i++) {
-                insert(preparedStmt, i + 2, (String) keys[i], keyVals);
+            // Set all metadata parameters
+            for (int i = 0; i < keys.size(); i++) {
+                insert(preparedStmt, i + 2, keys.get(i), keyVals);
             }
-
+            // Execute the statement (single row insert)
             preparedStmt.executeUpdate();
 
             eventCounter.scope("Indexed").incrBy(1);
 
-            _collector.emit(StatusStreamName, tuple, new Values(url, metadata, 
Status.FETCHED));
-            _collector.ack(tuple);
-
+            collector.emit(StatusStreamName, tuple, new Values(url, metadata, 
Status.FETCHED));
+            collector.ack(tuple);
         } catch (Exception e) {
             // do not send to status stream so that it gets replayed
             LOG.error("Error inserting into SQL", e);
-            _collector.fail(tuple);
+            collector.fail(tuple);
             if (connection != null) {
                 // reset the connection
                 try {
@@ -188,4 +166,41 @@ public class IndexerBolt extends AbstractIndexerBolt {
         }
         preparedStmt.setString(position, value);
     }
+
+    private String buildQuery(final List<String> keys) {
+        final String columns = String.join(", ", keys);
+        final String placeholders = keys.stream().map(k -> 
"?").collect(Collectors.joining(", "));
+
+        final String updates =
+                keys.stream()
+                        .map(k -> String.format(Locale.ROOT, "%s=VALUES(%s)", 
k, k))
+                        .collect(Collectors.joining(", "));
+
+        // Build the ON DUPLICATE KEY UPDATE clause
+        // If there are metadata keys, update them; otherwise, update the URL 
field to itself
+        final String updateClause =
+                updates.isEmpty()
+                        ? String.format(
+                                Locale.ROOT, "%s=VALUES(%s)", 
fieldNameForURL(), fieldNameForURL())
+                        : updates;
+
+        return String.format(
+                Locale.ROOT,
+                """
+                            INSERT INTO %s (%s%s)
+                            VALUES (?%s)
+                            ON DUPLICATE KEY UPDATE %s
+                            """,
+                tableName,
+                fieldNameForURL(),
+                columns.isEmpty() ? "" : ", " + columns,
+                placeholders.isEmpty() ? "" : ", " + placeholders,
+                updateClause);
+    }
+
+    @Override
+    public void cleanup() {
+        SQLUtil.closeResource(preparedStmt, "prepared statement");
+        SQLUtil.closeResource(connection, "connection");
+    }
 }
diff --git 
a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java 
b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
index 084c5de9..9f2f0638 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
@@ -19,12 +19,13 @@ package org.apache.stormcrawler.sql;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import org.apache.storm.spout.Scheme;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -43,9 +44,39 @@ public class SQLSpout extends AbstractQueryingSpout {
 
     private static final Scheme SCHEME = new StringTabScheme();
 
-    private String tableName;
+    private static final String BASE_SQL =
+            """
+    SELECT *
+    FROM (
+        SELECT
+            rank() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url) 
AS ranking,
+            url,
+            metadata,
+            nextfetchdate
+        FROM %s
+        WHERE nextfetchdate <= ? %s
+    ) AS urls_ranks
+    WHERE urls_ranks.ranking <= ?
+    ORDER BY ranking %s
+    """;
+
+    private static final String BUCKET_CLAUSE =
+            """
+    AND bucket = ?
+    """;
+
+    private static final String LIMIT_CLAUSE =
+            """
+    LIMIT ?
+    """;
+
+    private static final String URL_COLUMN = "url";
+    private static final String METADATA_COLUMN = "metadata";
+
+    private static String preparedSql;
 
     private Connection connection;
+    private PreparedStatement ps;
 
     /**
      * if more than one instance of the spout exist, each one is in charge of 
a separate bucket
@@ -70,7 +101,8 @@ public class SQLSpout extends AbstractQueryingSpout {
 
         maxDocsPerBucket = ConfUtils.getInt(conf, 
Constants.SQL_MAX_DOCS_BUCKET_PARAM_NAME, 5);
 
-        tableName = ConfUtils.getString(conf, 
Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
+        final String tableName =
+                ConfUtils.getString(conf, 
Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
 
         maxNumResults = ConfUtils.getInt(conf, 
Constants.SQL_MAXRESULTS_PARAM_NAME, 100);
 
@@ -88,6 +120,18 @@ public class SQLSpout extends AbstractQueryingSpout {
                     "[" + context.getThisComponentId() + " #" + 
context.getThisTaskIndex() + "] ";
             bucketNum = context.getThisTaskIndex();
         }
+
+        final String bucketClause = (bucketNum >= 0) ? BUCKET_CLAUSE : "";
+        final String limitClause = (maxNumResults != -1) ? LIMIT_CLAUSE : "";
+
+        preparedSql = String.format(Locale.ROOT, BASE_SQL, tableName, 
bucketClause, limitClause);
+
+        try {
+            ps = connection.prepareStatement(preparedSql);
+        } catch (SQLException e) {
+            LOG.error("Failed to prepare statement", e);
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
@@ -117,97 +161,41 @@ public class SQLSpout extends AbstractQueryingSpout {
         // https://mariadb.com/kb/en/library/window-functions-overview/
         // 
http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/
 
-        String query =
-                "SELECT * from (select rank() over (partition by host order by 
nextfetchdate desc, url) as ranking, url, metadata, nextfetchdate from "
-                        + tableName;
-
-        query +=
-                " WHERE nextfetchdate <= '" + new 
Timestamp(lastNextFetchDate.toEpochMilli()) + "'";
-
-        // constraint on bucket num
-        if (bucketNum >= 0) {
-            query += " AND bucket = '" + bucketNum + "'";
-        }
-
-        query +=
-                ") as urls_ranks where (urls_ranks.ranking <= "
-                        + maxDocsPerBucket
-                        + ") order by ranking";
-
-        if (maxNumResults != -1) {
-            query += " LIMIT " + this.maxNumResults;
-        }
-
         int alreadyprocessed = 0;
         int numhits = 0;
 
         long timeStartQuery = System.currentTimeMillis();
 
-        // create the java statement
-        Statement st = null;
-        ResultSet rs = null;
         try {
-            st = this.connection.createStatement();
+            int i = 1;
+            ps.setTimestamp(i++, new 
Timestamp(lastNextFetchDate.toEpochMilli()));
 
-            // dump query to log
-            LOG.debug("{} SQL query {}", logIdprefix, query);
-
-            // execute the query, and get a java resultset
-            rs = st.executeQuery(query);
-
-            long timeTaken = System.currentTimeMillis() - timeStartQuery;
-            queryTimes.addMeasurement(timeTaken);
-
-            // iterate through the java resultset
-            while (rs.next()) {
-                String url = rs.getString("url");
-                numhits++;
-                // already processed? skip
-                if (beingProcessed.containsKey(url)) {
-                    alreadyprocessed++;
-                    continue;
-                }
-                String metadata = rs.getString("metadata");
-                if (metadata == null) {
-                    metadata = "";
-                } else if (!metadata.startsWith("\t")) {
-                    metadata = "\t" + metadata;
-                }
-                String URLMD = url + metadata;
-                List<Object> v =
-                        
SCHEME.deserialize(ByteBuffer.wrap(URLMD.getBytes(StandardCharsets.UTF_8)));
-                buffer.add(url, (Metadata) v.get(1));
+            if (bucketNum >= 0) {
+                ps.setInt(i++, bucketNum);
             }
 
-            // no results? reset the date
-            if (numhits == 0) {
-                lastNextFetchDate = null;
+            ps.setInt(i++, maxDocsPerBucket);
+
+            if (maxNumResults != -1) {
+                ps.setInt(i++, maxNumResults);
             }
 
-            
eventCounter.scope("already_being_processed").incrBy(alreadyprocessed);
-            eventCounter.scope("queries").incrBy(1);
-            eventCounter.scope("docs").incrBy(numhits);
+            // dump query to log
+            LOG.debug("{} SQL query {}", logIdprefix, preparedSql);
 
-            LOG.info(
-                    "{} SQL query returned {} hits in {} msec with {} already 
being processed",
-                    logIdprefix,
-                    numhits,
-                    timeTaken,
-                    alreadyprocessed);
+            try (ResultSet rs = ps.executeQuery()) {
+                final long timeTaken = recordQueryTiming(timeStartQuery);
+
+                // iterate through the java resultset
+                while (rs.next()) {
+                    numhits++;
+                    alreadyprocessed += processRow(rs);
+                }
 
+                postProcessResults(numhits, alreadyprocessed, timeTaken);
+            }
         } catch (SQLException e) {
             LOG.error("Exception while querying table", e);
-        } finally {
-            try {
-                if (rs != null) rs.close();
-            } catch (SQLException e) {
-                LOG.error("Exception closing resultset", e);
-            }
-            try {
-                if (st != null) st.close();
-            } catch (SQLException e) {
-                LOG.error("Exception closing statement", e);
-            }
         }
     }
 
@@ -226,10 +214,55 @@ public class SQLSpout extends AbstractQueryingSpout {
     @Override
     public void close() {
         super.close();
-        try {
-            connection.close();
-        } catch (SQLException e) {
-            LOG.error("Exception caught while closing SQL connection", e);
+        SQLUtil.closeResource(ps, "prepared statement");
+        SQLUtil.closeResource(connection, "connection");
+    }
+
+    private long recordQueryTiming(long timeStartQuery) {
+        long timeTaken = System.currentTimeMillis() - timeStartQuery;
+        queryTimes.addMeasurement(timeTaken);
+        return timeTaken;
+    }
+
+    private int processRow(final ResultSet rs) throws SQLException {
+
+        final String url = rs.getString(URL_COLUMN);
+        final String metadata = rs.getString(METADATA_COLUMN);
+
+        // already processed? skip
+        if (beingProcessed.containsKey(url)) {
+            return 1;
+        }
+
+        final String normalisedMetadata =
+                (metadata == null || metadata.startsWith("\t")) ? metadata : 
"\t" + metadata;
+
+        final String urlWithMetadata = String.format(Locale.ROOT, "%s%s", url, 
normalisedMetadata);
+        final List<Object> v =
+                SCHEME.deserialize(
+                        
ByteBuffer.wrap(urlWithMetadata.getBytes(StandardCharsets.UTF_8)));
+        buffer.add(url, (Metadata) v.get(1));
+
+        return 0;
+    }
+
+    private void postProcessResults(
+            final int numHits, final int alreadyProcessed, final long 
timeTaken) {
+
+        // no results? reset the date
+        if (numHits == 0) {
+            lastNextFetchDate = null;
         }
+
+        eventCounter.scope("already_being_processed").incrBy(alreadyProcessed);
+        eventCounter.scope("queries").incrBy(1);
+        eventCounter.scope("docs").incrBy(numHits);
+
+        LOG.info(
+                "{} SQL query returned {} hits in {} msec with {} already 
being processed",
+                logIdprefix,
+                numHits,
+                timeTaken,
+                alreadyProcessed);
     }
 }
diff --git 
a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java 
b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
index ad75f243..c6c197a6 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
@@ -20,23 +20,24 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 
 public class SQLUtil {
 
+    private static final org.slf4j.Logger LOG = 
org.slf4j.LoggerFactory.getLogger(SQLUtil.class);
+
     private SQLUtil() {}
 
     public static Connection getConnection(Map<String, Object> stormConf) 
throws SQLException {
         // SQL connection details
-        Map<String, Object> sqlConf = (Map<String, Object>) 
stormConf.get("sql.connection");
+        Map<String, String> sqlConf = (Map<String, String>) 
stormConf.get("sql.connection");
 
         if (sqlConf == null) {
             throw new RuntimeException(
                     "Missing SQL connection config, add a section 
'sql.connection' to the configuration");
         }
 
-        String url = (String) sqlConf.get("url");
+        String url = sqlConf.get("url");
         if (url == null) {
             throw new RuntimeException(
                     "Missing SQL url, add an entry 'url' to the section 
'sql.connection' of the configuration");
@@ -44,10 +45,18 @@ public class SQLUtil {
 
         Properties props = new Properties();
 
-        for (Entry<String, Object> entry : sqlConf.entrySet()) {
-            props.setProperty(entry.getKey(), (String) entry.getValue());
-        }
+        props.putAll(sqlConf);
 
         return DriverManager.getConnection(url, props);
     }
+
+    public static void closeResource(final AutoCloseable resource, final 
String resourceName) {
+        if (resource != null) {
+            try {
+                resource.close();
+            } catch (Exception e) {
+                LOG.error("Error closing {}", resourceName, e);
+            }
+        }
+    }
 }
diff --git 
a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java 
b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
index 20ca051e..27dc2719 100644
--- 
a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
+++ 
b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
@@ -16,6 +16,8 @@
  */
 package org.apache.stormcrawler.sql;
 
+import static org.apache.stormcrawler.sql.SQLUtil.closeResource;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -23,6 +25,7 @@ import java.sql.Timestamp;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executors;
@@ -52,22 +55,19 @@ public class StatusUpdaterBolt extends 
AbstractStatusUpdaterBolt {
     private MultiCountMetric eventCounter;
 
     private Connection connection;
-    private String tableName;
 
     private URLPartitioner partitioner;
     private int maxNumBuckets = -1;
 
     private int batchMaxSize = 1000;
-    private float batchMaxIdleMsec = 2000;
 
     private int currentBatchSize = 0;
 
-    private PreparedStatement insertPreparedStmt = null;
-
     private long lastInsertBatchTime = -1;
 
-    private String updateQuery;
-    private String insertQuery;
+    private PreparedStatement updatePreparedStmt;
+    private PreparedStatement insertPreparedStmt;
+    private ScheduledExecutorService executor;
 
     private final Map<String, List<Tuple>> waitingAck = new HashMap<>();
 
@@ -88,7 +88,8 @@ public class StatusUpdaterBolt extends 
AbstractStatusUpdaterBolt {
 
         this.eventCounter = context.registerMetric("counter", new 
MultiCountMetric(), 10);
 
-        tableName = ConfUtils.getString(stormConf, 
Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
+        final String tableName =
+                ConfUtils.getString(stormConf, 
Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
 
         batchMaxSize =
                 ConfUtils.getInt(stormConf, 
Constants.SQL_UPDATE_BATCH_SIZE_PARAM_NAME, 1000);
@@ -100,21 +101,39 @@ public class StatusUpdaterBolt extends 
AbstractStatusUpdaterBolt {
             throw new RuntimeException(ex);
         }
 
-        String query =
-                tableName
-                        + " (url, status, nextfetchdate, metadata, bucket, 
host)"
-                        + " values (?, ?, ?, ?, ?, ?)";
-
-        updateQuery = "REPLACE INTO " + query;
-        insertQuery = "INSERT IGNORE INTO " + query;
+        final String baseColumns =
+                """
+                                (url, status, nextfetchdate, metadata, bucket, 
host)
+                                VALUES (?, ?, ?, ?, ?, ?)
+                             """;
+
+        final String updateQuery =
+                String.format(
+                        Locale.ROOT,
+                        """
+                                 REPLACE INTO %s %s
+                         """,
+                        tableName,
+                        baseColumns);
+
+        final String insertQuery =
+                String.format(
+                        Locale.ROOT,
+                        """
+                            INSERT IGNORE INTO %s %s
+        """,
+                        tableName,
+                        baseColumns);
 
         try {
+            updatePreparedStmt = connection.prepareStatement(updateQuery);
             insertPreparedStmt = connection.prepareStatement(insertQuery);
         } catch (SQLException e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("Failed to prepare statements", e);
+            throw new RuntimeException(e);
         }
 
-        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        executor = Executors.newSingleThreadScheduledExecutor();
         executor.scheduleAtFixedRate(
                 () -> {
                     try {
@@ -162,37 +181,27 @@ public class StatusUpdaterBolt extends 
AbstractStatusUpdaterBolt {
             partition = Math.abs(partitionKey.hashCode() % maxNumBuckets);
         }
 
-        PreparedStatement preparedStmt = this.insertPreparedStmt;
-
         // create in table if does not already exist
         if (isUpdate) {
-            preparedStmt = connection.prepareStatement(updateQuery);
-        }
-
-        preparedStmt.setString(1, url);
-        preparedStmt.setString(2, status.toString());
-        if (nextFetch.isPresent()) {
-            final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant());
-            preparedStmt.setObject(3, tsp);
-        } else {
-            // a value so large it means it will never be refetched
-            preparedStmt.setObject(3, NEVER);
-        }
-        preparedStmt.setString(4, mdAsString.toString());
-        preparedStmt.setInt(5, partition);
-        preparedStmt.setString(6, partitionKey);
-
-        // updates are not batched
-        if (isUpdate) {
-            preparedStmt.executeUpdate();
-            preparedStmt.close();
+            populate(
+                    url,
+                    status,
+                    nextFetch,
+                    mdAsString,
+                    partition,
+                    partitionKey,
+                    updatePreparedStmt);
+
+            // updates are not batched
+            updatePreparedStmt.executeUpdate();
             eventCounter.scope("sql_updates_number").incrBy(1);
             super.ack(t, url);
             return;
         }
 
         // code below is for inserts i.e. DISCOVERED URLs
-        preparedStmt.addBatch();
+        populate(url, status, nextFetch, mdAsString, partition, partitionKey, 
insertPreparedStmt);
+        insertPreparedStmt.addBatch();
 
         if (lastInsertBatchTime == -1) {
             lastInsertBatchTime = System.currentTimeMillis();
@@ -210,12 +219,36 @@ public class StatusUpdaterBolt extends 
AbstractStatusUpdaterBolt {
         eventCounter.scope("sql_inserts_number").incrBy(1);
     }
 
+    private void populate(
+            final String url,
+            final Status status,
+            final Optional<Date> nextFetch,
+            final StringBuilder mdAsString,
+            final int partition,
+            final String partitionKey,
+            final PreparedStatement preparedStmt)
+            throws SQLException {
+        preparedStmt.setString(1, url);
+        preparedStmt.setString(2, status.toString());
+        if (nextFetch.isPresent()) {
+            final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant());
+            preparedStmt.setObject(3, tsp);
+        } else {
+            // a value so large it means it will never be refetched
+            preparedStmt.setObject(3, NEVER);
+        }
+        preparedStmt.setString(4, mdAsString.toString());
+        preparedStmt.setInt(5, partition);
+        preparedStmt.setString(6, partitionKey);
+    }
+
     private synchronized void checkExecuteBatch() throws SQLException {
         if (currentBatchSize == 0) {
             return;
         }
         long now = System.currentTimeMillis();
         // check whether the insert batches need executing
+        final float batchMaxIdleMsec = 2000;
         if ((currentBatchSize == batchMaxSize)) {
             LOG.info("About to execute batch - triggered by size");
         } else if (lastInsertBatchTime + (long) batchMaxIdleMsec < 
System.currentTimeMillis()) {
@@ -253,17 +286,27 @@ public class StatusUpdaterBolt extends 
AbstractStatusUpdaterBolt {
         lastInsertBatchTime = System.currentTimeMillis();
         currentBatchSize = 0;
         waitingAck.clear();
-
-        insertPreparedStmt.close();
-        insertPreparedStmt = connection.prepareStatement(insertQuery);
     }
 
     @Override
     public void cleanup() {
-        if (connection != null)
+        closeResource(updatePreparedStmt, "update prepared statement");
+        closeResource(insertPreparedStmt, "insert prepared statement");
+        closeResource(connection, "connection");
+        closeExecutor();
+    }
+
+    private void closeExecutor() {
+        if (executor != null) {
+            executor.shutdown();
             try {
-                connection.close();
-            } catch (SQLException e) {
+                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executor.shutdownNow();
+                Thread.currentThread().interrupt();
             }
+        }
     }
 }
diff --git 
a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java 
b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java
new file mode 100644
index 00000000..bf454cf1
--- /dev/null
+++ 
b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Abstract base class for SQL module tests that provides a shared MySQL 
container. Uses the
+ * Testcontainers singleton pattern to ensure the container is created once 
and reused across all
+ * test classes, improving test performance.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+@Timeout(value = 120, unit = TimeUnit.SECONDS)
+abstract class AbstractSQLTest {
+
+    private static final DockerImageName MYSQL_IMAGE = 
DockerImageName.parse("mysql:8.4.0");
+
+    @Container
+    private static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_IMAGE)
+                    .withDatabaseName("crawl")
+                    .withUsername("crawler")
+                    .withPassword("crawler")
+                    .withReuse(true);
+
+    static Connection testConnection;
+
+    static Map<String, String> createSqlConnectionConfig() {
+        Map<String, String> sqlConnection = new HashMap<>();
+        sqlConnection.put("url", MYSQL_CONTAINER.getJdbcUrl());
+        sqlConnection.put("user", MYSQL_CONTAINER.getUsername());
+        sqlConnection.put("password", MYSQL_CONTAINER.getPassword());
+        return sqlConnection;
+    }
+
+    void execute(String sql) throws SQLException {
+        try (Statement stmt = testConnection.createStatement()) {
+            stmt.execute(sql);
+        }
+    }
+
+    @BeforeAll
+    static void init() throws SQLException {
+        testConnection =
+                DriverManager.getConnection(
+                        MYSQL_CONTAINER.getJdbcUrl(),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword());
+    }
+
+    @BeforeEach
+    void baseSetup() throws Exception {
+        setupTestTables();
+    }
+
+    protected abstract void setupTestTables() throws Exception;
+
+    @AfterAll
+    static void cleanup() throws Exception {
+        if (testConnection != null) {
+            testConnection.close();
+        }
+    }
+}
diff --git 
a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java 
b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
new file mode 100644
index 00000000..5a34a6b7
--- /dev/null
+++ 
b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Constants;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.indexing.AbstractIndexerBolt;
+import org.apache.stormcrawler.persistence.Status;
+import org.apache.stormcrawler.util.RobotsTags;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class IndexerBoltTest extends AbstractSQLTest {
+
+    private TestOutputCollector output;
+    private final String tableName = "content";
+
+    @Override
+    protected void setupTestTables() throws Exception {
+        execute(
+                """
+                DROP TABLE IF EXISTS content
+                """);
+        execute(
+                """
+                CREATE TABLE IF NOT EXISTS content (
+                    url VARCHAR(255) PRIMARY KEY,
+                    title VARCHAR(255),
+                    description TEXT,
+                    keywords VARCHAR(255)
+                )
+                """);
+    }
+
+    @BeforeEach
+    void setup() {
+        output = new TestOutputCollector();
+    }
+
+    @Test
+    void testBasicIndexing() throws Exception {
+        IndexerBolt bolt = createBolt(createBasicConfig());
+        String url = "http://example.com/page1";;
+
+        executeTuple(bolt, url, "This is the page content", getMetadata());
+
+        // Verify URL was stored in database
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = 
'" + url + "'")) {
+            assertTrue(rs.next(), "URL should be stored in database");
+            assertEquals(url, rs.getString("url"));
+            assertEquals("Test Page Title", rs.getString("title"));
+            assertEquals("Test page description", rs.getString("description"));
+            assertEquals("test, page, keywords", rs.getString("keywords"));
+        }
+
+        // Verify tuple was acked and status emitted
+        assertEquals(1, output.getAckedTuples().size());
+        assertEquals(1, output.getEmitted(Constants.StatusStreamName).size());
+
+        // Verify emitted status is FETCHED
+        List<Object> emitted = 
output.getEmitted(Constants.StatusStreamName).get(0);
+        assertEquals(url, emitted.get(0));
+        assertEquals(Status.FETCHED, emitted.get(2));
+        bolt.cleanup();
+    }
+
+    @Test
+    void testDuplicateHandling() throws Exception {
+        IndexerBolt bolt = createBolt(createBasicConfig());
+
+        String url = "http://example.com/page2";;
+        executeTuple(bolt, url, "Original content", getMetadata());
+
+        // Second indexing with updated content (same URL)
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Updated Title");
+        metadata.addValue("description", "Updated description");
+
+        executeTuple(bolt, url, "Updated content", metadata);
+
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = 
'" + url + "'")) {
+            assertTrue(rs.next());
+            assertEquals("Updated Title", rs.getString("title"));
+            assertEquals("Updated description", rs.getString("description"));
+            assertEquals("test, page, keywords", rs.getString("keywords"));
+        }
+
+        assertEquals(2, output.getAckedTuples().size());
+        bolt.cleanup();
+    }
+
+    @Test
+    void testFilteringByRobotsNoIndex() throws Exception {
+        IndexerBolt bolt = createBolt(createBasicConfig());
+
+        String url = "http://example.com/noindex-page";;
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Should Not Be Indexed");
+        metadata.addValue(RobotsTags.ROBOTS_NO_INDEX, "true");
+
+        executeTuple(bolt, url, "Content", metadata);
+
+        // Verify URL was NOT stored in database
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = 
'" + url + "'")) {
+            assertFalse(rs.next(), "URL with noindex should not be stored in 
database");
+        }
+
+        // But tuple should still be acked and FETCHED status emitted
+        assertEquals(1, output.getAckedTuples().size());
+        assertEquals(1, output.getEmitted(Constants.StatusStreamName).size());
+
+        List<Object> emitted = 
output.getEmitted(Constants.StatusStreamName).get(0);
+        assertEquals(Status.FETCHED, emitted.get(2));
+        bolt.cleanup();
+    }
+
+    @Test
+    void testFilteringByMetadataFilter() throws Exception {
+        // Configure filter to only index documents with indexable=yes
+        Map<String, Object> conf = createBasicConfig();
+        conf.put(AbstractIndexerBolt.metadataFilterParamName, "indexable=yes");
+
+        IndexerBolt bolt = createBolt(conf);
+
+        // Document that should be filtered out (no indexable metadata)
+        String url1 = "http://example.com/filtered-page";;
+        Metadata metadata1 = new Metadata();
+        metadata1.addValue("title", "Filtered Page");
+
+        executeTuple(bolt, url1, "Content", metadata1);
+
+        // Verify filtered document was NOT stored
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = 
'" + url1 + "'")) {
+            assertFalse(rs.next(), "Filtered document should not be stored");
+        }
+
+        // Document that should be indexed (has indexable=yes)
+        String url2 = "http://example.com/indexed-page";;
+        Metadata metadata2 = new Metadata();
+        metadata2.addValue("title", "Indexed Page");
+        metadata2.addValue("indexable", "yes");
+
+        Tuple tuple2 = createTuple(url2, "Content", metadata2);
+        bolt.execute(tuple2);
+
+        // Verify indexed document WAS stored
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = 
'" + url2 + "'")) {
+            assertTrue(rs.next(), "Document with indexable=yes should be 
stored");
+            assertEquals("Indexed Page", rs.getString("title"));
+        }
+
+        // Both tuples should be acked with FETCHED status
+        assertEquals(2, output.getAckedTuples().size());
+        assertEquals(2, output.getEmitted(Constants.StatusStreamName).size());
+        bolt.cleanup();
+    }
+
+    @Test
+    void testMetadataExtraction() throws Exception {
+        // Configure to only extract specific metadata fields
+        Map<String, Object> conf = createBasicConfig();
+        // Only map title and description, not keywords
+        List<String> mdMapping = new ArrayList<>();
+        mdMapping.add("title");
+        mdMapping.add("description");
+        conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+
+        IndexerBolt bolt = createBolt(conf);
+
+        String url = "http://example.com/metadata-test";;
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Extracted Title");
+        metadata.addValue("description", "Extracted Description");
+        metadata.addValue("keywords", "these,should,not,be,stored");
+        metadata.addValue("author", "Should Not Be Stored");
+
+        executeTuple(bolt, url, "Content", metadata);
+
+        // Verify only configured metadata was stored
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = 
'" + url + "'")) {
+            assertTrue(rs.next());
+            assertEquals("Extracted Title", rs.getString("title"));
+            assertEquals("Extracted Description", rs.getString("description"));
+            // keywords column should be null since it wasn't in the mapping
+            assertNull(rs.getString("keywords"));
+        }
+
+        assertEquals(1, output.getAckedTuples().size());
+        bolt.cleanup();
+    }
+
+    @Test
+    void testMetadataAliasMapping() throws Exception {
+        // Configure metadata mapping with aliases
+        Map<String, Object> conf = createBasicConfig();
+        List<String> mdMapping = new ArrayList<>();
+        mdMapping.add("parse.title=title"); // map parse.title to title column
+        mdMapping.add("parse.description=description");
+        conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+
+        IndexerBolt bolt = createBolt(conf);
+
+        String url = "http://example.com/alias-test";;
+        Metadata metadata = new Metadata();
+        metadata.addValue("parse.title", "Title from Parser");
+        metadata.addValue("parse.description", "Description from Parser");
+
+        executeTuple(bolt, url, "Content", metadata);
+
+        // Verify aliased metadata was stored correctly
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = 
'" + url + "'")) {
+            assertTrue(rs.next());
+            assertEquals("Title from Parser", rs.getString("title"));
+            assertEquals("Description from Parser", 
rs.getString("description"));
+        }
+        bolt.cleanup();
+    }
+
+    private Tuple createTuple(String url, String text, Metadata metadata) {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getStringByField("url")).thenReturn(url);
+        when(tuple.getStringByField("text")).thenReturn(text);
+        when(tuple.getValueByField("metadata")).thenReturn(metadata);
+        return tuple;
+    }
+
+    private void executeTuple(IndexerBolt bolt, String url, String text, 
Metadata metadata) {
+        Tuple tuple = createTuple(url, text, metadata);
+        bolt.execute(tuple);
+    }
+
+    private Map<String, Object> createBasicConfig() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("sql.connection", createSqlConnectionConfig());
+        conf.put(IndexerBolt.SQL_INDEX_TABLE_PARAM_NAME, tableName);
+        conf.put(AbstractIndexerBolt.urlFieldParamName, "url");
+        // Default metadata mapping
+        List<String> mdMapping = new ArrayList<>();
+        mdMapping.add("title");
+        mdMapping.add("description");
+        mdMapping.add("keywords");
+        conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+        return conf;
+    }
+
+    private IndexerBolt createBolt(Map<String, Object> conf) {
+        IndexerBolt bolt = new IndexerBolt();
+        bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new 
OutputCollector(output));
+        return bolt;
+    }
+
+    private Metadata getMetadata() {
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Test Page Title");
+        metadata.addValue("description", "Test page description");
+        metadata.addValue("keywords", "test, page, keywords");
+        return metadata;
+    }
+}
diff --git 
a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java 
b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
new file mode 100644
index 00000000..9b65f2e2
--- /dev/null
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static 
org.apache.stormcrawler.TestUtil.getMockedTopologyContextWithBucket;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.persistence.urlbuffer.URLBuffer;
+import org.junit.jupiter.api.Test;
+
+class SQLSpoutTest extends AbstractSQLTest {
+
+    @Override
+    protected void setupTestTables() throws Exception {
+        execute("DROP TABLE IF EXISTS urls");
+        execute(
+                """
+                CREATE TABLE IF NOT EXISTS urls (
+                    url VARCHAR(255),
+                    status VARCHAR(16) DEFAULT 'DISCOVERED',
+                    nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    metadata TEXT,
+                    bucket SMALLINT DEFAULT 0,
+                    host VARCHAR(128),
+                    PRIMARY KEY(url)
+                )
+                """);
+    }
+
+    @Test
+    void bufferIsPopulatedAndEmitted() throws Exception {
+        // Insert base test data with past nextfetchdate to ensure they're 
eligible for fetching
+        Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+        insertTestURL("http://example.com/page1";, 0, "example.com", pastTime);
+        insertTestURL("http://example.com/page2";, 0, "example.com", pastTime);
+        insertTestURL("http://test.com/page1";, 0, "test.com", pastTime);
+
+        TestOutputCollector testCollector = new TestOutputCollector();
+        SQLSpout spout = createSpout(testCollector, 
TestUtil.getMockedTopologyContext());
+
+        // First call to nextTuple() populates the buffer
+        spout.nextTuple();
+
+        final List<String> expectedURLs =
+                Arrays.asList(
+                        "http://example.com/page1";,
+                        "http://example.com/page2";,
+                        "http://test.com/page1";);
+        assertURLsEmitted(spout, testCollector, 3, expectedURLs);
+        spout.close();
+    }
+
+    @Test
+    void testMaxDocsPerBucket() throws Exception {
+        // Add more URLs from example.com to test the maxDocsPerBucket limit
+        // Insert base test data with past nextfetchdate to ensure they're 
eligible for fetching
+        Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+        insertTestURL("http://example.com/page1";, 0, "example.com", pastTime);
+        insertTestURL("http://example.com/page2";, 0, "example.com", pastTime);
+        insertTestURL("http://test.com/page1";, 0, "test.com", pastTime);
+
+        TestOutputCollector testCollector = new TestOutputCollector();
+        SQLSpout spout = createSpout(testCollector, 
TestUtil.getMockedTopologyContext());
+
+        pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+        for (int i = 4; i <= 10; i++) {
+            insertTestURL("http://example.com/page"; + i, 0, "example.com", 
pastTime);
+        }
+
+        spout.nextTuple();
+
+        URLBuffer buffer = getBufferFromSpout(spout);
+
+        // With maxDocsPerBucket=5, we should get at most 5 URLs from 
example.com
+        // plus 1 from test.com = 6 total
+        assertEquals(6, buffer.size());
+        spout.close();
+    }
+
+    @Test
+    void testSingleInstanceNoBucketFiltering() throws Exception {
+
+        Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+
+        // Insert URLs into different buckets
+        insertTestURL("http://site1.com/page1";, 0, "site1.com", pastTime);
+        insertTestURL("http://site2.com/page1";, 1, "site2.com", pastTime);
+        insertTestURL("http://site3.com/page1";, 2, "site3.com", pastTime);
+        insertTestURL("http://site4.com/page1";, 3, "site4.com", pastTime);
+
+        // Create a single spout instance (totalTasks = 1)
+        TestOutputCollector collector = new TestOutputCollector();
+        TopologyContext context = getMockedTopologyContextWithBucket(0, 1, 
"sqlSpout");
+        SQLSpout singleSpout = createSpout(collector, context);
+
+        // Populate buffer
+        singleSpout.nextTuple();
+
+        final List<String> expectedURLs =
+                Arrays.asList(
+                        "http://site1.com/page1";,
+                        "http://site2.com/page1";,
+                        "http://site3.com/page1";,
+                        "http://site4.com/page1";);
+        assertURLsEmitted(singleSpout, collector, 4, expectedURLs);
+        singleSpout.close();
+    }
+
+    @Test
+    void testBucketPartitioningTwoInstances() throws Exception {
+
+        // Insert URLs into different buckets
+        Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+
+        // Bucket 0 URLs
+        insertTestURL("http://bucket0-site1.com/page1";, 0, 
"bucket0-site1.com", pastTime);
+        insertTestURL("http://bucket0-site2.com/page1";, 0, 
"bucket0-site2.com", pastTime);
+        insertTestURL("http://bucket0-site3.com/page1";, 0, 
"bucket0-site3.com", pastTime);
+
+        // Bucket 1 URLs
+        insertTestURL("http://bucket1-site1.com/page1";, 1, 
"bucket1-site1.com", pastTime);
+        insertTestURL("http://bucket1-site2.com/page1";, 1, 
"bucket1-site2.com", pastTime);
+        insertTestURL("http://bucket1-site3.com/page1";, 1, 
"bucket1-site3.com", pastTime);
+
+        // Create two spout instances with different bucket assignments
+        SQLSpout[] spouts = new SQLSpout[2];
+        for (int i = 0; i < 2; i++) {
+            TestOutputCollector collector = new TestOutputCollector();
+            TopologyContext context = getMockedTopologyContextWithBucket(i, 2, 
"sqlSpout");
+            spouts[i] = createSpout(collector, context);
+            spouts[i].nextTuple();
+            assertURLsEmitted(
+                    spouts[i],
+                    collector,
+                    3,
+                    Arrays.asList(
+                            "http://bucket"; + i + "-site1.com/page1",
+                            "http://bucket"; + i + "-site2.com/page1",
+                            "http://bucket"; + i + "-site3.com/page1"));
+            spouts[i].close();
+        }
+    }
+
+    private void insertTestURL(String url, int bucket, String host, Instant 
time) throws Exception {
+        String sql =
+                """
+    INSERT INTO urls (url, status, nextfetchdate, metadata, bucket, host)
+    VALUES (?, ?, ?, ?, ?, ?)
+    """;
+
+        try (PreparedStatement ps = testConnection.prepareStatement(sql)) {
+            ps.setString(1, url);
+            ps.setString(2, "DISCOVERED");
+            ps.setTimestamp(3, Timestamp.from(time));
+            ps.setString(4, "\tkey=value\tdepth=0");
+            ps.setInt(5, bucket);
+            ps.setString(6, host);
+            ps.executeUpdate();
+        }
+    }
+
+    private Map<String, Object> createTestConfig() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("sql.connection", createSqlConnectionConfig());
+        conf.put("sql.status.table", "urls");
+        conf.put("sql.max.urls.per.bucket", 5);
+        conf.put("sql.spout.max.results", 100);
+        conf.put(
+                "urlbuffer.class", 
"org.apache.stormcrawler.persistence.urlbuffer.SimpleURLBuffer");
+        return conf;
+    }
+
+    private URLBuffer getBufferFromSpout(SQLSpout spoutInstance) throws 
Exception {
+        Field bufferField = 
spoutInstance.getClass().getSuperclass().getDeclaredField("buffer");
+        bufferField.setAccessible(true);
+        return (URLBuffer) bufferField.get(spoutInstance);
+    }
+
+    private void assertURLsEmitted(
+            SQLSpout spout,
+            TestOutputCollector collector,
+            int numTuples,
+            List<String> expectedURLs) {
+        assertEquals(0, collector.getEmitted().size());
+
+        // Emit all URLs
+        Set<String> urls = new HashSet<>();
+        for (int i = 0; i < numTuples; i++) {
+            spout.nextTuple();
+        }
+        for (List<Object> tuple : collector.getEmitted()) {
+            urls.add((String) tuple.get(0));
+        }
+
+        for (String url : expectedURLs) {
+            assertTrue(urls.contains(url));
+        }
+    }
+
+    private SQLSpout createSpout(TestOutputCollector collector, 
TopologyContext context) {
+        SQLSpout singleSpout = new SQLSpout();
+        Map<String, Object> conf = createTestConfig();
+        singleSpout.open(conf, context, new SpoutOutputCollector(collector));
+        singleSpout.activate();
+        return singleSpout;
+    }
+}
diff --git 
a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
 
b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
new file mode 100644
index 00000000..4108150f
--- /dev/null
+++ 
b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.persistence.Status;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class StatusUpdaterBoltTest extends AbstractSQLTest {
+
+    private TestOutputCollector output;
+    private StatusUpdaterBolt bolt;
+
+    @Override
+    protected void setupTestTables() throws Exception {
+        execute("DROP TABLE IF EXISTS urls");
+        execute(
+                """
+                CREATE TABLE IF NOT EXISTS urls (
+                    url VARCHAR(255),
+                    status VARCHAR(16) DEFAULT 'DISCOVERED',
+                    nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    metadata TEXT,
+                    bucket SMALLINT DEFAULT 0,
+                    host VARCHAR(128),
+                    PRIMARY KEY(url)
+                )
+                """);
+    }
+
+    @BeforeEach
+    void setup() {
+        output = new TestOutputCollector();
+        bolt = createBolt();
+    }
+
+    @AfterEach
+    void close() {
+        bolt.cleanup();
+    }
+
+    @Test
+    void testStoreDiscoveredURL() throws Exception {
+        String url = "http://example.com/page1";;
+        Metadata metadata = new Metadata();
+        metadata.addValue("key1", "value1");
+
+        Tuple tuple = createTuple(url, Status.DISCOVERED, metadata);
+        bolt.execute(tuple);
+
+        // Trigger batch execution by sending another tuple (which will also 
check the batch)
+        String url2 = "http://example.com/page1-trigger";;
+        Tuple triggerTuple = createTuple(url2, Status.DISCOVERED, metadata);
+        bolt.execute(triggerTuple);
+
+        // Verify URL was stored
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url 
= '" + url + "'")) {
+            assertTrue(rs.next(), "URL should be stored in database after 
batch execution");
+            assertEquals("DISCOVERED", rs.getString("status"));
+            assertNotNull(rs.getString("metadata"));
+        }
+        bolt.cleanup();
+    }
+
+    @Test
+    void testUpdateURL() throws Exception {
+        String url = "http://example.com/page2";;
+        Metadata metadata = new Metadata();
+        metadata.addValue("key1", "value1");
+
+        // First store as DISCOVERED
+        Tuple tuple1 = createTuple(url, Status.DISCOVERED, metadata);
+        bolt.execute(tuple1);
+
+        // Now update to FETCHED
+        Tuple tuple2 = createTuple(url, Status.FETCHED, metadata);
+        bolt.execute(tuple2);
+
+        // Verify URL was updated
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url 
= '" + url + "'")) {
+            assertTrue(rs.next());
+            assertEquals("FETCHED", rs.getString("status"));
+            assertNotNull(rs.getString("metadata"));
+        }
+        bolt.cleanup();
+    }
+
+    private Tuple createTuple(String url, Status status, Metadata metadata) {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getStringByField("url")).thenReturn(url);
+        when(tuple.getValueByField("status")).thenReturn(status);
+        when(tuple.getValueByField("metadata")).thenReturn(metadata);
+        return tuple;
+    }
+
+    private Map<String, Object> createTestConfig() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("sql.connection", createSqlConnectionConfig());
+        conf.put("sql.status.table", "urls");
+        conf.put("sql.status.max.urls.per.bucket", 10);
+        conf.put("scheduler.class", 
"org.apache.stormcrawler.persistence.DefaultScheduler");
+        conf.put("status.updater.cache.spec", 
"maximumSize=10000,expireAfterAccess=1h");
+        conf.put("sql.update.batch.size", 1);
+        return conf;
+    }
+
+    private StatusUpdaterBolt createBolt() {
+        StatusUpdaterBolt statusUpdaterBolt = new StatusUpdaterBolt();
+        Map<String, Object> conf = createTestConfig();
+        statusUpdaterBolt.prepare(
+                conf, TestUtil.getMockedTopologyContext(), new 
OutputCollector(output));
+        return statusUpdaterBolt;
+    }
+}
diff --git a/pom.xml b/pom.xml
index 5c9ee3f6..2ad34c1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@ under the License.
                <jacoco.branchRatio>1.00</jacoco.branchRatio>
                <jacoco.lineRatio>1.00</jacoco.lineRatio>
                <jacoco.complexityRatio>1.00</jacoco.complexityRatio>
+        <mysql-connector-j>9.3.0</mysql-connector-j>
 
                <skip.format.code>true</skip.format.code>
 
@@ -673,6 +674,12 @@ under the License.
                 <version>${commons.codec.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.mysql</groupId>
+                <artifactId>mysql-connector-j</artifactId>
+                <version>${mysql-connector-j}</version>
+            </dependency>
+
                </dependencies>
        </dependencyManagement>
 

Reply via email to