This is an automated email from the ASF dual-hosted git repository.
rzo1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/stormcrawler.git
The following commit(s) were added to refs/heads/main by this push:
new e222c7d0 #1611 - Refactor SQL module to use PreparedStatement in
SQLSpout and IndexerBolt (#1766)
e222c7d0 is described below
commit e222c7d04cf3255c119fd1a5e80510c77a39d73d
Author: Raju Gupta <[email protected]>
AuthorDate: Sun Jan 11 18:31:33 2026 +0000
#1611 - Refactor SQL module to use PreparedStatement in SQLSpout and
IndexerBolt (#1766)
* Refactor SQL module to use PreparedStatement in SQLSpout and IndexerBolt
for improved readability and performance
* Refactor SQL module to use PreparedStatement in SQLSpout and IndexerBolt
for improved readability and performance
* Fix forbidden violations
* changes following PR review. Added tests for SQLSpout
* changes following PR review. Added tests for SQLSpout
* changes following PR review. Added tests for SQLSpout
* changes following PR review. Added tests for StatusUpdaterBolt
* Add test dependencies and update MySQL container version in tests
* Fix flakiness in testStoreDiscoveredURL
* ScheduledExecutorService should be shut down when the bolt is cleaned up.
* Refactor IndexerBolt to improve code clarity and add unit tests for
indexing functionality
* Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
* Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
* Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
* Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
* Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
* Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
* Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
* Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
* Refactor StatusUpdaterBoltTest to improve setup and cleanup methods
* Undo changes in MetricsConsumer
* minor changes
* minor changes
* Changes following PR review.
* Changes following PR review.
---------
Co-authored-by: raju.gupta <[email protected]>
---
.../java/org/apache/stormcrawler/TestUtil.java | 38 +++
external/sql/pom.xml | 29 +-
.../org/apache/stormcrawler/sql/IndexerBolt.java | 101 ++++---
.../java/org/apache/stormcrawler/sql/SQLSpout.java | 199 +++++++------
.../java/org/apache/stormcrawler/sql/SQLUtil.java | 21 +-
.../apache/stormcrawler/sql/StatusUpdaterBolt.java | 133 ++++++---
.../apache/stormcrawler/sql/AbstractSQLTest.java | 92 ++++++
.../apache/stormcrawler/sql/IndexerBoltTest.java | 311 +++++++++++++++++++++
.../org/apache/stormcrawler/sql/SQLSpoutTest.java | 237 ++++++++++++++++
.../stormcrawler/sql/StatusUpdaterBoltTest.java | 146 ++++++++++
pom.xml | 7 +
11 files changed, 1130 insertions(+), 184 deletions(-)
diff --git a/core/src/test/java/org/apache/stormcrawler/TestUtil.java
b/core/src/test/java/org/apache/stormcrawler/TestUtil.java
index 033255c4..d8358489 100644
--- a/core/src/test/java/org/apache/stormcrawler/TestUtil.java
+++ b/core/src/test/java/org/apache/stormcrawler/TestUtil.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.task.TopologyContext;
@@ -48,6 +50,42 @@ public class TestUtil {
return context;
}
+ /**
+ * Creates a mocked TopologyContext for testing bucket partitioning.
+ *
+ * @param taskIndex The task index for this spout instance (determines
bucket number)
+ * @param totalTasks Total number of tasks in the topology
+ * @param componentId The component ID
+ * @return Mocked TopologyContext
+ */
+ public static TopologyContext getMockedTopologyContextWithBucket(
+ int taskIndex, int totalTasks, String componentId) {
+ TopologyContext context = mock(TopologyContext.class);
+
+ // Mock metric registration
+ when(context.registerMetric(anyString(), any(IMetric.class), anyInt()))
+ .thenAnswer(
+ new Answer<IMetric>() {
+ @Override
+ public IMetric answer(InvocationOnMock invocation)
throws Throwable {
+ return invocation.getArgument(1,
IMetric.class);
+ }
+ });
+
+ // Mock task information for bucket assignment
+ when(context.getThisTaskIndex()).thenReturn(taskIndex);
+ when(context.getThisComponentId()).thenReturn(componentId);
+
+ // Create list of task IDs (0 to totalTasks-1)
+ List<Integer> taskIds = new ArrayList<>();
+ for (int i = 0; i < totalTasks; i++) {
+ taskIds.add(i);
+ }
+ when(context.getComponentTasks(componentId)).thenReturn(taskIds);
+
+ return context;
+ }
+
public static Tuple getMockedTestTuple(String url, String content,
Metadata metadata) {
Tuple tuple = mock(Tuple.class);
when(tuple.getStringByField("url")).thenReturn(url);
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 0eece4e6..2b0ece44 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -37,13 +37,28 @@ under the License.
<description>SQL-based resources for StormCrawler</description>
<dependencies>
- <!--
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.31</version>
- </dependency>
- -->
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.stormcrawler</groupId>
+ <artifactId>stormcrawler-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
index 89b711f3..d9b0db0c 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
@@ -21,7 +21,11 @@ import static
org.apache.stormcrawler.Constants.StatusStreamName;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
@@ -42,12 +46,14 @@ public class IndexerBolt extends AbstractIndexerBolt {
public static final String SQL_INDEX_TABLE_PARAM_NAME = "sql.index.table";
- private OutputCollector _collector;
+ private OutputCollector collector;
private MultiCountMetric eventCounter;
private Connection connection;
+ private PreparedStatement preparedStmt;
+
private String tableName;
private Map<String, Object> conf;
@@ -56,7 +62,7 @@ public class IndexerBolt extends AbstractIndexerBolt {
public void prepare(
Map<String, Object> conf, TopologyContext context, OutputCollector
collector) {
super.prepare(conf, context, collector);
- _collector = collector;
+ this.collector = collector;
this.eventCounter = context.registerMetric("SQLIndexer", new
MultiCountMetric(), 10);
@@ -74,15 +80,14 @@ public class IndexerBolt extends AbstractIndexerBolt {
String normalisedurl = valueForURL(tuple);
Metadata metadata = (Metadata) tuple.getValueByField("metadata");
- String text = tuple.getStringByField("text");
boolean keep = filterDocument(metadata);
if (!keep) {
eventCounter.scope("Filtered").incrBy(1);
// treat it as successfully processed even if
// we do not index it
- _collector.emit(StatusStreamName, tuple, new Values(url, metadata,
Status.FETCHED));
- _collector.ack(tuple);
+ collector.emit(StatusStreamName, tuple, new Values(url, metadata,
Status.FETCHED));
+ collector.ack(tuple);
return;
}
@@ -90,35 +95,9 @@ public class IndexerBolt extends AbstractIndexerBolt {
// which metadata to display?
Map<String, String[]> keyVals = filterMetadata(metadata);
+ List<String> keys = new ArrayList<>(keyVals.keySet());
- StringBuilder query =
- new StringBuilder(" insert into ")
- .append(tableName)
- .append(" (")
- .append(fieldNameForURL());
-
- Object[] keys = keyVals.keySet().toArray();
-
- for (Object o : keys) {
- query.append(", ").append((String) o);
- }
-
- query.append(") values(?");
-
- for (int i = 0; i < keys.length; i++) {
- query.append(", ?");
- }
-
- query.append(")");
-
- query.append(" ON DUPLICATE KEY UPDATE ");
- for (int i = 0; i < keys.length; i++) {
- String key = (String) keys[i];
- if (i > 0) {
- query.append(", ");
- }
- query.append(key).append("=VALUES(").append(key).append(")");
- }
+ String query = buildQuery(keys);
if (connection == null) {
try {
@@ -131,9 +110,8 @@ public class IndexerBolt extends AbstractIndexerBolt {
LOG.debug("PreparedStatement => {}", query);
- // create the mysql insert preparedstatement
- PreparedStatement preparedStmt =
connection.prepareStatement(query.toString());
-
+ // create the mysql insert PreparedStatement
+ preparedStmt = connection.prepareStatement(query);
// TODO store the text of the document?
if (StringUtils.isNotBlank(fieldNameForText())) {
// builder.field(fieldNameForText(), trimText(text));
@@ -144,21 +122,21 @@ public class IndexerBolt extends AbstractIndexerBolt {
preparedStmt.setString(1, normalisedurl);
}
- for (int i = 0; i < keys.length; i++) {
- insert(preparedStmt, i + 2, (String) keys[i], keyVals);
+ // Set all metadata parameters
+ for (int i = 0; i < keys.size(); i++) {
+ insert(preparedStmt, i + 2, keys.get(i), keyVals);
}
-
+ // Execute the statement (single row insert)
preparedStmt.executeUpdate();
eventCounter.scope("Indexed").incrBy(1);
- _collector.emit(StatusStreamName, tuple, new Values(url, metadata,
Status.FETCHED));
- _collector.ack(tuple);
-
+ collector.emit(StatusStreamName, tuple, new Values(url, metadata,
Status.FETCHED));
+ collector.ack(tuple);
} catch (Exception e) {
// do not send to status stream so that it gets replayed
LOG.error("Error inserting into SQL", e);
- _collector.fail(tuple);
+ collector.fail(tuple);
if (connection != null) {
// reset the connection
try {
@@ -188,4 +166,41 @@ public class IndexerBolt extends AbstractIndexerBolt {
}
preparedStmt.setString(position, value);
}
+
+ private String buildQuery(final List<String> keys) {
+ final String columns = String.join(", ", keys);
+ final String placeholders = keys.stream().map(k ->
"?").collect(Collectors.joining(", "));
+
+ final String updates =
+ keys.stream()
+ .map(k -> String.format(Locale.ROOT, "%s=VALUES(%s)",
k, k))
+ .collect(Collectors.joining(", "));
+
+ // Build the ON DUPLICATE KEY UPDATE clause
+ // If there are metadata keys, update them; otherwise, update the URL
field to itself
+ final String updateClause =
+ updates.isEmpty()
+ ? String.format(
+ Locale.ROOT, "%s=VALUES(%s)",
fieldNameForURL(), fieldNameForURL())
+ : updates;
+
+ return String.format(
+ Locale.ROOT,
+ """
+ INSERT INTO %s (%s%s)
+ VALUES (?%s)
+ ON DUPLICATE KEY UPDATE %s
+ """,
+ tableName,
+ fieldNameForURL(),
+ columns.isEmpty() ? "" : ", " + columns,
+ placeholders.isEmpty() ? "" : ", " + placeholders,
+ updateClause);
+ }
+
+ @Override
+ public void cleanup() {
+ SQLUtil.closeResource(preparedStmt, "prepared statement");
+ SQLUtil.closeResource(connection, "connection");
+ }
}
diff --git
a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
index 084c5de9..9f2f0638 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
@@ -19,12 +19,13 @@ package org.apache.stormcrawler.sql;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -43,9 +44,39 @@ public class SQLSpout extends AbstractQueryingSpout {
private static final Scheme SCHEME = new StringTabScheme();
- private String tableName;
+ private static final String BASE_SQL =
+ """
+ SELECT *
+ FROM (
+ SELECT
+ rank() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url)
AS ranking,
+ url,
+ metadata,
+ nextfetchdate
+ FROM %s
+ WHERE nextfetchdate <= ? %s
+ ) AS urls_ranks
+ WHERE urls_ranks.ranking <= ?
+ ORDER BY ranking %s
+ """;
+
+ private static final String BUCKET_CLAUSE =
+ """
+ AND bucket = ?
+ """;
+
+ private static final String LIMIT_CLAUSE =
+ """
+ LIMIT ?
+ """;
+
+ private static final String URL_COLUMN = "url";
+ private static final String METADATA_COLUMN = "metadata";
+
+ private static String preparedSql;
private Connection connection;
+ private PreparedStatement ps;
/**
* if more than one instance of the spout exist, each one is in charge of
a separate bucket
@@ -70,7 +101,8 @@ public class SQLSpout extends AbstractQueryingSpout {
maxDocsPerBucket = ConfUtils.getInt(conf,
Constants.SQL_MAX_DOCS_BUCKET_PARAM_NAME, 5);
- tableName = ConfUtils.getString(conf,
Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
+ final String tableName =
+ ConfUtils.getString(conf,
Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
maxNumResults = ConfUtils.getInt(conf,
Constants.SQL_MAXRESULTS_PARAM_NAME, 100);
@@ -88,6 +120,18 @@ public class SQLSpout extends AbstractQueryingSpout {
"[" + context.getThisComponentId() + " #" +
context.getThisTaskIndex() + "] ";
bucketNum = context.getThisTaskIndex();
}
+
+ final String bucketClause = (bucketNum >= 0) ? BUCKET_CLAUSE : "";
+ final String limitClause = (maxNumResults != -1) ? LIMIT_CLAUSE : "";
+
+ preparedSql = String.format(Locale.ROOT, BASE_SQL, tableName,
bucketClause, limitClause);
+
+ try {
+ ps = connection.prepareStatement(preparedSql);
+ } catch (SQLException e) {
+ LOG.error("Failed to prepare statement", e);
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -117,97 +161,41 @@ public class SQLSpout extends AbstractQueryingSpout {
// https://mariadb.com/kb/en/library/window-functions-overview/
//
http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/
- String query =
- "SELECT * from (select rank() over (partition by host order by
nextfetchdate desc, url) as ranking, url, metadata, nextfetchdate from "
- + tableName;
-
- query +=
- " WHERE nextfetchdate <= '" + new
Timestamp(lastNextFetchDate.toEpochMilli()) + "'";
-
- // constraint on bucket num
- if (bucketNum >= 0) {
- query += " AND bucket = '" + bucketNum + "'";
- }
-
- query +=
- ") as urls_ranks where (urls_ranks.ranking <= "
- + maxDocsPerBucket
- + ") order by ranking";
-
- if (maxNumResults != -1) {
- query += " LIMIT " + this.maxNumResults;
- }
-
int alreadyprocessed = 0;
int numhits = 0;
long timeStartQuery = System.currentTimeMillis();
- // create the java statement
- Statement st = null;
- ResultSet rs = null;
try {
- st = this.connection.createStatement();
+ int i = 1;
+ ps.setTimestamp(i++, new
Timestamp(lastNextFetchDate.toEpochMilli()));
- // dump query to log
- LOG.debug("{} SQL query {}", logIdprefix, query);
-
- // execute the query, and get a java resultset
- rs = st.executeQuery(query);
-
- long timeTaken = System.currentTimeMillis() - timeStartQuery;
- queryTimes.addMeasurement(timeTaken);
-
- // iterate through the java resultset
- while (rs.next()) {
- String url = rs.getString("url");
- numhits++;
- // already processed? skip
- if (beingProcessed.containsKey(url)) {
- alreadyprocessed++;
- continue;
- }
- String metadata = rs.getString("metadata");
- if (metadata == null) {
- metadata = "";
- } else if (!metadata.startsWith("\t")) {
- metadata = "\t" + metadata;
- }
- String URLMD = url + metadata;
- List<Object> v =
-
SCHEME.deserialize(ByteBuffer.wrap(URLMD.getBytes(StandardCharsets.UTF_8)));
- buffer.add(url, (Metadata) v.get(1));
+ if (bucketNum >= 0) {
+ ps.setInt(i++, bucketNum);
}
- // no results? reset the date
- if (numhits == 0) {
- lastNextFetchDate = null;
+ ps.setInt(i++, maxDocsPerBucket);
+
+ if (maxNumResults != -1) {
+ ps.setInt(i++, maxNumResults);
}
-
eventCounter.scope("already_being_processed").incrBy(alreadyprocessed);
- eventCounter.scope("queries").incrBy(1);
- eventCounter.scope("docs").incrBy(numhits);
+ // dump query to log
+ LOG.debug("{} SQL query {}", logIdprefix, preparedSql);
- LOG.info(
- "{} SQL query returned {} hits in {} msec with {} already
being processed",
- logIdprefix,
- numhits,
- timeTaken,
- alreadyprocessed);
+ try (ResultSet rs = ps.executeQuery()) {
+ final long timeTaken = recordQueryTiming(timeStartQuery);
+
+ // iterate through the java resultset
+ while (rs.next()) {
+ numhits++;
+ alreadyprocessed += processRow(rs);
+ }
+ postProcessResults(numhits, alreadyprocessed, timeTaken);
+ }
} catch (SQLException e) {
LOG.error("Exception while querying table", e);
- } finally {
- try {
- if (rs != null) rs.close();
- } catch (SQLException e) {
- LOG.error("Exception closing resultset", e);
- }
- try {
- if (st != null) st.close();
- } catch (SQLException e) {
- LOG.error("Exception closing statement", e);
- }
}
}
@@ -226,10 +214,55 @@ public class SQLSpout extends AbstractQueryingSpout {
@Override
public void close() {
super.close();
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Exception caught while closing SQL connection", e);
+ SQLUtil.closeResource(ps, "prepared statement");
+ SQLUtil.closeResource(connection, "connection");
+ }
+
+ private long recordQueryTiming(long timeStartQuery) {
+ long timeTaken = System.currentTimeMillis() - timeStartQuery;
+ queryTimes.addMeasurement(timeTaken);
+ return timeTaken;
+ }
+
+ private int processRow(final ResultSet rs) throws SQLException {
+
+ final String url = rs.getString(URL_COLUMN);
+ final String metadata = rs.getString(METADATA_COLUMN);
+
+ // already processed? skip
+ if (beingProcessed.containsKey(url)) {
+ return 1;
+ }
+
+ final String normalisedMetadata =
+ (metadata == null || metadata.startsWith("\t")) ? metadata :
"\t" + metadata;
+
+ final String urlWithMetadata = String.format(Locale.ROOT, "%s%s", url,
normalisedMetadata);
+ final List<Object> v =
+ SCHEME.deserialize(
+
ByteBuffer.wrap(urlWithMetadata.getBytes(StandardCharsets.UTF_8)));
+ buffer.add(url, (Metadata) v.get(1));
+
+ return 0;
+ }
+
+ private void postProcessResults(
+ final int numHits, final int alreadyProcessed, final long
timeTaken) {
+
+ // no results? reset the date
+ if (numHits == 0) {
+ lastNextFetchDate = null;
}
+
+ eventCounter.scope("already_being_processed").incrBy(alreadyProcessed);
+ eventCounter.scope("queries").incrBy(1);
+ eventCounter.scope("docs").incrBy(numHits);
+
+ LOG.info(
+ "{} SQL query returned {} hits in {} msec with {} already
being processed",
+ logIdprefix,
+ numHits,
+ timeTaken,
+ alreadyProcessed);
}
}
diff --git
a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
index ad75f243..c6c197a6 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
@@ -20,23 +20,24 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
public class SQLUtil {
+ private static final org.slf4j.Logger LOG =
org.slf4j.LoggerFactory.getLogger(SQLUtil.class);
+
private SQLUtil() {}
public static Connection getConnection(Map<String, Object> stormConf)
throws SQLException {
// SQL connection details
- Map<String, Object> sqlConf = (Map<String, Object>)
stormConf.get("sql.connection");
+ Map<String, String> sqlConf = (Map<String, String>)
stormConf.get("sql.connection");
if (sqlConf == null) {
throw new RuntimeException(
"Missing SQL connection config, add a section
'sql.connection' to the configuration");
}
- String url = (String) sqlConf.get("url");
+ String url = sqlConf.get("url");
if (url == null) {
throw new RuntimeException(
"Missing SQL url, add an entry 'url' to the section
'sql.connection' of the configuration");
@@ -44,10 +45,18 @@ public class SQLUtil {
Properties props = new Properties();
- for (Entry<String, Object> entry : sqlConf.entrySet()) {
- props.setProperty(entry.getKey(), (String) entry.getValue());
- }
+ props.putAll(sqlConf);
return DriverManager.getConnection(url, props);
}
+
+ public static void closeResource(final AutoCloseable resource, final
String resourceName) {
+ if (resource != null) {
+ try {
+ resource.close();
+ } catch (Exception e) {
+ LOG.error("Error closing {}", resourceName, e);
+ }
+ }
+ }
}
diff --git
a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
index 20ca051e..27dc2719 100644
---
a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
+++
b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
@@ -16,6 +16,8 @@
*/
package org.apache.stormcrawler.sql;
+import static org.apache.stormcrawler.sql.SQLUtil.closeResource;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -23,6 +25,7 @@ import java.sql.Timestamp;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
@@ -52,22 +55,19 @@ public class StatusUpdaterBolt extends
AbstractStatusUpdaterBolt {
private MultiCountMetric eventCounter;
private Connection connection;
- private String tableName;
private URLPartitioner partitioner;
private int maxNumBuckets = -1;
private int batchMaxSize = 1000;
- private float batchMaxIdleMsec = 2000;
private int currentBatchSize = 0;
- private PreparedStatement insertPreparedStmt = null;
-
private long lastInsertBatchTime = -1;
- private String updateQuery;
- private String insertQuery;
+ private PreparedStatement updatePreparedStmt;
+ private PreparedStatement insertPreparedStmt;
+ private ScheduledExecutorService executor;
private final Map<String, List<Tuple>> waitingAck = new HashMap<>();
@@ -88,7 +88,8 @@ public class StatusUpdaterBolt extends
AbstractStatusUpdaterBolt {
this.eventCounter = context.registerMetric("counter", new
MultiCountMetric(), 10);
- tableName = ConfUtils.getString(stormConf,
Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
+ final String tableName =
+ ConfUtils.getString(stormConf,
Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
batchMaxSize =
ConfUtils.getInt(stormConf,
Constants.SQL_UPDATE_BATCH_SIZE_PARAM_NAME, 1000);
@@ -100,21 +101,39 @@ public class StatusUpdaterBolt extends
AbstractStatusUpdaterBolt {
throw new RuntimeException(ex);
}
- String query =
- tableName
- + " (url, status, nextfetchdate, metadata, bucket,
host)"
- + " values (?, ?, ?, ?, ?, ?)";
-
- updateQuery = "REPLACE INTO " + query;
- insertQuery = "INSERT IGNORE INTO " + query;
+ final String baseColumns =
+ """
+ (url, status, nextfetchdate, metadata, bucket,
host)
+ VALUES (?, ?, ?, ?, ?, ?)
+ """;
+
+ final String updateQuery =
+ String.format(
+ Locale.ROOT,
+ """
+ REPLACE INTO %s %s
+ """,
+ tableName,
+ baseColumns);
+
+ final String insertQuery =
+ String.format(
+ Locale.ROOT,
+ """
+ INSERT IGNORE INTO %s %s
+ """,
+ tableName,
+ baseColumns);
try {
+ updatePreparedStmt = connection.prepareStatement(updateQuery);
insertPreparedStmt = connection.prepareStatement(insertQuery);
} catch (SQLException e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("Failed to prepare statements", e);
+ throw new RuntimeException(e);
}
- ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(
() -> {
try {
@@ -162,37 +181,27 @@ public class StatusUpdaterBolt extends
AbstractStatusUpdaterBolt {
partition = Math.abs(partitionKey.hashCode() % maxNumBuckets);
}
- PreparedStatement preparedStmt = this.insertPreparedStmt;
-
// create in table if does not already exist
if (isUpdate) {
- preparedStmt = connection.prepareStatement(updateQuery);
- }
-
- preparedStmt.setString(1, url);
- preparedStmt.setString(2, status.toString());
- if (nextFetch.isPresent()) {
- final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant());
- preparedStmt.setObject(3, tsp);
- } else {
- // a value so large it means it will never be refetched
- preparedStmt.setObject(3, NEVER);
- }
- preparedStmt.setString(4, mdAsString.toString());
- preparedStmt.setInt(5, partition);
- preparedStmt.setString(6, partitionKey);
-
- // updates are not batched
- if (isUpdate) {
- preparedStmt.executeUpdate();
- preparedStmt.close();
+ populate(
+ url,
+ status,
+ nextFetch,
+ mdAsString,
+ partition,
+ partitionKey,
+ updatePreparedStmt);
+
+ // updates are not batched
+ updatePreparedStmt.executeUpdate();
eventCounter.scope("sql_updates_number").incrBy(1);
super.ack(t, url);
return;
}
// code below is for inserts i.e. DISCOVERED URLs
- preparedStmt.addBatch();
+ populate(url, status, nextFetch, mdAsString, partition, partitionKey,
insertPreparedStmt);
+ insertPreparedStmt.addBatch();
if (lastInsertBatchTime == -1) {
lastInsertBatchTime = System.currentTimeMillis();
@@ -210,12 +219,36 @@ public class StatusUpdaterBolt extends
AbstractStatusUpdaterBolt {
eventCounter.scope("sql_inserts_number").incrBy(1);
}
+ private void populate(
+ final String url,
+ final Status status,
+ final Optional<Date> nextFetch,
+ final StringBuilder mdAsString,
+ final int partition,
+ final String partitionKey,
+ final PreparedStatement preparedStmt)
+ throws SQLException {
+ preparedStmt.setString(1, url);
+ preparedStmt.setString(2, status.toString());
+ if (nextFetch.isPresent()) {
+ final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant());
+ preparedStmt.setObject(3, tsp);
+ } else {
+ // a value so large it means it will never be refetched
+ preparedStmt.setObject(3, NEVER);
+ }
+ preparedStmt.setString(4, mdAsString.toString());
+ preparedStmt.setInt(5, partition);
+ preparedStmt.setString(6, partitionKey);
+ }
+
private synchronized void checkExecuteBatch() throws SQLException {
if (currentBatchSize == 0) {
return;
}
long now = System.currentTimeMillis();
// check whether the insert batches need executing
+ final float batchMaxIdleMsec = 2000;
if ((currentBatchSize == batchMaxSize)) {
LOG.info("About to execute batch - triggered by size");
} else if (lastInsertBatchTime + (long) batchMaxIdleMsec <
System.currentTimeMillis()) {
@@ -253,17 +286,27 @@ public class StatusUpdaterBolt extends
AbstractStatusUpdaterBolt {
lastInsertBatchTime = System.currentTimeMillis();
currentBatchSize = 0;
waitingAck.clear();
-
- insertPreparedStmt.close();
- insertPreparedStmt = connection.prepareStatement(insertQuery);
}
@Override
public void cleanup() {
- if (connection != null)
+ closeResource(updatePreparedStmt, "update prepared statement");
+ closeResource(insertPreparedStmt, "insert prepared statement");
+ closeResource(connection, "connection");
+ closeExecutor();
+ }
+
+ private void closeExecutor() {
+ if (executor != null) {
+ executor.shutdown();
try {
- connection.close();
- } catch (SQLException e) {
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
}
+ }
}
}
diff --git
a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java
b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java
new file mode 100644
index 00000000..bf454cf1
--- /dev/null
+++
b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Abstract base class for SQL module tests that provides a shared MySQL
container. Uses the
+ * Testcontainers singleton pattern to ensure the container is created once
and reused across all
+ * test classes, improving test performance.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+@Timeout(value = 120, unit = TimeUnit.SECONDS)
+abstract class AbstractSQLTest {
+
+ private static final DockerImageName MYSQL_IMAGE =
DockerImageName.parse("mysql:8.4.0");
+
+ @Container
+ private static final MySQLContainer<?> MYSQL_CONTAINER =
+ new MySQLContainer<>(MYSQL_IMAGE)
+ .withDatabaseName("crawl")
+ .withUsername("crawler")
+ .withPassword("crawler")
+ .withReuse(true);
+
+ static Connection testConnection;
+
+ static Map<String, String> createSqlConnectionConfig() {
+ Map<String, String> sqlConnection = new HashMap<>();
+ sqlConnection.put("url", MYSQL_CONTAINER.getJdbcUrl());
+ sqlConnection.put("user", MYSQL_CONTAINER.getUsername());
+ sqlConnection.put("password", MYSQL_CONTAINER.getPassword());
+ return sqlConnection;
+ }
+
+ void execute(String sql) throws SQLException {
+ try (Statement stmt = testConnection.createStatement()) {
+ stmt.execute(sql);
+ }
+ }
+
+ @BeforeAll
+ static void init() throws SQLException {
+ testConnection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ }
+
+ @BeforeEach
+ void baseSetup() throws Exception {
+ setupTestTables();
+ }
+
+ protected abstract void setupTestTables() throws Exception;
+
+ @AfterAll
+ static void cleanup() throws Exception {
+ if (testConnection != null) {
+ testConnection.close();
+ }
+ }
+}
diff --git
a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
new file mode 100644
index 00000000..5a34a6b7
--- /dev/null
+++
b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Constants;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.indexing.AbstractIndexerBolt;
+import org.apache.stormcrawler.persistence.Status;
+import org.apache.stormcrawler.util.RobotsTags;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class IndexerBoltTest extends AbstractSQLTest {
+
+ private TestOutputCollector output;
+ private final String tableName = "content";
+
+ @Override
+ protected void setupTestTables() throws Exception {
+ execute(
+ """
+ DROP TABLE IF EXISTS content
+ """);
+ execute(
+ """
+ CREATE TABLE IF NOT EXISTS content (
+ url VARCHAR(255) PRIMARY KEY,
+ title VARCHAR(255),
+ description TEXT,
+ keywords VARCHAR(255)
+ )
+ """);
+ }
+
+ @BeforeEach
+ void setup() {
+ output = new TestOutputCollector();
+ }
+
+ @Test
+ void testBasicIndexing() throws Exception {
+ IndexerBolt bolt = createBolt(createBasicConfig());
+ String url = "http://example.com/page1";
+
+ executeTuple(bolt, url, "This is the page content", getMetadata());
+
+ // Verify URL was stored in database
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE url =
'" + url + "'")) {
+ assertTrue(rs.next(), "URL should be stored in database");
+ assertEquals(url, rs.getString("url"));
+ assertEquals("Test Page Title", rs.getString("title"));
+ assertEquals("Test page description", rs.getString("description"));
+ assertEquals("test, page, keywords", rs.getString("keywords"));
+ }
+
+ // Verify tuple was acked and status emitted
+ assertEquals(1, output.getAckedTuples().size());
+ assertEquals(1, output.getEmitted(Constants.StatusStreamName).size());
+
+ // Verify emitted status is FETCHED
+ List<Object> emitted =
output.getEmitted(Constants.StatusStreamName).get(0);
+ assertEquals(url, emitted.get(0));
+ assertEquals(Status.FETCHED, emitted.get(2));
+ bolt.cleanup();
+ }
+
+ @Test
+ void testDuplicateHandling() throws Exception {
+ IndexerBolt bolt = createBolt(createBasicConfig());
+
+ String url = "http://example.com/page2";
+ executeTuple(bolt, url, "Original content", getMetadata());
+
+ // Second indexing with updated content (same URL)
+ Metadata metadata = new Metadata();
+ metadata.addValue("title", "Updated Title");
+ metadata.addValue("description", "Updated description");
+
+ executeTuple(bolt, url, "Updated content", metadata);
+
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE url =
'" + url + "'")) {
+ assertTrue(rs.next());
+ assertEquals("Updated Title", rs.getString("title"));
+ assertEquals("Updated description", rs.getString("description"));
+ assertEquals("test, page, keywords", rs.getString("keywords"));
+ }
+
+ assertEquals(2, output.getAckedTuples().size());
+ bolt.cleanup();
+ }
+
+ @Test
+ void testFilteringByRobotsNoIndex() throws Exception {
+ IndexerBolt bolt = createBolt(createBasicConfig());
+
+ String url = "http://example.com/noindex-page";
+ Metadata metadata = new Metadata();
+ metadata.addValue("title", "Should Not Be Indexed");
+ metadata.addValue(RobotsTags.ROBOTS_NO_INDEX, "true");
+
+ executeTuple(bolt, url, "Content", metadata);
+
+ // Verify URL was NOT stored in database
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE url =
'" + url + "'")) {
+ assertFalse(rs.next(), "URL with noindex should not be stored in
database");
+ }
+
+ // But tuple should still be acked and FETCHED status emitted
+ assertEquals(1, output.getAckedTuples().size());
+ assertEquals(1, output.getEmitted(Constants.StatusStreamName).size());
+
+ List<Object> emitted =
output.getEmitted(Constants.StatusStreamName).get(0);
+ assertEquals(Status.FETCHED, emitted.get(2));
+ bolt.cleanup();
+ }
+
+ @Test
+ void testFilteringByMetadataFilter() throws Exception {
+ // Configure filter to only index documents with indexable=yes
+ Map<String, Object> conf = createBasicConfig();
+ conf.put(AbstractIndexerBolt.metadataFilterParamName, "indexable=yes");
+
+ IndexerBolt bolt = createBolt(conf);
+
+ // Document that should be filtered out (no indexable metadata)
+ String url1 = "http://example.com/filtered-page";
+ Metadata metadata1 = new Metadata();
+ metadata1.addValue("title", "Filtered Page");
+
+ executeTuple(bolt, url1, "Content", metadata1);
+
+ // Verify filtered document was NOT stored
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE url =
'" + url1 + "'")) {
+ assertFalse(rs.next(), "Filtered document should not be stored");
+ }
+
+ // Document that should be indexed (has indexable=yes)
+ String url2 = "http://example.com/indexed-page";
+ Metadata metadata2 = new Metadata();
+ metadata2.addValue("title", "Indexed Page");
+ metadata2.addValue("indexable", "yes");
+
+ Tuple tuple2 = createTuple(url2, "Content", metadata2);
+ bolt.execute(tuple2);
+
+ // Verify indexed document WAS stored
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE url =
'" + url2 + "'")) {
+ assertTrue(rs.next(), "Document with indexable=yes should be
stored");
+ assertEquals("Indexed Page", rs.getString("title"));
+ }
+
+ // Both tuples should be acked with FETCHED status
+ assertEquals(2, output.getAckedTuples().size());
+ assertEquals(2, output.getEmitted(Constants.StatusStreamName).size());
+ bolt.cleanup();
+ }
+
+ @Test
+ void testMetadataExtraction() throws Exception {
+ // Configure to only extract specific metadata fields
+ Map<String, Object> conf = createBasicConfig();
+ // Only map title and description, not keywords
+ List<String> mdMapping = new ArrayList<>();
+ mdMapping.add("title");
+ mdMapping.add("description");
+ conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+
+ IndexerBolt bolt = createBolt(conf);
+
+ String url = "http://example.com/metadata-test";
+ Metadata metadata = new Metadata();
+ metadata.addValue("title", "Extracted Title");
+ metadata.addValue("description", "Extracted Description");
+ metadata.addValue("keywords", "these,should,not,be,stored");
+ metadata.addValue("author", "Should Not Be Stored");
+
+ executeTuple(bolt, url, "Content", metadata);
+
+ // Verify only configured metadata was stored
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE url =
'" + url + "'")) {
+ assertTrue(rs.next());
+ assertEquals("Extracted Title", rs.getString("title"));
+ assertEquals("Extracted Description", rs.getString("description"));
+ // keywords column should be null since it wasn't in the mapping
+ assertNull(rs.getString("keywords"));
+ }
+
+ assertEquals(1, output.getAckedTuples().size());
+ bolt.cleanup();
+ }
+
+ @Test
+ void testMetadataAliasMapping() throws Exception {
+ // Configure metadata mapping with aliases
+ Map<String, Object> conf = createBasicConfig();
+ List<String> mdMapping = new ArrayList<>();
+ mdMapping.add("parse.title=title"); // map parse.title to title column
+ mdMapping.add("parse.description=description");
+ conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+
+ IndexerBolt bolt = createBolt(conf);
+
+ String url = "http://example.com/alias-test";
+ Metadata metadata = new Metadata();
+ metadata.addValue("parse.title", "Title from Parser");
+ metadata.addValue("parse.description", "Description from Parser");
+
+ executeTuple(bolt, url, "Content", metadata);
+
+ // Verify aliased metadata was stored correctly
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE url =
'" + url + "'")) {
+ assertTrue(rs.next());
+ assertEquals("Title from Parser", rs.getString("title"));
+ assertEquals("Description from Parser",
rs.getString("description"));
+ }
+ bolt.cleanup();
+ }
+
+ private Tuple createTuple(String url, String text, Metadata metadata) {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getStringByField("url")).thenReturn(url);
+ when(tuple.getStringByField("text")).thenReturn(text);
+ when(tuple.getValueByField("metadata")).thenReturn(metadata);
+ return tuple;
+ }
+
+ private void executeTuple(IndexerBolt bolt, String url, String text,
Metadata metadata) {
+ Tuple tuple = createTuple(url, text, metadata);
+ bolt.execute(tuple);
+ }
+
+ private Map<String, Object> createBasicConfig() {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("sql.connection", createSqlConnectionConfig());
+ conf.put(IndexerBolt.SQL_INDEX_TABLE_PARAM_NAME, tableName);
+ conf.put(AbstractIndexerBolt.urlFieldParamName, "url");
+ // Default metadata mapping
+ List<String> mdMapping = new ArrayList<>();
+ mdMapping.add("title");
+ mdMapping.add("description");
+ mdMapping.add("keywords");
+ conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+ return conf;
+ }
+
+ private IndexerBolt createBolt(Map<String, Object> conf) {
+ IndexerBolt bolt = new IndexerBolt();
+ bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new
OutputCollector(output));
+ return bolt;
+ }
+
+ private Metadata getMetadata() {
+ Metadata metadata = new Metadata();
+ metadata.addValue("title", "Test Page Title");
+ metadata.addValue("description", "Test page description");
+ metadata.addValue("keywords", "test, page, keywords");
+ return metadata;
+ }
+}
diff --git
a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
new file mode 100644
index 00000000..9b65f2e2
--- /dev/null
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static
org.apache.stormcrawler.TestUtil.getMockedTopologyContextWithBucket;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.persistence.urlbuffer.URLBuffer;
+import org.junit.jupiter.api.Test;
+
+class SQLSpoutTest extends AbstractSQLTest {
+
+ @Override
+ protected void setupTestTables() throws Exception {
+ execute("DROP TABLE IF EXISTS urls");
+ execute(
+ """
+ CREATE TABLE IF NOT EXISTS urls (
+ url VARCHAR(255),
+ status VARCHAR(16) DEFAULT 'DISCOVERED',
+ nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ metadata TEXT,
+ bucket SMALLINT DEFAULT 0,
+ host VARCHAR(128),
+ PRIMARY KEY(url)
+ )
+ """);
+ }
+
+ @Test
+ void bufferIsPopulatedAndEmitted() throws Exception {
+ // Insert base test data with past nextfetchdate to ensure they're
eligible for fetching
+ Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+ insertTestURL("http://example.com/page1", 0, "example.com", pastTime);
+ insertTestURL("http://example.com/page2", 0, "example.com", pastTime);
+ insertTestURL("http://test.com/page1", 0, "test.com", pastTime);
+
+ TestOutputCollector testCollector = new TestOutputCollector();
+ SQLSpout spout = createSpout(testCollector,
TestUtil.getMockedTopologyContext());
+
+ // First call to nextTuple() populates the buffer
+ spout.nextTuple();
+
+ final List<String> expectedURLs =
+ Arrays.asList(
+ "http://example.com/page1",
+ "http://example.com/page2",
+ "http://test.com/page1");
+ assertURLsEmitted(spout, testCollector, 3, expectedURLs);
+ spout.close();
+ }
+
+ @Test
+ void testMaxDocsPerBucket() throws Exception {
+ // Add more URLs from example.com to test the maxDocsPerBucket limit
+ // Insert base test data with past nextfetchdate to ensure they're
eligible for fetching
+ Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+ insertTestURL("http://example.com/page1", 0, "example.com", pastTime);
+ insertTestURL("http://example.com/page2", 0, "example.com", pastTime);
+ insertTestURL("http://test.com/page1", 0, "test.com", pastTime);
+
+ TestOutputCollector testCollector = new TestOutputCollector();
+ SQLSpout spout = createSpout(testCollector,
TestUtil.getMockedTopologyContext());
+
+ pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+ for (int i = 4; i <= 10; i++) {
+ insertTestURL("http://example.com/page" + i, 0, "example.com",
pastTime);
+ }
+
+ spout.nextTuple();
+
+ URLBuffer buffer = getBufferFromSpout(spout);
+
+ // With maxDocsPerBucket=5, we should get at most 5 URLs from
example.com
+ // plus 1 from test.com = 6 total
+ assertEquals(6, buffer.size());
+ spout.close();
+ }
+
+ @Test
+ void testSingleInstanceNoBucketFiltering() throws Exception {
+
+ Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+
+ // Insert URLs into different buckets
+ insertTestURL("http://site1.com/page1", 0, "site1.com", pastTime);
+ insertTestURL("http://site2.com/page1", 1, "site2.com", pastTime);
+ insertTestURL("http://site3.com/page1", 2, "site3.com", pastTime);
+ insertTestURL("http://site4.com/page1", 3, "site4.com", pastTime);
+
+ // Create a single spout instance (totalTasks = 1)
+ TestOutputCollector collector = new TestOutputCollector();
+ TopologyContext context = getMockedTopologyContextWithBucket(0, 1,
"sqlSpout");
+ SQLSpout singleSpout = createSpout(collector, context);
+
+ // Populate buffer
+ singleSpout.nextTuple();
+
+ final List<String> expectedURLs =
+ Arrays.asList(
+ "http://site1.com/page1",
+ "http://site2.com/page1",
+ "http://site3.com/page1",
+ "http://site4.com/page1");
+ assertURLsEmitted(singleSpout, collector, 4, expectedURLs);
+ singleSpout.close();
+ }
+
+ @Test
+ void testBucketPartitioningTwoInstances() throws Exception {
+
+ // Insert URLs into different buckets
+ Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+
+ // Bucket 0 URLs
+ insertTestURL("http://bucket0-site1.com/page1", 0,
"bucket0-site1.com", pastTime);
+ insertTestURL("http://bucket0-site2.com/page1", 0,
"bucket0-site2.com", pastTime);
+ insertTestURL("http://bucket0-site3.com/page1", 0,
"bucket0-site3.com", pastTime);
+
+ // Bucket 1 URLs
+ insertTestURL("http://bucket1-site1.com/page1", 1,
"bucket1-site1.com", pastTime);
+ insertTestURL("http://bucket1-site2.com/page1", 1,
"bucket1-site2.com", pastTime);
+ insertTestURL("http://bucket1-site3.com/page1", 1,
"bucket1-site3.com", pastTime);
+
+ // Create two spout instances with different bucket assignments
+ SQLSpout[] spouts = new SQLSpout[2];
+ for (int i = 0; i < 2; i++) {
+ TestOutputCollector collector = new TestOutputCollector();
+ TopologyContext context = getMockedTopologyContextWithBucket(i, 2,
"sqlSpout");
+ spouts[i] = createSpout(collector, context);
+ spouts[i].nextTuple();
+ assertURLsEmitted(
+ spouts[i],
+ collector,
+ 3,
+ Arrays.asList(
+ "http://bucket" + i + "-site1.com/page1",
+ "http://bucket" + i + "-site2.com/page1",
+ "http://bucket" + i + "-site3.com/page1"));
+ spouts[i].close();
+ }
+ }
+
+ private void insertTestURL(String url, int bucket, String host, Instant
time) throws Exception {
+ String sql =
+ """
+ INSERT INTO urls (url, status, nextfetchdate, metadata, bucket, host)
+ VALUES (?, ?, ?, ?, ?, ?)
+ """;
+
+ try (PreparedStatement ps = testConnection.prepareStatement(sql)) {
+ ps.setString(1, url);
+ ps.setString(2, "DISCOVERED");
+ ps.setTimestamp(3, Timestamp.from(time));
+ ps.setString(4, "\tkey=value\tdepth=0");
+ ps.setInt(5, bucket);
+ ps.setString(6, host);
+ ps.executeUpdate();
+ }
+ }
+
+ private Map<String, Object> createTestConfig() {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("sql.connection", createSqlConnectionConfig());
+ conf.put("sql.status.table", "urls");
+ conf.put("sql.max.urls.per.bucket", 5);
+ conf.put("sql.spout.max.results", 100);
+ conf.put(
+ "urlbuffer.class",
"org.apache.stormcrawler.persistence.urlbuffer.SimpleURLBuffer");
+ return conf;
+ }
+
+ private URLBuffer getBufferFromSpout(SQLSpout spoutInstance) throws
Exception {
+ Field bufferField =
spoutInstance.getClass().getSuperclass().getDeclaredField("buffer");
+ bufferField.setAccessible(true);
+ return (URLBuffer) bufferField.get(spoutInstance);
+ }
+
+ private void assertURLsEmitted(
+ SQLSpout spout,
+ TestOutputCollector collector,
+ int numTuples,
+ List<String> expectedURLs) {
+ assertEquals(0, collector.getEmitted().size());
+
+ // Emit all URLs
+ Set<String> urls = new HashSet<>();
+ for (int i = 0; i < numTuples; i++) {
+ spout.nextTuple();
+ }
+ for (List<Object> tuple : collector.getEmitted()) {
+ urls.add((String) tuple.get(0));
+ }
+
+ for (String url : expectedURLs) {
+ assertTrue(urls.contains(url));
+ }
+ }
+
+ private SQLSpout createSpout(TestOutputCollector collector,
TopologyContext context) {
+ SQLSpout singleSpout = new SQLSpout();
+ Map<String, Object> conf = createTestConfig();
+ singleSpout.open(conf, context, new SpoutOutputCollector(collector));
+ singleSpout.activate();
+ return singleSpout;
+ }
+}
diff --git
a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
new file mode 100644
index 00000000..4108150f
--- /dev/null
+++
b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.persistence.Status;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class StatusUpdaterBoltTest extends AbstractSQLTest {
+
+ private TestOutputCollector output;
+ private StatusUpdaterBolt bolt;
+
+ @Override
+ protected void setupTestTables() throws Exception {
+ execute("DROP TABLE IF EXISTS urls");
+ execute(
+ """
+ CREATE TABLE IF NOT EXISTS urls (
+ url VARCHAR(255),
+ status VARCHAR(16) DEFAULT 'DISCOVERED',
+ nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ metadata TEXT,
+ bucket SMALLINT DEFAULT 0,
+ host VARCHAR(128),
+ PRIMARY KEY(url)
+ )
+ """);
+ }
+
+ @BeforeEach
+ void setup() {
+ output = new TestOutputCollector();
+ bolt = createBolt();
+ }
+
+ @AfterEach
+ void close() {
+ bolt.cleanup();
+ }
+
+ @Test
+ void testStoreDiscoveredURL() throws Exception {
+ String url = "http://example.com/page1";
+ Metadata metadata = new Metadata();
+ metadata.addValue("key1", "value1");
+
+ Tuple tuple = createTuple(url, Status.DISCOVERED, metadata);
+ bolt.execute(tuple);
+
+ // Trigger batch execution by sending another tuple (which will also
check the batch)
+ String url2 = "http://example.com/page1-trigger";
+ Tuple triggerTuple = createTuple(url2, Status.DISCOVERED, metadata);
+ bolt.execute(triggerTuple);
+
+ // Verify URL was stored
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url
= '" + url + "'")) {
+ assertTrue(rs.next(), "URL should be stored in database after
batch execution");
+ assertEquals("DISCOVERED", rs.getString("status"));
+ assertNotNull(rs.getString("metadata"));
+ }
+ bolt.cleanup();
+ }
+
+ @Test
+ void testUpdateURL() throws Exception {
+ String url = "http://example.com/page2";
+ Metadata metadata = new Metadata();
+ metadata.addValue("key1", "value1");
+
+ // First store as DISCOVERED
+ Tuple tuple1 = createTuple(url, Status.DISCOVERED, metadata);
+ bolt.execute(tuple1);
+
+ // Now update to FETCHED
+ Tuple tuple2 = createTuple(url, Status.FETCHED, metadata);
+ bolt.execute(tuple2);
+
+ // Verify URL was updated
+ try (Statement stmt = testConnection.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url
= '" + url + "'")) {
+ assertTrue(rs.next());
+ assertEquals("FETCHED", rs.getString("status"));
+ assertNotNull(rs.getString("metadata"));
+ }
+ bolt.cleanup();
+ }
+
+ private Tuple createTuple(String url, Status status, Metadata metadata) {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getStringByField("url")).thenReturn(url);
+ when(tuple.getValueByField("status")).thenReturn(status);
+ when(tuple.getValueByField("metadata")).thenReturn(metadata);
+ return tuple;
+ }
+
+ private Map<String, Object> createTestConfig() {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("sql.connection", createSqlConnectionConfig());
+ conf.put("sql.status.table", "urls");
+ conf.put("sql.status.max.urls.per.bucket", 10);
+ conf.put("scheduler.class",
"org.apache.stormcrawler.persistence.DefaultScheduler");
+ conf.put("status.updater.cache.spec",
"maximumSize=10000,expireAfterAccess=1h");
+ conf.put("sql.update.batch.size", 1);
+ return conf;
+ }
+
+ private StatusUpdaterBolt createBolt() {
+ StatusUpdaterBolt statusUpdaterBolt = new StatusUpdaterBolt();
+ Map<String, Object> conf = createTestConfig();
+ statusUpdaterBolt.prepare(
+ conf, TestUtil.getMockedTopologyContext(), new
OutputCollector(output));
+ return statusUpdaterBolt;
+ }
+}
diff --git a/pom.xml b/pom.xml
index 5c9ee3f6..2ad34c1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@ under the License.
<jacoco.branchRatio>1.00</jacoco.branchRatio>
<jacoco.lineRatio>1.00</jacoco.lineRatio>
<jacoco.complexityRatio>1.00</jacoco.complexityRatio>
+ <mysql-connector-j>9.3.0</mysql-connector-j>
<skip.format.code>true</skip.format.code>
@@ -673,6 +674,12 @@ under the License.
<version>${commons.codec.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
+ <version>${mysql-connector-j}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>