This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 72e6acc  NIFI-9327: Added timewindow query to QueryNiFiReportingTask 
and MetricsEventReportingTask
72e6acc is described below

commit 72e6accc1240f7a4e7e7ee1f658615349e9db0e1
Author: Lehel <[email protected]>
AuthorDate: Wed Oct 27 11:12:13 2021 +0200

    NIFI-9327: Added timewindow query to QueryNiFiReportingTask and 
MetricsEventReportingTask
---
 .../reporting/sql/MetricsEventReportingTask.java   |  29 +-
 .../nifi/reporting/sql/QueryNiFiReportingTask.java |  23 +-
 .../apache/nifi/reporting/sql/QueryTimeAware.java  |  51 ++++
 .../nifi/reporting/sql/util/TrackedQueryTime.java  |  35 +++
 .../additionalDetails.html                         |  29 ++
 .../additionalDetails.html                         |  13 +
 .../nifi/record/sink/MockRecordSinkService.java    |   3 +-
 .../sql/TestMetricsEventReportingTask.java         | 304 ++++++++++++++++-----
 .../reporting/sql/TestQueryNiFiReportingTask.java  | 276 ++++++++++++++++---
 .../rules/MockPropertyContextActionHandler.java    |  27 +-
 .../nifi/rules/engine/MockRulesEngineService.java  |  13 +-
 11 files changed, 648 insertions(+), 155 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
index 9a930ef..c3a0baa 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
@@ -16,10 +16,12 @@
  */
 package org.apache.nifi.reporting.sql;
 
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.reporting.AbstractReportingTask;
 import org.apache.nifi.reporting.ReportingContext;
@@ -31,19 +33,23 @@ import org.apache.nifi.rules.engine.RulesEngineService;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
 import static 
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
 import static 
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
 
 @Tags({"reporting", "rules", "action", "action handler", "status", 
"connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"})
 @CapabilityDescription("Triggers rules-driven actions based on metrics values 
")
-public class MetricsEventReportingTask  extends AbstractReportingTask {
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last execution time so that on restart the task knows where it left off.")
+public class MetricsEventReportingTask extends AbstractReportingTask 
implements QueryTimeAware {
 
     private List<PropertyDescriptor> properties;
     private MetricsQueryService metricsQueryService;
@@ -67,7 +73,7 @@ public class MetricsEventReportingTask  extends 
AbstractReportingTask {
     }
 
     @OnScheduled
-    public void setup(final ConfigurationContext context) throws IOException {
+    public void setup(final ConfigurationContext context) {
         actionHandler = 
context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
         rulesEngineService = 
context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
         final Integer defaultPrecision = 
context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
@@ -77,17 +83,20 @@ public class MetricsEventReportingTask  extends 
AbstractReportingTask {
 
     @Override
     public void onTrigger(ReportingContext context) {
+        String sql = 
context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
         try {
-            final String query = 
context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
-            fireRules(context, actionHandler, rulesEngineService, query);
+            sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, 
BULLETIN_END_TIME);
+            sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, 
PROVENANCE_END_TIME);
+
+            fireRules(context, actionHandler, rulesEngineService, sql);
         } catch (Exception e) {
             getLogger().error("Error opening loading rules: {}", new 
Object[]{e.getMessage()}, e);
         }
     }
 
     private void fireRules(ReportingContext context, 
PropertyContextActionHandler actionHandler, RulesEngineService engine, String 
query) throws Exception {
+        getLogger().debug("Executing query: {}", query);
         QueryResult queryResult = metricsQueryService.query(context, query);
-        getLogger().debug("Executing query: {}", new Object[]{ query });
         ResultSetRecordSet recordSet = 
metricsQueryService.getResultSetRecordSet(queryResult);
         Record record;
         try {
@@ -97,16 +106,14 @@ public class MetricsEventReportingTask  extends 
AbstractReportingTask {
                     facts.put(fieldName, record.getValue(fieldName));
                 }
                 List<Action> actions = engine.fireRules(facts);
-                if(actions == null ||  actions.isEmpty()){
+                if (actions == null || actions.isEmpty()) {
                     getLogger().debug("No actions required for provided 
facts.");
                 } else {
-                    actions.forEach(action -> {
-                        actionHandler.execute(context, action,facts);
-                    });
+                    actions.forEach(action -> actionHandler.execute(context, 
action, facts));
                 }
             }
         } finally {
             metricsQueryService.closeQuietly(recordSet);
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
index 367db30..61edaa2 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
@@ -16,10 +16,12 @@
  */
 package org.apache.nifi.reporting.sql;
 
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.AbstractReportingTask;
@@ -29,7 +31,6 @@ import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,6 +39,10 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
 import static 
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
 import static 
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
 
@@ -46,7 +51,8 @@ import static 
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFA
         + "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, 
CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables, and can use any functions 
or capabilities provided by Apache Calcite. Note that the "
         + "CONNECTION_STATUS_PREDICTIONS table is not available for querying 
if analytics are not enabled (see the nifi.analytics.predict.enabled property 
in nifi.properties). Attempting a "
         + "query on the table when the capability is disabled will cause an 
error.")
-public class QueryNiFiReportingTask extends AbstractReportingTask {
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last execution time so that on restart the task knows where it left off.")
+public class QueryNiFiReportingTask extends AbstractReportingTask implements 
QueryTimeAware {
 
     private List<PropertyDescriptor> properties;
 
@@ -71,7 +77,7 @@ public class QueryNiFiReportingTask extends 
AbstractReportingTask {
     }
 
     @OnScheduled
-    public void setup(final ConfigurationContext context) throws IOException {
+    public void setup(final ConfigurationContext context) {
         recordSinkService = 
context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
         recordSinkService.reset();
         final Integer defaultPrecision = 
context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
@@ -82,13 +88,16 @@ public class QueryNiFiReportingTask extends 
AbstractReportingTask {
     @Override
     public void onTrigger(ReportingContext context) {
         final StopWatch stopWatch = new StopWatch(true);
+        String sql = context.getProperty(QueryMetricsUtil.QUERY).getValue();
         try {
-            final String sql = 
context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
+            sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, 
BULLETIN_END_TIME);
+            sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, 
PROVENANCE_END_TIME);
+
+            getLogger().debug("Executing query: {}", sql);
             final QueryResult queryResult = metricsQueryService.query(context, 
sql);
             final ResultSetRecordSet recordSet;
 
             try {
-                getLogger().debug("Executing query: {}", new Object[]{sql});
                 recordSet = 
metricsQueryService.getResultSetRecordSet(queryResult);
             } catch (final Exception e) {
                 getLogger().error("Error creating record set from query 
results due to {}", new Object[]{e.getMessage()}, e);
@@ -110,10 +119,10 @@ public class QueryNiFiReportingTask extends 
AbstractReportingTask {
                 metricsQueryService.closeQuietly(queryResult);
             }
             final long elapsedMillis = 
stopWatch.getElapsed(TimeUnit.MILLISECONDS);
-            getLogger().debug("Successfully queried and sent in {} millis", 
new Object[]{elapsedMillis});
+            getLogger().debug("Successfully queried and sent in {} millis", 
elapsedMillis);
         } catch (Exception e) {
             getLogger().error("Error processing the query due to {}", new 
Object[]{e.getMessage()}, e);
         }
     }
-
 }
+
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
new file mode 100644
index 0000000..7d2f0b2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+public interface QueryTimeAware {
+
+    default String processStartAndEndTimes(ReportingContext context, String 
sql, TrackedQueryTime queryStartTime, TrackedQueryTime queryEndTime) throws 
IOException {
+        StateManager stateManager = context.getStateManager();
+        final Map<String, String> stateMap = new 
HashMap<>(stateManager.getState(Scope.LOCAL).toMap());
+
+        if (sql.contains(queryStartTime.getSqlPlaceholder()) && 
sql.contains(queryEndTime.getSqlPlaceholder())) {
+            final long startTime = stateMap.get(queryStartTime.name()) == null 
? 0 : Long.parseLong(stateMap.get(queryStartTime.name()));
+            final long currentTime = getCurrentTime();
+
+            sql = sql.replace(queryStartTime.getSqlPlaceholder(), 
String.valueOf(startTime));
+            sql = sql.replace(queryEndTime.getSqlPlaceholder(), 
String.valueOf(currentTime));
+
+            stateMap.put(queryStartTime.name(), String.valueOf(currentTime));
+            stateManager.setState(stateMap, Scope.LOCAL);
+        }
+        return sql;
+    }
+
+    default long getCurrentTime() {
+        return Instant.now().toEpochMilli();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
new file mode 100644
index 0000000..cf46a29
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.util;
+
+public enum TrackedQueryTime {
+
+    BULLETIN_START_TIME("$bulletinStartTime"),
+    BULLETIN_END_TIME("$bulletinEndTime"),
+    PROVENANCE_START_TIME("$provenanceStartTime"),
+    PROVENANCE_END_TIME("$provenanceEndTime");
+
+    private final String sqlPlaceholder;
+
+    TrackedQueryTime(final String sqlPlaceholder) {
+        this.sqlPlaceholder = sqlPlaceholder;
+    }
+
+    public String getSqlPlaceholder() {
+        return sqlPlaceholder;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
index 2392aab..f29975f 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
@@ -29,6 +29,35 @@
     A distinct ActionHandler can be used to service all events or an 
ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Optimally 
action handler should be associated with the expected action types
     returned from the rules engine.
 </p>
+<p>
+    The reporting task can uniquely handle items from the bulletin and 
provenance repositories. This means that an item will only be processed once 
when the query is set to unique.
+    The query can be set to unique by defining a time window with special sql 
placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime, 
$provenanceEndTime)
+    that the reporting task will evaluate runtime. See the SQL Query Examples 
section.
+</p>
+<br/><br/>
+<h2>SQL Query Examples</h2>
+<p>
+    <b>Example:</b> Select all fields from the <code>CONNECTION_STATUS</code> 
table:<br/>
+<pre>SELECT * FROM CONNECTION_STATUS</pre>
+</p>
+<br/>
+<p>
+    <b>Example:</b> Select connection IDs where time-to-backpressure (based on 
queue count) is less than 5 minutes:<br/>
+<pre>SELECT connectionId FROM CONNECTION_STATUS_PREDICTIONS WHERE 
predictedTimeToCountBackpressureMillis < 300000</pre>
+</p>
+<br/>
+<p>
+    <b>Example:</b> Get the unique bulletin categories associated with 
errors:<br/>
+<pre>SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel = 
"ERROR"</pre>
+</p>
+<p>
+    <b>Example:</b> Select all fields from the <code>BULLETINS</code> table 
with time window:<br/>
+<pre>SELECT * from BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND 
bulletinTimestamp <= $bulletinEndTime</pre>
+</p>
+<p>
+    <b>Example:</b> Select all fields from the <code>PROVENANCE</code> table 
with time window:<br/>
+<pre>SELECT * from PROVENANCE where timestampMillis > $provenanceStartTime and 
timestampMillis <= $provenanceEndTime</pre>
+</p>
 <br/>
 </body>
 </html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
index 33baa0a..346834c 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
@@ -35,6 +35,11 @@
     SiteToSiteReportingRecordSink (for sending via the Site-to-Site protocol) 
or DatabaseRecordSink (for sending the
     query result rows to an relational database).
 </p>
+<p>
+    The reporting task can uniquely handle items from the bulletin and 
provenance repositories. This means that an item will only be processed once 
when the query is set to unique.
+    The query can be set to unique by defining a time window with special sql 
placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime, 
$provenanceEndTime)
+    that the reporting task will evaluate runtime. See the SQL Query Examples 
section.
+</p>
 <br/>
 <h2>Table Definitions</h2>
 <p>
@@ -221,6 +226,14 @@
     <b>Example:</b> Get the unique bulletin categories associated with 
errors:<br/>
 <pre>SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel = 
"ERROR"</pre>
 </p>
+<p>
+    <b>Example:</b> Select all fields from the <code>BULLETINS</code> table 
with time window:<br/>
+<pre>SELECT * from BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND 
bulletinTimestamp <= $bulletinEndTime</pre>
+</p>
+<p>
+    <b>Example:</b> Select all fields from the <code>PROVENANCE</code> table 
with time window:<br/>
+<pre>SELECT * from PROVENANCE where timestampMillis > $provenanceStartTime and 
timestampMillis <= $provenanceEndTime</pre>
+</p>
 <br/>
 </body>
 </html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/record/sink/MockRecordSinkService.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/record/sink/MockRecordSinkService.java
index de2c799..a0f0155 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/record/sink/MockRecordSinkService.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/record/sink/MockRecordSinkService.java
@@ -33,10 +33,11 @@ import java.util.Map;
 
 public class MockRecordSinkService extends AbstractConfigurableComponent 
implements RecordSinkService {
 
-    private List<Map<String, Object>> rows = new ArrayList<>();;
+    private List<Map<String, Object>> rows = new ArrayList<>();
 
     @Override
     public WriteResult sendData(RecordSet recordSet, Map<String,String> 
attributes, boolean sendZeroResults) throws IOException {
+        rows = new ArrayList<>();
         int numRecordsWritten = 0;
         RecordSchema recordSchema = recordSet.getSchema();
         Record record;
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
index 6934005..196871b 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
@@ -16,10 +16,10 @@
  */
 package org.apache.nifi.reporting.sql;
 
-import com.google.common.collect.Lists;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.ConnectionStatus;
@@ -27,21 +27,30 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.MockProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinFactory;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingInitializationContext;
-import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
-import org.apache.nifi.rules.Action;
+import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
 import org.apache.nifi.rules.MockPropertyContextActionHandler;
 import org.apache.nifi.rules.PropertyContextActionHandler;
 import org.apache.nifi.rules.engine.MockRulesEngineService;
 import org.apache.nifi.rules.engine.RulesEngineService;
 import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockBulletinRepository;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
 import org.apache.nifi.util.MockPropertyValue;
-import org.apache.nifi.util.Tuple;
+import org.apache.nifi.util.SharedSessionState;
 import org.apache.nifi.util.db.JdbcProperties;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -49,27 +58,34 @@ import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+
+class TestMetricsEventReportingTask {
 
-public class TestMetricsEventReportingTask {
     private ReportingContext context;
     private MockMetricsEventReportingTask reportingTask;
     private MockPropertyContextActionHandler actionHandler;
-    private MockRulesEngineService rulesEngineService;
     private ProcessGroupStatus status;
+    private MockQueryBulletinRepository mockBulletinRepository;
+    private MockProvenanceRepository mockProvenanceRepository;
+    private AtomicLong currentTime;
+    private MockStateManager mockStateManager;
 
     @BeforeEach
     public void setup() {
+        currentTime = new AtomicLong();
         status = new ProcessGroupStatus();
         actionHandler = new MockPropertyContextActionHandler();
         status.setId("1234");
@@ -112,63 +128,153 @@ public class TestMetricsEventReportingTask {
         rootConnectionStatuses.add(root1ConnectionStatus);
         rootConnectionStatuses.add(root2ConnectionStatus);
         status.setConnectionStatus(rootConnectionStatuses);
-
-        // create a group status with processing time
-        ProcessGroupStatus groupStatus1 = new ProcessGroupStatus();
-        groupStatus1.setProcessorStatus(processorStatuses);
-        groupStatus1.setBytesRead(1234L);
-
-        // Create a nested group status with a connection
-        ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
-        groupStatus2.setProcessorStatus(processorStatuses);
-        groupStatus2.setBytesRead(12345L);
-        ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
-        nestedConnectionStatus.setId("nested");
-        nestedConnectionStatus.setQueuedCount(1001);
-        Collection<ConnectionStatus> nestedConnectionStatuses = new 
ArrayList<>();
-        nestedConnectionStatuses.add(nestedConnectionStatus);
-        groupStatus2.setConnectionStatus(nestedConnectionStatuses);
-        Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
-        nestedGroupStatuses.add(groupStatus2);
-        groupStatus1.setProcessGroupStatus(nestedGroupStatuses);
-
-        ProcessGroupStatus groupStatus3 = new ProcessGroupStatus();
-        groupStatus3.setBytesRead(1L);
-        ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
-        nestedConnectionStatus2.setId("nested2");
-        nestedConnectionStatus2.setQueuedCount(3);
-        Collection<ConnectionStatus> nestedConnectionStatuses2 = new 
ArrayList<>();
-        nestedConnectionStatuses2.add(nestedConnectionStatus2);
-        groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
-        Collection<ProcessGroupStatus> nestedGroupStatuses2 = new 
ArrayList<>();
-        nestedGroupStatuses2.add(groupStatus3);
-
-        Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
-        groupStatuses.add(groupStatus1);
-        groupStatuses.add(groupStatus3);
-        status.setProcessGroupStatus(groupStatuses);
     }
 
     @Test
-    public void testConnectionStatusTable() throws IOException, 
InitializationException {
+    void testConnectionStatusTable() throws InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         properties.put(QueryMetricsUtil.QUERY, "select connectionId, 
predictedQueuedCount, predictedTimeToBytesBackpressureMillis from 
CONNECTION_STATUS_PREDICTIONS");
         reportingTask = initTask(properties);
         reportingTask.onTrigger(context);
-        List<Map<String,Object>> metricsList = actionHandler.getRows();
-        List<Tuple<String, Action>> defaultLogActions = 
actionHandler.getDefaultActionsByType("LOG");
-        List<Tuple<String, Action>> defaultAlertActions = 
actionHandler.getDefaultActionsByType("ALERT");
         List<PropertyContext> propertyContexts = 
actionHandler.getPropertyContexts();
-        assertFalse(metricsList.isEmpty());
-        assertEquals(2,defaultLogActions.size());
-        assertEquals(2,defaultAlertActions.size());
-        assertEquals(4,propertyContexts.size());
+        assertEquals(2, actionHandler.getRows().size());
+        assertEquals(2, propertyContexts.size());
+    }
+
+    @Test
+    void testUniqueBulletinQueryIsInTimeWindow() throws 
InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from 
BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= 
$bulletinEndTime");
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+        assertEquals(1, actionHandler.getRows().size());
+
+        actionHandler.reset();
+        final Bulletin bulletin = 
BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
 "WARN", "test bulletin 2", "testFlowFileUuid");
+        mockBulletinRepository.addBulletin(bulletin);
+        currentTime.set(bulletin.getTimestamp().getTime());
+        reportingTask.onTrigger(context);
+        assertEquals(1, actionHandler.getRows().size());
+    }
+
+    @Test
+    void testUniqueBulletinQueryIsOutOfTimeWindow() throws 
InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from 
BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= 
$bulletinEndTime");
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+        assertEquals(1, actionHandler.getRows().size());
+
+        actionHandler.reset();
+        final Bulletin bulletin = 
BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
 "WARN", "test bulletin 2", "testFlowFileUuid");
+        mockBulletinRepository.addBulletin(bulletin);
+        currentTime.set(bulletin.getTimestamp().getTime() - 1);
+        reportingTask.onTrigger(context);
+        assertEquals(0, actionHandler.getRows().size());
+    }
+
+    @Test
+    void testUniqueProvenanceQueryIsInTimeWindow() throws 
InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.QUERY, "select componentId from 
PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= 
$provenanceEndTime");
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+        assertEquals(1, actionHandler.getRows().size());
+
+        actionHandler.reset();
+
+        MockFlowFile mockFlowFile = new MockFlowFile(2L);
+        ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
+                .setEventType(ProvenanceEventType.CREATE)
+                .fromFlowFile(mockFlowFile)
+                .setComponentId("2")
+                .setComponentType("ReportingTask")
+                .setFlowFileUUID("I am FlowFile 2")
+                .setEventTime(Instant.now().toEpochMilli())
+                .setEventDuration(100)
+                .setTransitUri("test://")
+                .setSourceSystemFlowFileIdentifier("I am FlowFile 2")
+                .setAlternateIdentifierUri("remote://test")
+                .build();
+        mockProvenanceRepository.registerEvent(prov2);
+
+        currentTime.set(prov2.getEventTime());
+        reportingTask.onTrigger(context);
+
+        assertEquals(1, actionHandler.getRows().size());
+    }
+
+    @Test
+    void testUniqueProvenanceQueryIsOutOfTimeWindow() throws 
InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.QUERY, "select componentId from 
PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= 
$provenanceEndTime");
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+        assertEquals(1, actionHandler.getRows().size());
+
+        actionHandler.reset();
+
+        MockFlowFile mockFlowFile = new MockFlowFile(2L);
+        ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
+                .setEventType(ProvenanceEventType.CREATE)
+                .fromFlowFile(mockFlowFile)
+                .setComponentId("2")
+                .setComponentType("ReportingTask")
+                .setFlowFileUUID("I am FlowFile 2")
+                .setEventTime(Instant.now().toEpochMilli())
+                .setEventDuration(100)
+                .setTransitUri("test://")
+                .setSourceSystemFlowFileIdentifier("I am FlowFile 2")
+                .setAlternateIdentifierUri("remote://test")
+                .build();
+        mockProvenanceRepository.registerEvent(prov2);
+
+        currentTime.set(prov2.getEventTime() - 1);
+        reportingTask.onTrigger(context);
+
+        assertEquals(0, actionHandler.getRows().size());
     }
 
-    private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, 
String> customProperties) throws InitializationException, IOException {
+    @Test
+    void testTimeWindowFromStateMap() throws IOException, 
InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, 
PROVENANCE where " +
+                "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp 
<= $bulletinEndTime " +
+                "and timestampMillis > $provenanceStartTime and 
timestampMillis <= $provenanceEndTime");
+        reportingTask = initTask(properties);
+
+        long testBulletinStartTime = 1609538145L;
+        long testProvenanceStartTime = 1641074145L;
+        final Map<String, String> stateMap = new HashMap<>();
+        stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), 
String.valueOf(testBulletinStartTime));
+        stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), 
String.valueOf(testProvenanceStartTime));
+        mockStateManager.setState(stateMap, Scope.LOCAL);
+
+        final long bulletinStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
+        final long provenanceStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
 
+        assertEquals(testBulletinStartTime, bulletinStartTime);
+        assertEquals(testProvenanceStartTime, provenanceStartTime);
+
+        final long currentTime = Instant.now().toEpochMilli();
+        this.currentTime.set(currentTime);
+
+        reportingTask.onTrigger(context);
+
+        final long updatedBulletinStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
+        final long updatedProvenanceStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
+
+        assertEquals(currentTime, updatedBulletinStartTime);
+        assertEquals(currentTime, updatedProvenanceStartTime);
+    }
+
+    private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, 
String> customProperties) throws InitializationException {
         final ComponentLog logger = Mockito.mock(ComponentLog.class);
-        final BulletinRepository bulletinRepository = 
Mockito.mock(BulletinRepository.class);
         reportingTask = new MockMetricsEventReportingTask();
         final ReportingInitializationContext initContext = 
Mockito.mock(ReportingInitializationContext.class);
         
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
@@ -183,9 +289,8 @@ public class TestMetricsEventReportingTask {
 
         context = Mockito.mock(ReportingContext.class);
         Mockito.when(context.isAnalyticsEnabled()).thenReturn(true);
-        Mockito.when(context.getStateManager()).thenReturn(new 
MockStateManager(reportingTask));
-        
Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
-        Mockito.when(context.createBulletin(anyString(),any(Severity.class), 
anyString())).thenReturn(null);
+        mockStateManager = new MockStateManager(reportingTask);
+        Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
 
         Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
             final PropertyDescriptor descriptor = invocation.getArgument(0, 
PropertyDescriptor.class);
@@ -200,13 +305,8 @@ public class TestMetricsEventReportingTask {
         actionHandler = new MockPropertyContextActionHandler();
         
Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler);
 
-        Action action1 = new Action();
-        action1.setType("LOG");
-        Action action2 = new Action();
-        action2.setType("ALERT");
-
         final PropertyValue resValue = 
Mockito.mock(StandardPropertyValue.class);
-        rulesEngineService = new 
MockRulesEngineService(Lists.newArrayList(action1,action2));
+        MockRulesEngineService rulesEngineService = new 
MockRulesEngineService();
         
Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService);
 
         ConfigurationContext configContext = 
Mockito.mock(ConfigurationContext.class);
@@ -216,10 +316,82 @@ public class TestMetricsEventReportingTask {
         
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new
 MockPropertyValue("0"));
         reportingTask.setup(configContext);
 
+        setupMockProvenanceRepository(eventAccess);
+        setupMockBulletinRepository();
+
         return reportingTask;
     }
 
-    private static final class MockMetricsEventReportingTask extends 
MetricsEventReportingTask {
+    private final class MockMetricsEventReportingTask extends 
MetricsEventReportingTask {
+        @Override
+        public long getCurrentTime() {
+            return currentTime.get();
+        }
+    }
+
+    private void setupMockBulletinRepository() {
+        mockBulletinRepository = new MockQueryBulletinRepository();
+        
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(),
 "WARN", "test bulletin 1", "testFlowFileUuid"));
+
+        
Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
+    }
+
+    private void setupMockProvenanceRepository(final EventAccess eventAccess) {
+
+        mockProvenanceRepository = new MockProvenanceRepository();
+        long currentTimeMillis = System.currentTimeMillis();
+        Map<String, String> previousAttributes = new HashMap<>();
+        previousAttributes.put("mime.type", "application/json");
+        previousAttributes.put("test.value", "A");
+        Map<String, String> updatedAttributes = new 
HashMap<>(previousAttributes);
+        updatedAttributes.put("test.value", "B");
+
+        // Generate provenance events and put them in a repository
+        Processor processor = mock(Processor.class);
+        SharedSessionState sharedState = new SharedSessionState(processor, new 
AtomicLong(0));
+        MockProcessSession processSession = new 
MockProcessSession(sharedState, processor);
+        MockFlowFile mockFlowFile = processSession.createFlowFile("Test 
content".getBytes());
+
+        ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
+                .setEventType(ProvenanceEventType.CREATE)
+                .fromFlowFile(mockFlowFile)
+                .setComponentId("1")
+                .setComponentType("ReportingTask")
+                .setFlowFileUUID("I am FlowFile 1")
+                .setEventTime(currentTimeMillis)
+                .setEventDuration(100)
+                .setTransitUri("test://")
+                .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
+                .setAlternateIdentifierUri("remote://test")
+                .setAttributes(previousAttributes, updatedAttributes)
+                .build();
 
+        mockProvenanceRepository.registerEvent(prov1);
+
+        
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
+    }
+
+    private static class MockQueryBulletinRepository extends 
MockBulletinRepository {
+        Map<String, List<Bulletin>> bulletins = new HashMap<>();
+
+        @Override
+        public void addBulletin(Bulletin bulletin) {
+            bulletins.computeIfAbsent(bulletin.getCategory(), key -> new 
ArrayList<>())
+                    .add(bulletin);
+        }
+
+        @Override
+        public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
+            return new ArrayList<>(
+                    
Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
+                            .orElse(Collections.emptyList())
+            );
+        }
+
+        @Override
+        public List<Bulletin> findBulletinsForController() {
+            return Optional.ofNullable(bulletins.get("controller"))
+                    .orElse(Collections.emptyList());
+        }
     }
-}
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index 87d42fd..a031b1a 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -20,6 +20,7 @@ package org.apache.nifi.reporting.sql;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -34,12 +35,14 @@ import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinFactory;
 import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
+import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
 import org.apache.nifi.reporting.util.metrics.MetricNames;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockBulletinRepository;
@@ -54,12 +57,14 @@ import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -67,21 +72,24 @@ import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class TestQueryNiFiReportingTask {
+class TestQueryNiFiReportingTask {
 
     private ReportingContext context;
     private MockQueryNiFiReportingTask reportingTask;
     private MockRecordSinkService mockRecordSinkService;
     private ProcessGroupStatus status;
+    private BulletinRepository mockBulletinRepository;
+    private MockProvenanceRepository mockProvenanceRepository;
+    private AtomicLong currentTime;
+    private MockStateManager mockStateManager;
 
     @BeforeEach
     public void setup() {
-        mockRecordSinkService = new MockRecordSinkService();
+        currentTime = new AtomicLong();
         status = new ProcessGroupStatus();
         status.setId("1234");
         status.setFlowFilesReceived(5);
@@ -145,8 +153,6 @@ public class TestQueryNiFiReportingTask {
         Collection<ConnectionStatus> nestedConnectionStatuses2 = new 
ArrayList<>();
         nestedConnectionStatuses2.add(nestedConnectionStatus2);
         groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
-        Collection<ProcessGroupStatus> nestedGroupStatuses2 = new 
ArrayList<>();
-        nestedGroupStatuses2.add(groupStatus3);
 
         Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
         groupStatuses.add(groupStatus1);
@@ -156,7 +162,7 @@ public class TestQueryNiFiReportingTask {
     }
 
     @Test
-    public void testConnectionStatusTable() throws IOException, 
InitializationException {
+    void testConnectionStatusTable() throws InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
         properties.put(QueryMetricsUtil.QUERY, "select 
id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by 
queuedCount desc");
@@ -193,7 +199,191 @@ public class TestQueryNiFiReportingTask {
     }
 
     @Test
-    public void testJvmMetricsTable() throws IOException, 
InitializationException {
+    void testBulletinIsInTimeWindow() throws InitializationException {
+        String query = "select * from BULLETINS where bulletinTimestamp > 
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.QUERY, query);
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+        assertEquals(3, rows.size());
+
+        final Bulletin bulletin = 
BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), 
"ERROR", "test bulletin 3", "testFlowFileUuid");
+        mockBulletinRepository.addBulletin(bulletin);
+        currentTime.set(bulletin.getTimestamp().getTime());
+
+        reportingTask.onTrigger(context);
+
+
+        List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+        assertEquals(1, sameRows.size());
+    }
+
+    @Test
+    void testBulletinIsOutOfTimeWindow() throws InitializationException {
+        String query = "select * from BULLETINS where bulletinTimestamp > 
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.QUERY, query);
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+        assertEquals(3, rows.size());
+
+        final Bulletin bulletin = BulletinFactory.createBulletin("input port", 
"ERROR", "test bulletin 3", "testFlowFileUuid");
+        mockBulletinRepository.addBulletin(bulletin);
+        currentTime.set(bulletin.getTimestamp().getTime() - 1);
+
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+        assertEquals(0, sameRows.size());
+    }
+
+    @Test
+    void testProvenanceEventIsInTimeWindow() throws InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where 
timestampMillis > $provenanceStartTime and timestampMillis <= 
$provenanceEndTime");
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+        assertEquals(1001, rows.size());
+
+        MockFlowFile mockFlowFile = new MockFlowFile(1002L);
+        ProvenanceEventRecord prov1002 = 
mockProvenanceRepository.eventBuilder()
+                .setEventType(ProvenanceEventType.CREATE)
+                .fromFlowFile(mockFlowFile)
+                .setComponentId("12345")
+                .setComponentType("ReportingTask")
+                .setFlowFileUUID("I am FlowFile 1")
+                .setEventTime(Instant.now().toEpochMilli())
+                .setEventDuration(100)
+                .setTransitUri("test://")
+                .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
+                .setAlternateIdentifierUri("remote://test")
+                .build();
+
+        mockProvenanceRepository.registerEvent(prov1002);
+
+        currentTime.set(prov1002.getEventTime());
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+        assertEquals(1, sameRows.size());
+    }
+
+    @Test
+    void testProvenanceEventIsOutOfTimeWindow() throws InitializationException 
{
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where 
timestampMillis > $provenanceStartTime and timestampMillis <= 
$provenanceEndTime");
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+        assertEquals(1001, rows.size());
+
+        MockFlowFile mockFlowFile = new MockFlowFile(1002L);
+        ProvenanceEventRecord prov1002 = 
mockProvenanceRepository.eventBuilder()
+                .setEventType(ProvenanceEventType.CREATE)
+                .fromFlowFile(mockFlowFile)
+                .setComponentId("12345")
+                .setComponentType("ReportingTask")
+                .setFlowFileUUID("I am FlowFile 1")
+                .setEventTime(Instant.now().toEpochMilli())
+                .setEventDuration(100)
+                .setTransitUri("test://")
+                .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
+                .setAlternateIdentifierUri("remote://test")
+                .build();
+
+        mockProvenanceRepository.registerEvent(prov1002);
+
+        currentTime.set(prov1002.getEventTime() - 1);
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+        assertEquals(0, sameRows.size());
+    }
+
+    @Test
+    void testUniqueProvenanceAndBulletinQuery() throws InitializationException 
{
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, 
PROVENANCE where " +
+                "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp 
<= $bulletinEndTime " +
+                "and timestampMillis > $provenanceStartTime and 
timestampMillis <= $provenanceEndTime");
+        reportingTask = initTask(properties);
+        currentTime.set(Instant.now().toEpochMilli());
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+        assertEquals(3003, rows.size());
+
+        final Bulletin bulletin = 
BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), 
"ERROR", "test bulletin 3", "testFlowFileUuid");
+        mockBulletinRepository.addBulletin(bulletin);
+
+        MockFlowFile mockFlowFile = new MockFlowFile(1002L);
+        ProvenanceEventRecord prov1002 = 
mockProvenanceRepository.eventBuilder()
+                .setEventType(ProvenanceEventType.CREATE)
+                .fromFlowFile(mockFlowFile)
+                .setComponentId("12345")
+                .setComponentType("ReportingTask")
+                .setFlowFileUUID("I am FlowFile 1")
+                .build();
+
+        mockProvenanceRepository.registerEvent(prov1002);
+
+        currentTime.set(bulletin.getTimestamp().getTime());
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+        assertEquals(1, sameRows.size());
+    }
+
+    @Test
+    void testTimeWindowFromStateMap() throws IOException, 
InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, 
PROVENANCE where " +
+                "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp 
<= $bulletinEndTime " +
+                "and timestampMillis > $provenanceStartTime and 
timestampMillis <= $provenanceEndTime");
+        reportingTask = initTask(properties);
+
+        long testBulletinStartTime = 1609538145L;
+        long testProvenanceStartTime = 1641074145L;
+        final Map<String, String> stateMap = new HashMap<>();
+        stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), 
String.valueOf(testBulletinStartTime));
+        stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), 
String.valueOf(testProvenanceStartTime));
+        mockStateManager.setState(stateMap, Scope.LOCAL);
+
+        final long bulletinStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
+        final long provenanceStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
+
+        assertEquals(testBulletinStartTime, bulletinStartTime);
+        assertEquals(testProvenanceStartTime, provenanceStartTime);
+
+        final long currentTime = Instant.now().toEpochMilli();
+        this.currentTime.set(currentTime);
+
+        reportingTask.onTrigger(context);
+
+        final long updatedBulletinStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
+        final long updatedProvenanceStartTime = 
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
+
+        assertEquals(currentTime, updatedBulletinStartTime);
+        assertEquals(currentTime, updatedProvenanceStartTime);
+    }
+
+    @Test
+    void testJvmMetricsTable() throws InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
         properties.put(QueryMetricsUtil.QUERY, "select "
@@ -222,7 +412,7 @@ public class TestQueryNiFiReportingTask {
     }
 
     @Test
-    public void testProcessGroupStatusTable() throws IOException, 
InitializationException {
+    void testProcessGroupStatusTable() throws InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
         properties.put(QueryMetricsUtil.QUERY, "select * from 
PROCESS_GROUP_STATUS order by bytesRead asc");
@@ -247,7 +437,7 @@ public class TestQueryNiFiReportingTask {
     }
 
     @Test
-    public void testNoResults() throws IOException, InitializationException {
+    void testNoResults() throws InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
         properties.put(QueryMetricsUtil.QUERY, "select * from 
CONNECTION_STATUS where queuedCount > 2000");
@@ -259,7 +449,7 @@ public class TestQueryNiFiReportingTask {
     }
 
     @Test
-    public void testProvenanceTable() throws IOException, InitializationException {
+    void testProvenanceTable() throws InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
         properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE order by eventId asc");
@@ -304,7 +494,7 @@ public class TestQueryNiFiReportingTask {
     }
 
     @Test
-    public void testBulletinTable() throws IOException, InitializationException {
+    void testBulletinTable() throws InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
         properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS order by bulletinTimestamp asc");
@@ -329,12 +519,12 @@ public class TestQueryNiFiReportingTask {
 
         // Validate the third row
         row = rows.get(2);
-        assertEquals("controller service", row.get("bulletinCategory"));
+        assertEquals("controller_service", row.get("bulletinCategory"));
         assertEquals("ERROR", row.get("bulletinLevel"));
         assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
     }
 
-    private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
+    private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException {
 
         final ComponentLog logger = mock(ComponentLog.class);
         reportingTask = new MockQueryNiFiReportingTask();
@@ -342,6 +532,7 @@ public class TestQueryNiFiReportingTask {
         Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
         Mockito.when(initContext.getLogger()).thenReturn(logger);
         reportingTask.initialize(initContext);
+
         Map<PropertyDescriptor, String> properties = new HashMap<>();
         for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) {
             properties.put(descriptor, descriptor.getDefaultValue());
@@ -349,7 +540,10 @@ public class TestQueryNiFiReportingTask {
         properties.putAll(customProperties);
 
         context = mock(ReportingContext.class);
-        Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
+
+        mockStateManager = new MockStateManager(reportingTask);
+
+        Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
         Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
             final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
             return new MockPropertyValue(properties.get(descriptor));
@@ -371,7 +565,7 @@ public class TestQueryNiFiReportingTask {
         Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0"));
         reportingTask.setup(configContext);
 
-        MockProvenanceRepository provenanceRepository = new MockProvenanceRepository();
+        mockProvenanceRepository = new MockProvenanceRepository();
         long currentTimeMillis = System.currentTimeMillis();
         Map<String, String> previousAttributes = new HashMap<>();
         previousAttributes.put("mime.type", "application/json");
@@ -385,7 +579,7 @@ public class TestQueryNiFiReportingTask {
         MockProcessSession processSession = new MockProcessSession(sharedState, processor);
         MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes());
 
-        ProvenanceEventRecord prov1 = provenanceRepository.eventBuilder()
+        ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
                 .setEventType(ProvenanceEventType.CREATE)
                 .fromFlowFile(mockFlowFile)
                 .setComponentId("12345")
@@ -399,12 +593,12 @@ public class TestQueryNiFiReportingTask {
                 .setAttributes(previousAttributes, updatedAttributes)
                 .build();
 
-        provenanceRepository.registerEvent(prov1);
+        mockProvenanceRepository.registerEvent(prov1);
 
         for (int i = 1; i < 1001; i++) {
             String indexString = Integer.toString(i);
             mockFlowFile = processSession.createFlowFile(("Test content " + indexString).getBytes());
-            ProvenanceEventRecord prov = provenanceRepository.eventBuilder()
+            ProvenanceEventRecord prov = mockProvenanceRepository.eventBuilder()
                     .fromFlowFile(mockFlowFile)
                     .setEventType(ProvenanceEventType.DROP)
                     .setComponentId(indexString)
@@ -412,52 +606,48 @@ public class TestQueryNiFiReportingTask {
                     .setFlowFileUUID("I am FlowFile " + indexString)
                     .setEventTime(currentTimeMillis - i)
                     .build();
-            provenanceRepository.registerEvent(prov);
+            mockProvenanceRepository.registerEvent(prov);
         }
 
-        Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+        Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
+
+        mockBulletinRepository = new MockQueryBulletinRepository();
+        mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid"));
+        mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "INFO", "test bulletin 1", "testFlowFileUuid"));
+        mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "ERROR", "test bulletin 2", "testFlowFileUuid"));
 
-        MockBulletinRepository bulletinRepository = new MockQueryBulletinRepository();
-        bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid"));
-        bulletinRepository.addBulletin(BulletinFactory.createBulletin("processor", "INFO", "test bulletin 1", "testFlowFileUuid"));
-        bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller service", "ERROR", "test bulletin 2", "testFlowFileUuid"));
-        Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
+        Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
 
         return reportingTask;
     }
 
-    private static final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
+    private final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
+        @Override
+        public long getCurrentTime() {
+            return currentTime.get();
+        }
     }
 
     private static class MockQueryBulletinRepository extends MockBulletinRepository {
-
-        List<Bulletin> bulletinList;
-
-
-        public MockQueryBulletinRepository() {
-            bulletinList = new ArrayList<>();
-        }
+        Map<String, List<Bulletin>> bulletins = new HashMap<>();
 
         @Override
         public void addBulletin(Bulletin bulletin) {
-            bulletinList.add(bulletin);
+            bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new ArrayList<>())
+                    .add(bulletin);
         }
 
         @Override
         public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
-            if (bulletinQuery.getSourceType().equals(ComponentType.PROCESSOR)) {
-                return Collections.singletonList(bulletinList.get(1));
-            } else if (bulletinQuery.getSourceType().equals(ComponentType.CONTROLLER_SERVICE)) {
-                return Collections.singletonList(bulletinList.get(2));
-            } else {
-                return Collections.emptyList();
-            }
+            return new ArrayList<>(
+                    Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
+                            .orElse(Collections.emptyList()));
         }
 
         @Override
         public List<Bulletin> findBulletinsForController() {
-            return Collections.singletonList(bulletinList.get(0));
+            return Optional.ofNullable(bulletins.get("controller"))
+                    .orElse(Collections.emptyList());
         }
-
     }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
index 323317d..dbbb50a 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
@@ -20,18 +20,14 @@ import org.apache.nifi.components.AbstractConfigurableComponent;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.Tuple;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
-public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler{
-
-    private List<Map<String, Object>> rows = new ArrayList<>();
-    private List<Tuple<String,Action>> defaultActions = new ArrayList<>();
-    private List<PropertyContext> propertyContexts = new ArrayList<>();
+public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler {
+    private final List<Map<String, Object>> rows = new ArrayList<>();
+    private final List<PropertyContext> propertyContexts = new ArrayList<>();
 
 
     @Override
@@ -43,7 +39,6 @@ public class MockPropertyContextActionHandler extends AbstractConfigurableCompon
     @Override
     public void execute(Action action, Map<String, Object> facts) {
         rows.add(facts);
-        defaultActions.add( new Tuple<>(action.getType(),action));
     }
 
 
@@ -56,15 +51,6 @@ public class MockPropertyContextActionHandler extends AbstractConfigurableCompon
         return rows;
     }
 
-    public List<Tuple<String, Action>> getDefaultActions() {
-        return defaultActions;
-    }
-
-    public List<Tuple<String,Action>> getDefaultActionsByType(final String type){
-        return defaultActions.stream().filter(stringActionTuple -> stringActionTuple
-                .getKey().equalsIgnoreCase(type)).collect(Collectors.toList());
-    }
-
     public List<PropertyContext> getPropertyContexts() {
         return propertyContexts;
     }
@@ -73,4 +59,9 @@ public class MockPropertyContextActionHandler extends AbstractConfigurableCompon
     public String getIdentifier() {
         return "MockPropertyContextActionHandler";
     }
-}
+
+    public void reset() {
+        rows.clear();
+        propertyContexts.clear();
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
index e3ccc73..90e8472 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
@@ -18,26 +18,21 @@ package org.apache.nifi.rules.engine;
 
 import org.apache.nifi.components.AbstractConfigurableComponent;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.rules.Action;
+import org.mockito.Mockito;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService {
-    private List<Action> actions;
-
-    public MockRulesEngineService(List<Action> actions) {
-        this.actions = actions;
-    }
-
     @Override
     public List<Action> fireRules(Map<String, Object> facts) {
-        return actions;
+        return Collections.singletonList(Mockito.mock(Action.class));
     }
 
     @Override
-    public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
+    public void initialize(ControllerServiceInitializationContext context) {
     }
 
     @Override

Reply via email to